audio_pipeline

This commit is contained in:
sanek5g
2026-06-10 17:12:58 +03:00
commit 00ddac5af7
29 changed files with 3297 additions and 0 deletions

11
watcher/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Build stage: compile the watcher binary with the Go 1.22 toolchain.
FROM golang:1.22-alpine AS build
WORKDIR /src
# Copy the module files before the sources so the dependency download layer
# can be cached independently of source changes. The go.sum* glob lets the
# COPY succeed even when go.sum is absent.
COPY go.mod go.sum* ./
RUN go mod download
COPY . .
# Build without cgo and write the resulting binary to /watcher.
RUN CGO_ENABLED=0 go build -o /watcher ./cmd/watcher
# Runtime stage: a fresh Alpine image that receives only the compiled binary.
FROM alpine:3.19
# NOTE(review): CA certificates are presumably installed for TLS (amqps://)
# connections to RabbitMQ — confirm they are actually needed.
RUN apk add --no-cache ca-certificates
COPY --from=build /watcher /watcher
ENTRYPOINT ["/watcher"]

115
watcher/cmd/watcher/main.go Normal file
View File

@@ -0,0 +1,115 @@
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/yourorg/watcher/internal/config"
"github.com/yourorg/watcher/internal/publisher"
"github.com/yourorg/watcher/internal/scanner"
)
// main wires the watcher together and runs its poll loop.
//
// Startup: load configuration from the environment, make sure the storage
// directories exist, connect to RabbitMQ, declare the durable direct exchange
// and put the channel into confirm mode via publisher.New. Any startup
// failure is logged and terminates the process with exit code 1.
//
// Loop: on every tick the incoming directory is scanned; each claimed file is
// published as an AudioTask. When publishing fails the file is rolled back to
// the incoming directory so a later tick can retry it. The loop ends on
// SIGINT or SIGTERM.
func main() {
	slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})))

	cfg := config.Load()

	// time.NewTicker panics on a non-positive interval and POLL_INTERVAL is
	// operator-supplied, so reject bad values with a readable error instead.
	if cfg.PollInterval <= 0 {
		slog.Error("invalid poll interval, must be positive", "poll_interval", cfg.PollInterval.String())
		os.Exit(1)
	}

	sc := scanner.New(scanner.Config{
		StorageRoot:   cfg.StorageRoot,
		IncomingDir:   cfg.IncomingDir,
		ProcessingDir: cfg.ProcessingDir,
		FailedDir:     cfg.FailedDir,
		StableWindow:  cfg.StableWindow,
		StableChecks:  cfg.StableChecks,
	})
	if err := sc.EnsureDirs(); err != nil {
		slog.Error("ensure dirs failed", "error", err)
		os.Exit(1)
	}

	ch := mustRabbit(cfg.RabbitURL)
	// Arguments after the kind: durable=true, autoDelete=false,
	// internal=false, noWait=false, no extra arguments.
	if err := ch.ExchangeDeclare(cfg.Exchange, "direct", true, false, false, false, nil); err != nil {
		slog.Error("declare exchange failed", "error", err)
		os.Exit(1)
	}
	pub, err := publisher.New(ch, cfg.Exchange, cfg.RoutingKey)
	if err != nil {
		slog.Error("publisher init failed", "error", err)
		os.Exit(1)
	}

	// All os.Exit calls happen above this point, so the deferred cleanups
	// below are never skipped by an exit.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	slog.Info("watcher started",
		"storage_root", cfg.StorageRoot,
		"poll_interval", cfg.PollInterval.String(),
		"exchange", cfg.Exchange,
		"routing_key", cfg.RoutingKey,
	)

	ticker := time.NewTicker(cfg.PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			slog.Info("watcher stopping")
			return
		case <-ticker.C:
			claimed, err := sc.ScanOnce()
			if err != nil {
				slog.Warn("scan failed", "error", err)
				continue
			}
			for _, cf := range claimed {
				task := publisher.AudioTask{
					TaskID:   cf.TaskID,
					FilePath: cf.FilePath,
					Filename: cf.Filename,
					Size:     cf.Size,
				}
				// Bound each publish (including the broker confirmation)
				// so a stalled broker cannot block the loop indefinitely.
				pubCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
				err := pub.Publish(pubCtx, task)
				cancel()
				if err != nil {
					slog.Warn("publish failed, rolling back", "task_id", cf.TaskID, "error", err)
					if rbErr := sc.RollbackToIncoming(cf.FilePath, cf.Filename); rbErr != nil {
						slog.Error("rollback failed, moving to failed", "task_id", cf.TaskID, "error", rbErr)
						// Last resort. If this fails too the file stays in the
						// processing directory, so record where it is rather
						// than dropping the error silently.
						if mvErr := sc.MoveToFailed(cf.FilePath, cf.Filename); mvErr != nil {
							slog.Error("move to failed failed, file left in processing",
								"task_id", cf.TaskID,
								"path", cf.FilePath,
								"error", mvErr,
							)
						}
					}
					continue
				}
				slog.Info("task published", "task_id", cf.TaskID, "filename", cf.Filename)
			}
		}
	}
}
// mustRabbit dials RabbitMQ at url and returns an open channel on the new
// connection. Dialing is retried up to 30 times with a 2 second pause between
// attempts, which tolerates a broker that is still starting. If the broker
// stays unreachable, or the channel cannot be opened, the error is logged and
// the process exits with code 1.
func mustRabbit(url string) *amqp.Channel {
	const (
		maxAttempts = 30
		retryDelay  = 2 * time.Second
	)

	var (
		conn *amqp.Connection
		err  error
	)
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		conn, err = amqp.Dial(url)
		if err == nil {
			break
		}
		slog.Info("waiting for rabbit", "attempt", attempt, "error", err)
		// No point in sleeping after the final attempt: we exit right away.
		if attempt < maxAttempts {
			time.Sleep(retryDelay)
		}
	}
	if err != nil {
		slog.Error("rabbit unreachable", "error", err)
		os.Exit(1)
	}

	ch, err := conn.Channel()
	if err != nil {
		slog.Error("rabbit channel failed", "error", err)
		// Best-effort close so the broker sees an orderly shutdown; the
		// result is irrelevant because the process exits immediately after.
		_ = conn.Close()
		os.Exit(1)
	}
	return ch
}

8
watcher/go.mod Normal file
View File

@@ -0,0 +1,8 @@
// Module definition for the watcher service.
module github.com/yourorg/watcher
go 1.22
require (
// ULID generation; internal/scanner uses it to create task IDs.
github.com/oklog/ulid/v2 v2.1.0
// RabbitMQ (AMQP 0-9-1) client used by cmd/watcher and internal/publisher.
github.com/rabbitmq/amqp091-go v1.9.0
)

21
watcher/go.sum Normal file
View File

@@ -0,0 +1,21 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,60 @@
package config
import (
"os"
"strconv"
"time"
)
// Config is the watcher's runtime configuration. Load fills every field from
// an environment variable, falling back to a built-in default.
type Config struct {
	// StorageRoot is the base directory (STORAGE_ROOT).
	StorageRoot string
	// IncomingDir is the directory name for new files (INCOMING_DIR).
	IncomingDir string
	// ProcessingDir is the directory name for claimed files (PROCESSING_DIR).
	ProcessingDir string
	// FailedDir is the directory name for failed files (FAILED_DIR).
	FailedDir string

	// PollInterval is the period between scans (POLL_INTERVAL).
	PollInterval time.Duration
	// StableWindow is the stability-check pause (STABLE_WINDOW).
	StableWindow time.Duration
	// StableChecks is the number of stability checks (STABLE_CHECKS).
	StableChecks int

	// RabbitURL is the AMQP connection URL (RABBITMQ_URL).
	RabbitURL string
	// Exchange is the exchange name (RABBITMQ_EXCHANGE).
	Exchange string
	// RoutingKey is the routing key (RABBITMQ_ROUTING_KEY).
	RoutingKey string
}
// Load builds a Config from environment variables. A variable that is unset
// or empty (or, for numeric and duration values, unparsable) yields the
// default given here.
func Load() Config {
	var cfg Config

	// Storage layout.
	cfg.StorageRoot = getEnv("STORAGE_ROOT", "/data/storage")
	cfg.IncomingDir = getEnv("INCOMING_DIR", "incoming")
	cfg.ProcessingDir = getEnv("PROCESSING_DIR", "processing")
	cfg.FailedDir = getEnv("FAILED_DIR", "failed")

	// Scan timing.
	cfg.PollInterval = getDuration("POLL_INTERVAL", 5*time.Second)
	cfg.StableWindow = getDuration("STABLE_WINDOW", 2*time.Second)
	cfg.StableChecks = getInt("STABLE_CHECKS", 3)

	// RabbitMQ.
	cfg.RabbitURL = getEnv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/")
	cfg.Exchange = getEnv("RABBITMQ_EXCHANGE", "audio_pipeline")
	cfg.RoutingKey = getEnv("RABBITMQ_ROUTING_KEY", "audio.new")

	return cfg
}
// getEnv returns the value of the environment variable key, or def when the
// variable is unset or empty.
func getEnv(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}
// getInt returns the environment variable key parsed as a decimal integer.
// It returns def when the variable is unset, empty, or not a valid integer.
func getInt(key string, def int) int {
	raw := os.Getenv(key)
	if raw == "" {
		return def
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}
	return parsed
}
// getDuration returns the environment variable key parsed with
// time.ParseDuration (for example "5s" or "1500ms"). It returns def when the
// variable is unset, empty, or not a valid duration string.
func getDuration(key string, def time.Duration) time.Duration {
	raw := os.Getenv(key)
	if raw == "" {
		return def
	}
	parsed, err := time.ParseDuration(raw)
	if err != nil {
		return def
	}
	return parsed
}

View File

@@ -0,0 +1,58 @@
package publisher
import (
"context"
"encoding/json"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// AudioTask is the JSON message published for each claimed audio file.
type AudioTask struct {
	// TaskID identifies the task.
	TaskID string `json:"task_id"`
	// FilePath is the file's path as supplied by the caller.
	FilePath string `json:"file_path"`
	// Filename is the file's name as supplied by the caller.
	Filename string `json:"filename"`
	// Size is the file size as supplied by the caller.
	Size int64 `json:"size"`
	// CreatedAt is a Unix timestamp in seconds. Publish sets it to the
	// current time when it is zero.
	CreatedAt int64 `json:"created_at"`
}
// Publisher sends AudioTask messages to one exchange with one routing key
// over a single AMQP channel.
type Publisher struct {
	// ch is the AMQP channel used for publishing; New enables confirm mode on it.
	ch *amqp.Channel
	// exchange is the exchange every message is published to.
	exchange string
	// routingKey is the routing key attached to every message.
	routingKey string
}
// New puts ch into publisher-confirm mode and returns a Publisher that
// publishes to exchange with routingKey. It returns an error when confirm
// mode cannot be enabled on the channel.
func New(ch *amqp.Channel, exchange, routingKey string) (*Publisher, error) {
	err := ch.Confirm(false)
	if err != nil {
		return nil, fmt.Errorf("confirm mode: %w", err)
	}

	pub := &Publisher{
		ch:         ch,
		exchange:   exchange,
		routingKey: routingKey,
	}
	return pub, nil
}
// Publish serializes task as JSON, publishes it as a persistent message and
// waits for the broker to confirm it.
//
// task.CreatedAt is set to the current Unix time when it is zero. Because
// task is passed by value, the caller's copy is not modified.
//
// It returns nil only after the broker acknowledged the message. It returns
// an error when marshalling or publishing fails, when the broker negatively
// acknowledges the message, or when ctx is done before the confirmation
// arrives (in which case the context's error is returned).
//
// The confirmation is tracked per message through a deferred confirmation
// rather than Channel.NotifyPublish. NotifyPublish registers an additional
// listener on every call and the listeners are never removed, so calling it
// per publish leaves stale, unread one-slot channels on the AMQP channel.
// Once one of them is full, confirmation dispatch blocks and every later
// Publish would time out even though the broker accepted the message.
//
// NOTE(review): the message is published with mandatory=false, so a message
// that matches no queue binding is presumably still acknowledged and then
// dropped by the broker — confirm the queue binding is guaranteed elsewhere.
func (p *Publisher) Publish(ctx context.Context, task AudioTask) error {
	if task.CreatedAt == 0 {
		task.CreatedAt = time.Now().Unix()
	}

	body, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task %q: %w", task.TaskID, err)
	}

	msg := amqp.Publishing{
		ContentType:  "application/json",
		Body:         body,
		DeliveryMode: amqp.Persistent,
	}
	confirmation, err := p.ch.PublishWithDeferredConfirmWithContext(ctx, p.exchange, p.routingKey, false, false, msg)
	if err != nil {
		return fmt.Errorf("publish task %q: %w", task.TaskID, err)
	}
	// A nil confirmation means the channel is not in confirm mode. New
	// enables it, so this only guards against a misconstructed Publisher.
	if confirmation == nil {
		return fmt.Errorf("publish task %q: channel is not in confirm mode", task.TaskID)
	}

	acked, err := confirmation.WaitContext(ctx)
	if err != nil {
		return err
	}
	if !acked {
		return fmt.Errorf("publish not confirmed")
	}
	return nil
}

View File

@@ -0,0 +1,144 @@
package scanner
import (
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
"github.com/oklog/ulid/v2"
)
// allowedExts is the set of lower-case file extensions (including the leading
// dot) that the scanner accepts; files with any other extension are skipped.
var allowedExts = map[string]bool{
	".mp3":  true,
	".wav":  true,
	".m4a":  true,
	".ogg":  true,
	".flac": true,
	".webm": true,
}
// ClaimedFile describes a file that the scanner moved out of the incoming
// directory into the processing directory.
type ClaimedFile struct {
	// TaskID is the ULID generated for the file when it was claimed.
	TaskID string
	// FilePath is the file's new path inside the processing directory.
	FilePath string
	// Filename is the name the file had in the incoming directory.
	Filename string
	// Size is the file size in bytes, read just before the move.
	Size int64
}
// Config holds the scanner's settings: the directory layout under
// StorageRoot and the parameters of the file-stability check.
type Config struct {
	// StorageRoot is the base directory the three directories below are
	// joined onto.
	StorageRoot string
	// IncomingDir is the directory scanned for new files.
	IncomingDir string
	// ProcessingDir is the directory claimed files are moved into.
	ProcessingDir string
	// FailedDir is the directory used by MoveToFailed.
	FailedDir string

	// StableWindow is how long isStable sleeps between two checks.
	StableWindow time.Duration
	// StableChecks is how many checks isStable performs on a file.
	StableChecks int
}
// Scanner finds audio files in the incoming directory and moves them between
// the incoming, processing and failed directories.
type Scanner struct {
	// cfg is the configuration the scanner was created with.
	cfg Config
}
// New returns a Scanner that uses cfg. It performs no I/O; call EnsureDirs
// to create the directories.
func New(cfg Config) *Scanner {
	sc := new(Scanner)
	sc.cfg = cfg
	return sc
}
// EnsureDirs creates the incoming, processing and failed directories under
// the storage root, including missing parents, with mode 0o755. It stops at
// and returns the first error.
func (s *Scanner) EnsureDirs() error {
	subdirs := [...]string{
		s.cfg.IncomingDir,
		s.cfg.ProcessingDir,
		s.cfg.FailedDir,
	}
	for i := range subdirs {
		full := filepath.Join(s.cfg.StorageRoot, subdirs[i])
		if err := os.MkdirAll(full, 0o755); err != nil {
			return err
		}
	}
	return nil
}
// ScanOnce makes one pass over the incoming directory and claims every
// eligible file, returning the files it claimed.
//
// A directory entry is skipped when it is a directory, when its name starts
// with "." or ends with ".tmp" (case-insensitive), when its lower-cased
// extension is not in allowedExts, or when isStable reports it as unstable.
// A file whose claim fails is logged and skipped; it does not abort the pass.
//
// A missing incoming directory is not an error and yields (nil, nil). Any
// other error reading the directory is returned.
func (s *Scanner) ScanOnce() ([]ClaimedFile, error) {
	incomingDir := filepath.Join(s.cfg.StorageRoot, s.cfg.IncomingDir)

	entries, err := os.ReadDir(incomingDir)
	switch {
	case err == nil:
		// Directory listed; fall through to the scan.
	case os.IsNotExist(err):
		return nil, nil
	default:
		return nil, err
	}

	var result []ClaimedFile
	for _, entry := range entries {
		name := entry.Name()
		ext := strings.ToLower(filepath.Ext(name))

		// Cheap name-based filters first; none of them touch the disk.
		hidden := strings.HasPrefix(name, ".")
		temporary := strings.HasSuffix(strings.ToLower(name), ".tmp")
		if entry.IsDir() || hidden || temporary || !allowedExts[ext] {
			continue
		}

		path := filepath.Join(incomingDir, name)
		if !s.isStable(path) {
			continue
		}

		file, claimErr := s.claim(path, name, ext)
		if claimErr != nil {
			slog.Warn("claim failed", "file", name, "error", claimErr)
			continue
		}
		result = append(result, file)
	}
	return result, nil
}
// isStable reports whether the file at path appears to be completely written.
//
// The file is observed StableChecks times (at least once), sleeping
// StableWindow between observations. It is stable only when every
// observation succeeds and neither its size nor its modification time
// changes between consecutive observations. The modification time is
// compared as well as the size because a file that was preallocated or is
// being rewritten in place keeps a constant size while it is still written.
//
// It returns false as soon as the file cannot be statted or a change is
// detected. The call blocks for (StableChecks-1) * StableWindow in the
// stable case.
func (s *Scanner) isStable(path string) bool {
	// Always stat at least once so a non-positive StableChecks cannot
	// declare a missing file stable.
	checks := s.cfg.StableChecks
	if checks < 1 {
		checks = 1
	}

	var prev os.FileInfo
	for i := 0; i < checks; i++ {
		info, err := os.Stat(path)
		if err != nil {
			return false
		}
		if prev != nil {
			sizeChanged := info.Size() != prev.Size()
			mtimeChanged := !info.ModTime().Equal(prev.ModTime())
			if sizeChanged || mtimeChanged {
				return false
			}
		}
		prev = info

		// No need to wait after the final observation.
		if i < checks-1 {
			time.Sleep(s.cfg.StableWindow)
		}
	}
	return true
}
// claim moves the file at src into the processing directory under a new name
// made of a freshly generated ULID plus ext, and returns its description.
//
// originalName is kept in the result (and the log line) so the file can later
// be restored under its original name. The size is read before the move. An
// error is returned when the file cannot be statted or renamed; the rename
// error is wrapped with a "rename: " prefix.
func (s *Scanner) claim(src, originalName, ext string) (ClaimedFile, error) {
	stat, statErr := os.Stat(src)
	if statErr != nil {
		return ClaimedFile{}, statErr
	}
	size := stat.Size()

	id := ulid.Make().String()
	processingDir := filepath.Join(s.cfg.StorageRoot, s.cfg.ProcessingDir)
	target := filepath.Join(processingDir, id+ext)

	if renameErr := os.Rename(src, target); renameErr != nil {
		return ClaimedFile{}, fmt.Errorf("rename: %w", renameErr)
	}

	slog.Info("claimed file", "task_id", id, "filename", originalName, "path", target, "size", size)

	result := ClaimedFile{
		TaskID:   id,
		FilePath: target,
		Filename: originalName,
		Size:     size,
	}
	return result, nil
}
// RollbackToIncoming moves the file at filePath back into the incoming
// directory under originalName so that a later scan can pick it up again.
//
// When the file cannot be restored — the rename fails, or a file named
// originalName already exists in the incoming directory — it is moved to the
// failed directory instead and the reason is logged. The existence check
// matters because os.Rename replaces an existing destination, which would
// silently destroy a file of the same name that arrived in the meantime.
//
// It returns nil when the file ended up in either the incoming or the failed
// directory, and an error wrapping both failures when it could be moved to
// neither (the file is then still at filePath).
func (s *Scanner) RollbackToIncoming(filePath, originalName string) error {
	incoming := filepath.Join(s.cfg.StorageRoot, s.cfg.IncomingDir)
	dst := filepath.Join(incoming, originalName)

	var rbErr error
	if _, statErr := os.Lstat(dst); statErr == nil {
		// Best-effort guard: a file created between this check and the
		// rename can still be replaced, but the common case is covered.
		rbErr = fmt.Errorf("destination %s already exists", dst)
	} else if rbErr = os.Rename(filePath, dst); rbErr == nil {
		return nil
	}

	if mvErr := s.MoveToFailed(filePath, originalName); mvErr != nil {
		return fmt.Errorf("rollback to incoming: %w; move to failed: %w", rbErr, mvErr)
	}
	// Keep the nil return for callers, but surface why the file is not in
	// the incoming directory.
	slog.Warn("rollback to incoming failed, file moved to failed",
		"path", filePath,
		"filename", originalName,
		"error", rbErr,
	)
	return nil
}
// MoveToFailed moves the file at filePath into the failed directory, creating
// that directory (mode 0o755) when it does not exist.
//
// The file is normally stored as originalName. When a file of that name is
// already present, the name is prefixed with the extension-less base name of
// filePath and an underscore instead, because os.Rename would otherwise
// replace the earlier failed file. For files produced by claim that base name
// is presumably the task ID, which makes the fallback name unique.
//
// It returns an error when the directory cannot be created or the file
// cannot be renamed.
func (s *Scanner) MoveToFailed(filePath, originalName string) error {
	failed := filepath.Join(s.cfg.StorageRoot, s.cfg.FailedDir)
	if err := os.MkdirAll(failed, 0o755); err != nil {
		return fmt.Errorf("create failed dir: %w", err)
	}

	dst := filepath.Join(failed, originalName)
	if _, err := os.Lstat(dst); err == nil {
		// Name already taken: derive a distinct one rather than overwrite.
		base := filepath.Base(filePath)
		stem := strings.TrimSuffix(base, filepath.Ext(base))
		dst = filepath.Join(failed, stem+"_"+originalName)
	}

	if err := os.Rename(filePath, dst); err != nil {
		return fmt.Errorf("move to failed: %w", err)
	}
	return nil
}