From 0a9bfd07992c89ca7d40653988b58fe3c54193a0 Mon Sep 17 00:00:00 2001 From: sanek5g Date: Wed, 24 Jun 2026 18:58:35 +0300 Subject: [PATCH] statuses update --- .gitignore | 1 + Makefile | 16 ++ README.md | 85 ++++++- db/init.sql | 2 +- k8s/README.md | 220 ++++++++++++++++++ k8s/analyse.yaml | 31 +++ k8s/build-images.sh | 28 +++ k8s/configmap.yaml | 31 +++ k8s/kustomization.yaml | 28 +++ k8s/namespace.yaml | 4 + k8s/postgres-init-configmap.yaml | 20 ++ k8s/postgres.yaml | 84 +++++++ k8s/prepare-secret.sh | 19 ++ k8s/rabbitmq.yaml | 50 ++++ k8s/secret.env.example | 18 ++ k8s/storage.yaml | 31 +++ k8s/tagging.yaml | 31 +++ k8s/transcribe.yaml | 37 +++ k8s/watcher.yaml | 31 +++ watcher/cmd/watcher/main.go | 14 ++ watcher/internal/config/config.go | 2 + watcher/internal/config/config_test.go | 47 ++++ watcher/internal/pipelinestatus/status.go | 57 +++++ .../internal/pipelinestatus/status_test.go | 65 ++++++ watcher/internal/publisher/publisher_test.go | 43 ++++ watcher/internal/scanner/scanner_test.go | 162 +++++++++++++ workers/analyse/cmd/analyse/main.go | 61 ++++- workers/analyse/cmd/analyse/main_test.go | 164 +++++++++++++ .../analyse/internal/pipelinestatus/status.go | 57 +++++ workers/tagging/cmd/tagging/main.go | 61 ++++- workers/tagging/cmd/tagging/main_test.go | 123 ++++++++++ .../tagging/internal/pipelinestatus/status.go | 57 +++++ workers/transcribe/internal/config/config.go | 2 + .../transcribe/internal/config/config_test.go | 51 ++++ .../transcribe/internal/consumer/consumer.go | 29 ++- .../transcribe/internal/models/models_test.go | 58 +++++ .../transcribe/internal/nexara/nexara_test.go | 118 ++++++++++ .../internal/pipelinestatus/status.go | 57 +++++ .../internal/prompts/prompts_test.go | 116 +++++++++ 39 files changed, 2099 insertions(+), 12 deletions(-) create mode 100644 Makefile create mode 100644 k8s/README.md create mode 100644 k8s/analyse.yaml create mode 100755 k8s/build-images.sh create mode 100644 k8s/configmap.yaml create mode 100644 
k8s/kustomization.yaml create mode 100644 k8s/namespace.yaml create mode 100644 k8s/postgres-init-configmap.yaml create mode 100644 k8s/postgres.yaml create mode 100755 k8s/prepare-secret.sh create mode 100644 k8s/rabbitmq.yaml create mode 100644 k8s/secret.env.example create mode 100644 k8s/storage.yaml create mode 100644 k8s/tagging.yaml create mode 100644 k8s/transcribe.yaml create mode 100644 k8s/watcher.yaml create mode 100644 watcher/internal/config/config_test.go create mode 100644 watcher/internal/pipelinestatus/status.go create mode 100644 watcher/internal/pipelinestatus/status_test.go create mode 100644 watcher/internal/publisher/publisher_test.go create mode 100644 watcher/internal/scanner/scanner_test.go create mode 100644 workers/analyse/cmd/analyse/main_test.go create mode 100644 workers/analyse/internal/pipelinestatus/status.go create mode 100644 workers/tagging/cmd/tagging/main_test.go create mode 100644 workers/tagging/internal/pipelinestatus/status.go create mode 100644 workers/transcribe/internal/config/config_test.go create mode 100644 workers/transcribe/internal/models/models_test.go create mode 100644 workers/transcribe/internal/nexara/nexara_test.go create mode 100644 workers/transcribe/internal/pipelinestatus/status.go create mode 100644 workers/transcribe/internal/prompts/prompts_test.go diff --git a/.gitignore b/.gitignore index 4c49bd7..bd4664a 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .env +k8s/secret.env diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1688a5a --- /dev/null +++ b/Makefile @@ -0,0 +1,16 @@ +.PHONY: test test-watcher test-transcribe test-tagging test-analyse + +test: test-watcher test-transcribe test-tagging test-analyse + @echo "all tests passed" + +test-watcher: + cd watcher && go test ./... -count=1 + +test-transcribe: + cd workers/transcribe && go test ./... -count=1 + +test-tagging: + cd workers/tagging && go test ./... -count=1 + +test-analyse: + cd workers/analyse && go test ./... 
-count=1 diff --git a/README.md b/README.md index f6bcfe6..b1301c1 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,8 @@ exchange: transcription_done (fanout) └── queue: tagging queue: final (default exchange, без binding) + +queue: pipeline.status (default exchange — события стадий для фронта) ``` ### Очереди @@ -235,6 +237,55 @@ queue: final (default exchange, без binding) | `analyse` | transcribe | analyse | Результат транскрипции | | `tagging` | transcribe | tagging | Результат транскрипции | | `final` | analyse/tagging | — | Итоговый результат | +| `pipeline.status` | все воркеры | backend | Стадии обработки (см. ниже) | + +### Очередь `pipeline.status` (стадии) + +Каждый воркер публикует JSON-события в очередь `pipeline.status` (переменная `STATUS_QUEUE`). + +**Формат сообщения:** + +```json +{ + "task_id": "01KTN...", + "filename": "recording.wav", + "status": "pending", + "stage": "queued", + "error": "", + "timestamp": 1717843200 +} +``` + +**Значения `status`:** + +| status | Описание | +|----------------|-----------------| +| `pending` | ожидает | +| `in_progress` | в процессе | +| `done` | готово | +| `error` | ошибка | + +**Значения `stage`:** + +| stage | Кто публикует | Когда | +|-----------------|---------------|--------------------------------| +| `queued` | watcher | Задача отправлена в RabbitMQ | +| `transcribing` | transcribe | STT начат / ошибка Nexara | +| `analysing` | analyse | LLM-анализ начат / ошибка | +| `tagging` | tagging | Классификация начата / ошибка | +| `completed` | analyse/tagging | Оба воркера завершились | + +**Типичная последовательность:** + +``` +pending/queued + → in_progress/transcribing + → in_progress/analysing (параллельно) + → in_progress/tagging (параллельно) + → done/completed +``` + +> Браузер не может подписаться на RabbitMQ напрямую. Нужен backend-consumer, который читает `pipeline.status` и отдаёт события через REST или WebSocket. 
### Dead Letter @@ -254,7 +305,7 @@ CREATE TABLE results ( analysis JSONB, -- результат analyse (Yandex LLM) tagging JSONB, -- результат tagging (Yandex LLM) metadata JSONB, -- file_path, segments, prompts, language, transcribed_at - status TEXT DEFAULT 'pending', -- pending | done + status TEXT NOT NULL DEFAULT 'pending', -- pending | in_progress | done | error created_at TIMESTAMPTZ, updated_at TIMESTAMPTZ ); @@ -269,7 +320,7 @@ CREATE TABLE results ( | `filename` | оба | При сохранении своей части | | `transcription` | analyse | При сохранении analysis | | `metadata` | analyse | file_path, segments, prompts… | -| `status` | оба | `done` когда оба поля заполнены| +| `status` | оба | `in_progress` при старте; `done` когда оба поля заполнены; `error` при сбое | ### Просмотр данных @@ -484,6 +535,7 @@ OUTPUT_EXCHANGE=transcription_done ANALYSE_QUEUE=analyse TAGGING_QUEUE=tagging FINAL_QUEUE=final +STATUS_QUEUE=pipeline.status PREFETCH=1 # Nexara (STT) @@ -560,6 +612,35 @@ docker compose logs -f watcher transcribe analyse tagging UI: http://localhost:15672 → Queues → `final` +### Развёртывание в k3s + +Полная инструкция: **[k8s/README.md](k8s/README.md)** + +Кратко: + +```bash +# 1. секреты из .env +./k8s/prepare-secret.sh + +# 2. каталог хранилища на ноде k3s +sudo mkdir -p /var/lib/audio-pipeline/storage/{incoming,processing,failed} + +# 3. сборка образов и импорт в k3s +./k8s/build-images.sh + +# 4. деплой +kubectl apply -k k8s/ + +# 5. 
проверка +kubectl -n audio-pipeline get pods +``` + +Загрузка файла на ноду: + +```bash +sudo cp recording.wav /var/lib/audio-pipeline/storage/incoming/ +``` + --- ## Логирование diff --git a/db/init.sql b/db/init.sql index 00630da..be3a640 100644 --- a/db/init.sql +++ b/db/init.sql @@ -5,7 +5,7 @@ CREATE TABLE IF NOT EXISTS results ( analysis JSONB, tagging JSONB, metadata JSONB, - status TEXT NOT NULL DEFAULT 'pending', + status TEXT NOT NULL DEFAULT 'pending', -- pending | in_progress | done | error created_at TIMESTAMPTZ NOT NULL DEFAULT now(), updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); diff --git a/k8s/README.md b/k8s/README.md new file mode 100644 index 0000000..0a50b16 --- /dev/null +++ b/k8s/README.md @@ -0,0 +1,220 @@ +# Развёртывание audio-pipeline в k3s + +Пошаговая инструкция для однонодового k3s. Все сервисы из `docker-compose.yml` переносятся в namespace `audio-pipeline`. + +## Архитектура в кластере + +``` +namespace: audio-pipeline +├── rabbit (Service :5672, :15672) +├── postgres (Service :5432, PVC local-path) +├── PVC audio-storage → hostPath /var/lib/audio-pipeline/storage +├── watcher +├── transcribe (+ ConfigMap prompts.json) +├── tagging +└── analyse +``` + +DNS внутри кластера: `rabbit`, `postgres` — те же хосты, что в `.env` для Docker Compose. + +## Требования + +- Linux-сервер с k3s +- `kubectl` (обычно `/usr/local/bin/kubectl` или `k3s kubectl`) +- Docker (для сборки образов) или свой container registry + +## 1. Установка k3s + +```bash +curl -sfL https://get.k3s.io | sh - +sudo k3s kubectl get nodes +``` + +Для доступа без `sudo`: + +```bash +mkdir -p ~/.kube +sudo k3s kubectl config view --raw > ~/.kube/config +chmod 600 ~/.kube/config +``` + +## 2. Подготовка секретов + +```bash +cd audio-pipeline + +# из корневого .env +./k8s/prepare-secret.sh + +# или вручную +cp k8s/secret.env.example k8s/secret.env +# отредактируйте ключи Nexara / Yandex +``` + +Файл `k8s/secret.env` в git не коммитится. 
+ +Проверьте URL в секрете: + +```env +RABBITMQ_URL=amqp://admin:secret123@rabbit:5672/ +DATABASE_URL=postgres://pipeline:pipeline_secret@postgres:5432/pipeline?sslmode=disable +``` + +## 3. Сборка и загрузка образов + +### Вариант A — локальный k3s (без registry) + +```bash +chmod +x k8s/build-images.sh +./k8s/build-images.sh +``` + +Скрипт собирает 4 образа и импортирует их в containerd k3s. + +### Вариант B — через registry + +```bash +REGISTRY=registry.example.com/audio-pipeline +TAG=v1 + +docker build -t $REGISTRY/watcher:$TAG ./watcher +docker build -t $REGISTRY/transcribe:$TAG ./workers/transcribe +docker build -t $REGISTRY/tagging:$TAG ./workers/tagging +docker build -t $REGISTRY/analyse:$TAG ./workers/analyse + +docker push $REGISTRY/watcher:$TAG +# ... остальные + +# в k8s/watcher.yaml и др. замените image: на $REGISTRY/... +``` + +## 4. Хранилище аудио + +По умолчанию используется **hostPath** на ноде: + +``` +/var/lib/audio-pipeline/storage/ +├── incoming/ +├── processing/ +└── failed/ +``` + +Создайте каталоги на ноде k3s: + +```bash +sudo mkdir -p /var/lib/audio-pipeline/storage/{incoming,processing,failed} +sudo chmod -R 777 /var/lib/audio-pipeline/storage # или нужный uid подов +``` + +> **Важно:** `ReadWriteMany` + hostPath работает, пока все поды на **одной** ноде. Для multi-node кластера подключите NFS или Longhorn с RWX. + +## 5. Деплой + +```bash +kubectl apply -k k8s/ +``` + +Проверка: + +```bash +kubectl -n audio-pipeline get pods +kubectl -n audio-pipeline get pvc +kubectl -n audio-pipeline logs -f deploy/watcher +``` + +Ожидаемый порядок старта: `rabbit` + `postgres` → воркеры (сами ждут RabbitMQ/Postgres при старте). + +## 6. 
Загрузка тестового файла + +На ноде k3s: + +```bash +sudo cp recording.wav /var/lib/audio-pipeline/storage/incoming/ +``` + +Или с машины разработчика (замените `NODE` на IP сервера): + +```bash +scp recording.wav user@NODE:/tmp/ +ssh user@NODE 'sudo cp /tmp/recording.wav /var/lib/audio-pipeline/storage/incoming/' +``` + +## 7. Мониторинг + +```bash +# логи воркеров +kubectl -n audio-pipeline logs -f deploy/transcribe +kubectl -n audio-pipeline logs -f deploy/analyse +kubectl -n audio-pipeline logs -f deploy/tagging + +# RabbitMQ Management UI (port-forward) +kubectl -n audio-pipeline port-forward svc/rabbit 15672:15672 +# http://localhost:15672 (логин из secret.env) + +# Postgres +kubectl -n audio-pipeline exec -it deploy/postgres -- \ + psql -U pipeline -d pipeline -c "SELECT task_id, status, updated_at FROM results ORDER BY updated_at DESC LIMIT 5;" +``` + +## 8. Обновление + +После изменения кода: + +```bash +./k8s/build-images.sh +kubectl -n audio-pipeline rollout restart deploy/watcher deploy/transcribe deploy/tagging deploy/analyse +``` + +После смены `YANDEX_API_KEY` / `NEXARA_API_KEY`: + +```bash +./k8s/prepare-secret.sh +kubectl apply -k k8s/ +kubectl -n audio-pipeline rollout restart deploy/transcribe deploy/tagging deploy/analyse +``` + +После смены `prompts.json`: + +```bash +kubectl apply -k k8s/ +kubectl -n audio-pipeline rollout restart deploy/transcribe +``` + +## 9. 
Удаление + +```bash +kubectl delete -k k8s/ +# данные postgres (PVC) и hostPath останутся — удалите вручную при необходимости +``` + +## Отличия от Docker Compose + +| Compose | k3s | +|---------|-----| +| `env_file: .env` | ConfigMap + Secret | +| volume `./storage` | PVC `audio-storage` (hostPath) | +| `DOTENV_PATH` mount для hot-reload | переменные из Secret; после смены — `rollout restart` | +| `docker compose up --build` | `build-images.sh` + `kubectl apply -k` | +| порты 5672/5432 на хосте | только внутри кластера; снаружи — `port-forward` или Ingress | + +## Опционально: NodePort для RabbitMQ UI + +Добавьте в `k8s/rabbitmq.yaml` в Service: + +```yaml +type: NodePort +# ports: +# - name: management +# port: 15672 +# nodePort: 31672 +``` + +## Troubleshooting + +| Симптом | Решение | +|---------|---------| +| `ImagePullBackOff` | Запустите `./k8s/build-images.sh` или укажите registry | +| PVC `audio-storage` Pending | Создайте PV hostPath (`storage.yaml`) и каталог на ноде | +| watcher не видит файлы | Проверьте mount `/data/storage` и права на hostPath | +| tagging/analyse `YANDEX_API_KEY is required` | Проверьте `secret.env` и `kubectl apply -k k8s/` | +| postgres CrashLoop | Удалите PVC и передеплойте (init.sql только при первом старте) | diff --git a/k8s/analyse.yaml b/k8s/analyse.yaml new file mode 100644 index 0000000..fc75449 --- /dev/null +++ b/k8s/analyse.yaml @@ -0,0 +1,31 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: analyse + namespace: audio-pipeline +spec: + replicas: 1 + selector: + matchLabels: + app: analyse + template: + metadata: + labels: + app: analyse + spec: + containers: + - name: analyse + image: audio-pipeline/analyse:latest + imagePullPolicy: IfNotPresent + envFrom: + - configMapRef: + name: app-config + - secretRef: + name: app-secrets + volumeMounts: + - name: storage + mountPath: /data/storage + volumes: + - name: storage + persistentVolumeClaim: + claimName: audio-storage diff --git a/k8s/build-images.sh 
b/k8s/build-images.sh new file mode 100755 index 0000000..601f056 --- /dev/null +++ b/k8s/build-images.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +# Сборка образов и импорт в k3s (без внешнего registry). +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +TAG="${TAG:-latest}" + +build() { + local name=$1 dir=$2 + echo "==> building audio-pipeline/${name}:${TAG}" + docker build -t "audio-pipeline/${name}:${TAG}" "${ROOT}/${dir}" +} + +build watcher watcher +build transcribe workers/transcribe +build tagging workers/tagging +build analyse workers/analyse + +if command -v k3s >/dev/null 2>&1; then + echo "==> importing images into k3s containerd" + for name in watcher transcribe tagging analyse; do + docker save "audio-pipeline/${name}:${TAG}" | sudo k3s ctr images import - + done + echo "done" +else + echo "k3s not found — images built locally only" + echo "push to registry or run: docker save ... | sudo k3s ctr images import -" +fi diff --git a/k8s/configmap.yaml b/k8s/configmap.yaml new file mode 100644 index 0000000..8865b3d --- /dev/null +++ b/k8s/configmap.yaml @@ -0,0 +1,31 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: app-config + namespace: audio-pipeline +data: + STORAGE_ROOT: /data/storage + POLL_INTERVAL: 5s + STABLE_WINDOW: 2s + STABLE_CHECKS: "3" + + RABBITMQ_EXCHANGE: audio_pipeline + RABBITMQ_ROUTING_KEY: audio.new + + INPUT_QUEUE: transcribe.tasks + OUTPUT_EXCHANGE: transcription_done + ANALYSE_QUEUE: analyse + TAGGING_QUEUE: tagging + FINAL_QUEUE: final + STATUS_QUEUE: pipeline.status + PREFETCH: "1" + + NEXARA_BASE_URL: https://api.nexara.ru + NEXARA_MODEL: whisper-1 + NEXARA_TIMEOUT: 10m + + PROMPTS_SOURCE: static + PROMPTS_FILE: /app/configs/prompts.json + PROMPTS_SECTION: "1" + + YANDEX_API_URL: https://ai.api.cloud.yandex.net/v1/chat/completions diff --git a/k8s/kustomization.yaml b/k8s/kustomization.yaml new file mode 100644 index 0000000..af9e4ae --- /dev/null +++ b/k8s/kustomization.yaml @@ -0,0 +1,28 @@ +apiVersion: 
kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: audio-pipeline + +resources: + - namespace.yaml + - configmap.yaml + - postgres-init-configmap.yaml + - storage.yaml + - rabbitmq.yaml + - postgres.yaml + - watcher.yaml + - transcribe.yaml + - tagging.yaml + - analyse.yaml + +configMapGenerator: + - name: prompts + files: + - prompts.json=../workers/transcribe/configs/prompts.json + +secretGenerator: + - name: app-secrets + envs: + - secret.env + options: + disableNameSuffixHash: true diff --git a/k8s/namespace.yaml b/k8s/namespace.yaml new file mode 100644 index 0000000..3d85c60 --- /dev/null +++ b/k8s/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: audio-pipeline diff --git a/k8s/postgres-init-configmap.yaml b/k8s/postgres-init-configmap.yaml new file mode 100644 index 0000000..5007d66 --- /dev/null +++ b/k8s/postgres-init-configmap.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: postgres-init + namespace: audio-pipeline +data: + init.sql: | + CREATE TABLE IF NOT EXISTS results ( + task_id TEXT PRIMARY KEY, + filename TEXT, + transcription TEXT, + analysis JSONB, + tagging JSONB, + metadata JSONB, + status TEXT NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() + ); + + ALTER TABLE results ADD COLUMN IF NOT EXISTS metadata JSONB; diff --git a/k8s/postgres.yaml b/k8s/postgres.yaml new file mode 100644 index 0000000..9353e44 --- /dev/null +++ b/k8s/postgres.yaml @@ -0,0 +1,84 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: postgres-data + namespace: audio-pipeline +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 5Gi + storageClassName: local-path +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: audio-pipeline +spec: + selector: + app: postgres + ports: + - name: postgres + port: 5432 + targetPort: 5432 +--- +apiVersion: apps/v1 +kind: 
Deployment +metadata: + name: postgres + namespace: audio-pipeline +spec: + replicas: 1 + strategy: + type: Recreate + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: postgres:16-alpine + ports: + - containerPort: 5432 + envFrom: + - secretRef: + name: app-secrets + volumeMounts: + - name: data + mountPath: /var/lib/postgresql/data + - name: init + mountPath: /docker-entrypoint-initdb.d + readOnly: true + readinessProbe: + exec: + command: + - pg_isready + - -U + - pipeline + - -d + - pipeline + initialDelaySeconds: 5 + periodSeconds: 5 + livenessProbe: + exec: + command: + - pg_isready + - -U + - pipeline + - -d + - pipeline + initialDelaySeconds: 15 + periodSeconds: 10 + volumes: + - name: data + persistentVolumeClaim: + claimName: postgres-data + - name: init + configMap: + name: postgres-init diff --git a/k8s/prepare-secret.sh b/k8s/prepare-secret.sh new file mode 100755 index 0000000..ca7f8bd --- /dev/null +++ b/k8s/prepare-secret.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Генерирует k8s/secret.env из корневого .env (хосты rabbit/postgres для кластера). +set -euo pipefail + +ROOT="$(cd "$(dirname "$0")/.." && pwd)" +SRC="${ROOT}/.env" +DST="$(dirname "$0")/secret.env" + +if [[ ! 
-f "$SRC" ]]; then + echo "missing ${SRC}" >&2 + exit 1 +fi + +grep -E '^(RABBITMQ_DEFAULT_USER|RABBITMQ_DEFAULT_PASS|RABBITMQ_URL|NEXARA_API_KEY|POSTGRES_USER|POSTGRES_PASSWORD|POSTGRES_DB|DATABASE_URL|YANDEX_API_KEY|YANDEX_MODEL)=' "$SRC" \ + | sed 's/@rabbitmq:/@rabbit:/g' \ + > "$DST" + +echo "wrote ${DST}" +echo "review DATABASE_URL and RABBITMQ_URL hosts: postgres, rabbit" diff --git a/k8s/rabbitmq.yaml b/k8s/rabbitmq.yaml new file mode 100644 index 0000000..ad5c82d --- /dev/null +++ b/k8s/rabbitmq.yaml @@ -0,0 +1,50 @@ +apiVersion: v1 +kind: Service +metadata: + name: rabbit + namespace: audio-pipeline +spec: + selector: + app: rabbit + ports: + - name: amqp + port: 5672 + targetPort: 5672 + - name: management + port: 15672 + targetPort: 15672 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rabbit + namespace: audio-pipeline +spec: + replicas: 1 + selector: + matchLabels: + app: rabbit + template: + metadata: + labels: + app: rabbit + spec: + containers: + - name: rabbitmq + image: rabbitmq:3-management-alpine + ports: + - containerPort: 5672 + - containerPort: 15672 + envFrom: + - secretRef: + name: app-secrets + readinessProbe: + exec: + command: ["rabbitmq-diagnostics", "-q", "ping"] + initialDelaySeconds: 10 + periodSeconds: 10 + livenessProbe: + exec: + command: ["rabbitmq-diagnostics", "-q", "ping"] + initialDelaySeconds: 30 + periodSeconds: 30 diff --git a/k8s/secret.env.example b/k8s/secret.env.example new file mode 100644 index 0000000..74061a7 --- /dev/null +++ b/k8s/secret.env.example @@ -0,0 +1,18 @@ +# Скопируйте в secret.env и подставьте реальные значения. +# cp secret.env.example secret.env +# +# Важно: хосты — имена Service в кластере (rabbit, postgres). 
+ +RABBITMQ_DEFAULT_USER=admin +RABBITMQ_DEFAULT_PASS=secret123 +RABBITMQ_URL=amqp://admin:secret123@rabbit:5672/ + +NEXARA_API_KEY=replace-me + +POSTGRES_USER=pipeline +POSTGRES_PASSWORD=pipeline_secret +POSTGRES_DB=pipeline +DATABASE_URL=postgres://pipeline:pipeline_secret@postgres:5432/pipeline?sslmode=disable + +YANDEX_API_KEY=replace-me +YANDEX_MODEL=gpt://folder_id/model_name diff --git a/k8s/storage.yaml b/k8s/storage.yaml new file mode 100644 index 0000000..aeefef3 --- /dev/null +++ b/k8s/storage.yaml @@ -0,0 +1,31 @@ +# Общее хранилище аудио для watcher / transcribe / tagging / analyse. +# На однонодовом k3s hostPath — самый простой вариант (все поды на одной ноде). +# Для кластера из нескольких нод нужен NFS/Longhorn с ReadWriteMany. +apiVersion: v1 +kind: PersistentVolume +metadata: + name: audio-pipeline-storage +spec: + capacity: + storage: 20Gi + accessModes: + - ReadWriteMany + persistentVolumeReclaimPolicy: Retain + storageClassName: manual + hostPath: + path: /var/lib/audio-pipeline/storage + type: DirectoryOrCreate +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: audio-storage + namespace: audio-pipeline +spec: + accessModes: + - ReadWriteMany + storageClassName: manual + resources: + requests: + storage: 20Gi + volumeName: audio-pipeline-storage diff --git a/k8s/tagging.yaml b/k8s/tagging.yaml new file mode 100644 index 0000000..cdbc252 --- /dev/null +++ b/k8s/tagging.yaml @@ -0,0 +1,31 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tagging + namespace: audio-pipeline +spec: + replicas: 1 + selector: + matchLabels: + app: tagging + template: + metadata: + labels: + app: tagging + spec: + containers: + - name: tagging + image: audio-pipeline/tagging:latest + imagePullPolicy: IfNotPresent + envFrom: + - configMapRef: + name: app-config + - secretRef: + name: app-secrets + volumeMounts: + - name: storage + mountPath: /data/storage + volumes: + - name: storage + persistentVolumeClaim: + claimName: audio-storage diff 
--git a/k8s/transcribe.yaml b/k8s/transcribe.yaml new file mode 100644 index 0000000..19d8e26 --- /dev/null +++ b/k8s/transcribe.yaml @@ -0,0 +1,37 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: transcribe + namespace: audio-pipeline +spec: + replicas: 1 + selector: + matchLabels: + app: transcribe + template: + metadata: + labels: + app: transcribe + spec: + containers: + - name: transcribe + image: audio-pipeline/transcribe:latest + imagePullPolicy: IfNotPresent + envFrom: + - configMapRef: + name: app-config + - secretRef: + name: app-secrets + volumeMounts: + - name: storage + mountPath: /data/storage + - name: prompts + mountPath: /app/configs + readOnly: true + volumes: + - name: storage + persistentVolumeClaim: + claimName: audio-storage + - name: prompts + configMap: + name: prompts diff --git a/k8s/watcher.yaml b/k8s/watcher.yaml new file mode 100644 index 0000000..32eb2c4 --- /dev/null +++ b/k8s/watcher.yaml @@ -0,0 +1,31 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: watcher + namespace: audio-pipeline +spec: + replicas: 1 + selector: + matchLabels: + app: watcher + template: + metadata: + labels: + app: watcher + spec: + containers: + - name: watcher + image: audio-pipeline/watcher:latest + imagePullPolicy: IfNotPresent + envFrom: + - configMapRef: + name: app-config + - secretRef: + name: app-secrets + volumeMounts: + - name: storage + mountPath: /data/storage + volumes: + - name: storage + persistentVolumeClaim: + claimName: audio-storage diff --git a/watcher/cmd/watcher/main.go b/watcher/cmd/watcher/main.go index 7cad1b2..bc4ae32 100644 --- a/watcher/cmd/watcher/main.go +++ b/watcher/cmd/watcher/main.go @@ -11,6 +11,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" "github.com/postmet/watcher/internal/config" + "github.com/postmet/watcher/internal/pipelinestatus" "github.com/postmet/watcher/internal/publisher" "github.com/postmet/watcher/internal/scanner" ) @@ -42,6 +43,10 @@ func main() { slog.Error("publisher init 
failed", "error", err) os.Exit(1) } + if err := pipelinestatus.DeclareQueue(ch, cfg.StatusQueue); err != nil { + slog.Error("declare status queue failed", "queue", cfg.StatusQueue, "error", err) + os.Exit(1) + } ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() @@ -51,6 +56,7 @@ func main() { "poll_interval", cfg.PollInterval.String(), "exchange", cfg.Exchange, "routing_key", cfg.RoutingKey, + "status_queue", cfg.StatusQueue, ) ticker := time.NewTicker(cfg.PollInterval) @@ -85,6 +91,14 @@ func main() { } continue } + if err := pipelinestatus.Publish(pubCtx, ch, cfg.StatusQueue, pipelinestatus.Event{ + TaskID: cf.TaskID, + Filename: cf.Filename, + Status: pipelinestatus.StatusPending, + Stage: pipelinestatus.StageQueued, + }); err != nil { + slog.Warn("status publish failed", "task_id", cf.TaskID, "error", err) + } slog.Info("task published", "task_id", cf.TaskID, "filename", cf.Filename) } } diff --git a/watcher/internal/config/config.go b/watcher/internal/config/config.go index c1f3d44..c673814 100644 --- a/watcher/internal/config/config.go +++ b/watcher/internal/config/config.go @@ -17,6 +17,7 @@ type Config struct { RabbitURL string Exchange string RoutingKey string + StatusQueue string } func Load() Config { @@ -31,6 +32,7 @@ func Load() Config { RabbitURL: getEnv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/"), Exchange: getEnv("RABBITMQ_EXCHANGE", "audio_pipeline"), RoutingKey: getEnv("RABBITMQ_ROUTING_KEY", "audio.new"), + StatusQueue: getEnv("STATUS_QUEUE", "pipeline.status"), } } diff --git a/watcher/internal/config/config_test.go b/watcher/internal/config/config_test.go new file mode 100644 index 0000000..0e49799 --- /dev/null +++ b/watcher/internal/config/config_test.go @@ -0,0 +1,47 @@ +package config + +import ( + "testing" + "time" +) + +func TestLoadDefaults(t *testing.T) { + t.Setenv("STORAGE_ROOT", "") + t.Setenv("RABBITMQ_URL", "") + t.Setenv("STATUS_QUEUE", "") + + cfg := Load() + if 
cfg.StorageRoot != "/data/storage" { + t.Fatalf("StorageRoot: got %q", cfg.StorageRoot) + } + if cfg.StatusQueue != "pipeline.status" { + t.Fatalf("StatusQueue: got %q", cfg.StatusQueue) + } + if cfg.PollInterval != 5*time.Second { + t.Fatalf("PollInterval: got %v", cfg.PollInterval) + } + if cfg.StableChecks != 3 { + t.Fatalf("StableChecks: got %d", cfg.StableChecks) + } +} + +func TestLoadFromEnv(t *testing.T) { + t.Setenv("STORAGE_ROOT", "/tmp/storage") + t.Setenv("POLL_INTERVAL", "10s") + t.Setenv("STABLE_CHECKS", "5") + t.Setenv("STATUS_QUEUE", "custom.status") + + cfg := Load() + if cfg.StorageRoot != "/tmp/storage" { + t.Fatalf("StorageRoot: got %q", cfg.StorageRoot) + } + if cfg.PollInterval != 10*time.Second { + t.Fatalf("PollInterval: got %v", cfg.PollInterval) + } + if cfg.StableChecks != 5 { + t.Fatalf("StableChecks: got %d", cfg.StableChecks) + } + if cfg.StatusQueue != "custom.status" { + t.Fatalf("StatusQueue: got %q", cfg.StatusQueue) + } +} diff --git a/watcher/internal/pipelinestatus/status.go b/watcher/internal/pipelinestatus/status.go new file mode 100644 index 0000000..b4b7b79 --- /dev/null +++ b/watcher/internal/pipelinestatus/status.go @@ -0,0 +1,57 @@ +package pipelinestatus + +import ( + "context" + "encoding/json" + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + StatusPending = "pending" + StatusInProgress = "in_progress" + StatusDone = "done" + StatusError = "error" +) + +const ( + StageQueued = "queued" + StageTranscribing = "transcribing" + StageAnalysing = "analysing" + StageTagging = "tagging" + StageCompleted = "completed" +) + +type Event struct { + TaskID string `json:"task_id"` + Filename string `json:"filename,omitempty"` + Status string `json:"status"` + Stage string `json:"stage"` + Error string `json:"error,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +func DeclareQueue(ch *amqp.Channel, queue string) error { + _, err := ch.QueueDeclare(queue, true, false, false, false, nil) + return err +} + 
+func Publish(ctx context.Context, ch *amqp.Channel, queue string, ev Event) error { + if ev.Timestamp == 0 { + ev.Timestamp = time.Now().Unix() + } + body, err := json.Marshal(ev) + if err != nil { + return err + } + if err := ch.PublishWithContext(ctx, "", queue, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + return fmt.Errorf("publish status: %w", err) + } + return nil +} diff --git a/watcher/internal/pipelinestatus/status_test.go b/watcher/internal/pipelinestatus/status_test.go new file mode 100644 index 0000000..53bab0f --- /dev/null +++ b/watcher/internal/pipelinestatus/status_test.go @@ -0,0 +1,65 @@ +package pipelinestatus + +import ( + "encoding/json" + "testing" +) + +func TestEventJSON(t *testing.T) { + ev := Event{ + TaskID: "01HXABCDEF", + Filename: "call.wav", + Status: StatusPending, + Stage: StageQueued, + Timestamp: 1717843200, + } + body, err := json.Marshal(ev) + if err != nil { + t.Fatal(err) + } + + var got Event + if err := json.Unmarshal(body, &got); err != nil { + t.Fatal(err) + } + if got.TaskID != ev.TaskID || got.Status != StatusPending || got.Stage != StageQueued { + t.Fatalf("unexpected event: %+v", got) + } + if got.Error != "" { + t.Fatalf("expected empty error, got %q", got.Error) + } +} + +func TestEventJSONWithError(t *testing.T) { + ev := Event{ + TaskID: "01HX", + Status: StatusError, + Stage: StageTranscribing, + Error: "nexara timeout", + } + body, err := json.Marshal(ev) + if err != nil { + t.Fatal(err) + } + if !json.Valid(body) { + t.Fatal("invalid json") + } + var got Event + if err := json.Unmarshal(body, &got); err != nil { + t.Fatal(err) + } + if got.Error != "nexara timeout" { + t.Fatalf("error field: got %q", got.Error) + } +} + +func TestStatusConstants(t *testing.T) { + statuses := []string{StatusPending, StatusInProgress, StatusDone, StatusError} + seen := make(map[string]bool) + for _, s := range statuses { + if seen[s] { + 
t.Fatalf("duplicate status %q", s) + } + seen[s] = true + } +} diff --git a/watcher/internal/publisher/publisher_test.go b/watcher/internal/publisher/publisher_test.go new file mode 100644 index 0000000..4be6b51 --- /dev/null +++ b/watcher/internal/publisher/publisher_test.go @@ -0,0 +1,43 @@ +package publisher + +import ( + "encoding/json" + "testing" + "time" +) + +func TestAudioTaskJSON(t *testing.T) { + task := AudioTask{ + TaskID: "01HXABCDEF", + FilePath: "/data/storage/processing/01HX.wav", + Filename: "call.wav", + Size: 1024, + CreatedAt: 1717843200, + } + body, err := json.Marshal(task) + if err != nil { + t.Fatal(err) + } + + var got AudioTask + if err := json.Unmarshal(body, &got); err != nil { + t.Fatal(err) + } + if got.TaskID != task.TaskID || got.FilePath != task.FilePath || got.Size != 1024 { + t.Fatalf("unexpected task: %+v", got) + } +} + +func TestAudioTaskCreatedAtDefault(t *testing.T) { + task := AudioTask{TaskID: "x", Filename: "a.wav"} + if task.CreatedAt != 0 { + t.Fatal("zero value should have CreatedAt=0 before publish") + } + before := time.Now().Unix() + if task.CreatedAt == 0 { + task.CreatedAt = time.Now().Unix() + } + if task.CreatedAt < before { + t.Fatal("timestamp should be set") + } +} diff --git a/watcher/internal/scanner/scanner_test.go b/watcher/internal/scanner/scanner_test.go new file mode 100644 index 0000000..60fad2c --- /dev/null +++ b/watcher/internal/scanner/scanner_test.go @@ -0,0 +1,162 @@ +package scanner + +import ( + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func testScanner(t *testing.T) (*Scanner, string) { + t.Helper() + root := t.TempDir() + s := New(Config{ + StorageRoot: root, + IncomingDir: "incoming", + ProcessingDir: "processing", + FailedDir: "failed", + StableWindow: time.Millisecond, + StableChecks: 1, + }) + if err := s.EnsureDirs(); err != nil { + t.Fatalf("EnsureDirs: %v", err) + } + return s, root +} + +func TestEnsureDirs(t *testing.T) { + _, root := testScanner(t) + for _, dir 
:= range []string{"incoming", "processing", "failed"} { + p := filepath.Join(root, dir) + if st, err := os.Stat(p); err != nil || !st.IsDir() { + t.Fatalf("dir %s missing: %v", dir, err) + } + } +} + +func TestScanOnceClaimsStableAudio(t *testing.T) { + s, root := testScanner(t) + incoming := filepath.Join(root, "incoming") + if err := os.WriteFile(filepath.Join(incoming, "call.wav"), []byte("audio"), 0o644); err != nil { + t.Fatal(err) + } + + claimed, err := s.ScanOnce() + if err != nil { + t.Fatal(err) + } + if len(claimed) != 1 { + t.Fatalf("claimed %d files, want 1", len(claimed)) + } + cf := claimed[0] + if cf.Filename != "call.wav" { + t.Fatalf("filename: got %q", cf.Filename) + } + if cf.TaskID == "" { + t.Fatal("empty task_id") + } + if !strings.HasPrefix(cf.FilePath, filepath.Join(root, "processing")) { + t.Fatalf("unexpected path: %s", cf.FilePath) + } + if _, err := os.Stat(cf.FilePath); err != nil { + t.Fatalf("claimed file missing: %v", err) + } + if _, err := os.Stat(filepath.Join(incoming, "call.wav")); !os.IsNotExist(err) { + t.Fatal("source file should be moved") + } +} + +func TestScanOnceSkipsUnsupportedAndHidden(t *testing.T) { + s, root := testScanner(t) + incoming := filepath.Join(root, "incoming") + files := map[string][]byte{ + ".hidden.wav": []byte("x"), + "notes.txt": []byte("x"), + "upload.tmp": []byte("x"), + "valid.mp3": []byte("audio"), + } + for name, data := range files { + if err := os.WriteFile(filepath.Join(incoming, name), data, 0o644); err != nil { + t.Fatal(err) + } + } + + claimed, err := s.ScanOnce() + if err != nil { + t.Fatal(err) + } + if len(claimed) != 1 { + t.Fatalf("claimed %d files, want 1 (valid.mp3)", len(claimed)) + } + if claimed[0].Filename != "valid.mp3" { + t.Fatalf("filename: got %q", claimed[0].Filename) + } +} + +func TestScanOnceEmptyIncoming(t *testing.T) { + s, _ := testScanner(t) + claimed, err := s.ScanOnce() + if err != nil { + t.Fatal(err) + } + if len(claimed) != 0 { + t.Fatalf("expected 0 claimed, 
got %d", len(claimed)) + } +} + +func TestRollbackToIncoming(t *testing.T) { + s, root := testScanner(t) + processing := filepath.Join(root, "processing") + incoming := filepath.Join(root, "incoming") + processingFile := filepath.Join(processing, "01TASK.wav") + if err := os.WriteFile(processingFile, []byte("data"), 0o644); err != nil { + t.Fatal(err) + } + + if err := s.RollbackToIncoming(processingFile, "call.wav"); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(incoming, "call.wav")); err != nil { + t.Fatalf("file not restored to incoming: %v", err) + } + if _, err := os.Stat(processingFile); !os.IsNotExist(err) { + t.Fatal("processing file should be gone") + } +} + +func TestMoveToFailed(t *testing.T) { + s, root := testScanner(t) + processing := filepath.Join(root, "processing") + processingFile := filepath.Join(processing, "broken.wav") + if err := os.WriteFile(processingFile, []byte("data"), 0o644); err != nil { + t.Fatal(err) + } + + if err := s.MoveToFailed(processingFile, "broken.wav"); err != nil { + t.Fatal(err) + } + failedPath := filepath.Join(root, "failed", "broken.wav") + if _, err := os.Stat(failedPath); err != nil { + t.Fatalf("file not in failed: %v", err) + } +} + +func TestIsStableStableFile(t *testing.T) { + s, root := testScanner(t) + path := filepath.Join(root, "incoming", "stable.wav") + if err := os.WriteFile(path, []byte("fixed-content"), 0o644); err != nil { + t.Fatal(err) + } + if !s.isStable(path) { + t.Fatal("expected stable file") + } +} + +func TestIsStableMissingFile(t *testing.T) { + s, root := testScanner(t) + path := filepath.Join(root, "incoming", "missing.wav") + if s.isStable(path) { + t.Fatal("missing file should not be stable") + } +} diff --git a/workers/analyse/cmd/analyse/main.go b/workers/analyse/cmd/analyse/main.go index 4af726f..7ccfe57 100644 --- a/workers/analyse/cmd/analyse/main.go +++ b/workers/analyse/cmd/analyse/main.go @@ -18,6 +18,8 @@ import ( "github.com/joho/godotenv" _ 
"github.com/jackc/pgx/v5/stdlib" amqp "github.com/rabbitmq/amqp091-go" + + "github.com/postmet/analyse/internal/pipelinestatus" ) func init() { @@ -326,7 +328,7 @@ func saveAnalysis(ctx context.Context, db *sql.DB, task WorkerMessage, analysis transcription = COALESCE(NULLIF($4, ''), transcription), metadata = COALESCE($5::jsonb, metadata), updated_at = now(), - status = CASE WHEN tagging IS NOT NULL THEN 'done' ELSE status END + status = CASE WHEN tagging IS NOT NULL THEN 'done' ELSE 'in_progress' END WHERE task_id = $1 RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL) `, task.TaskID, string(analysis), task.Filename, task.Transcription, string(metadata)).Scan(&complete) @@ -336,6 +338,41 @@ func saveAnalysis(ctx context.Context, db *sql.DB, task WorkerMessage, analysis return complete, nil } +func setTaskInProgress(ctx context.Context, db *sql.DB, taskID, filename string) error { + _, err := db.ExecContext(ctx, ` + INSERT INTO results (task_id, filename, status) + VALUES ($1, NULLIF($2, ''), 'in_progress') + ON CONFLICT (task_id) DO UPDATE SET + status = 'in_progress', + filename = COALESCE(NULLIF(EXCLUDED.filename, ''), results.filename), + updated_at = now() + `, taskID, filename) + return err +} + +func setTaskError(ctx context.Context, db *sql.DB, taskID string) error { + _, err := db.ExecContext(ctx, ` + INSERT INTO results (task_id, status) VALUES ($1, 'error') + ON CONFLICT (task_id) DO UPDATE SET status = 'error', updated_at = now() + `, taskID) + return err +} + +func publishStatus(ctx context.Context, ch *amqp.Channel, queue, taskID, filename, status, stage, errMsg string) { + ev := pipelinestatus.Event{ + TaskID: taskID, + Filename: filename, + Status: status, + Stage: stage, + } + if errMsg != "" { + ev.Error = errMsg + } + if err := pipelinestatus.Publish(ctx, ch, queue, ev); err != nil { + slog.Warn("status publish failed", "worker", "analyse", "task_id", taskID, "stage", stage, "error", err) + } +} + // ===================== MAIN 
===================== func loadDotenv() { @@ -360,6 +397,7 @@ func main() { apiURL := getEnv("YANDEX_API_URL", "https://ai.api.cloud.yandex.net/v1/chat/completions") inputQueue := getEnv("ANALYSE_QUEUE", "analyse") finalQueue := getEnv("FINAL_QUEUE", "final") + statusQueue := getEnv("STATUS_QUEUE", "pipeline.status") if token == "" { slog.Error("YANDEX_API_KEY is required") @@ -397,6 +435,10 @@ func main() { slog.Error("declare queue failed", "queue", finalQueue, "error", err) os.Exit(1) } + if err := pipelinestatus.DeclareQueue(ch, statusQueue); err != nil { + slog.Error("declare status queue failed", "queue", statusQueue, "error", err) + os.Exit(1) + } ch.Qos(1, 0, false) msgs, err := ch.Consume(inputQueue, "", false, false, false, false, nil) @@ -404,7 +446,7 @@ func main() { slog.Error("consume failed", "error", err) os.Exit(1) } - slog.Info("worker started", "worker", "analyse", "queue", inputQueue, "model", model) + slog.Info("worker started", "worker", "analyse", "queue", inputQueue, "status_queue", statusQueue, "model", model) for d := range msgs { taskStart := time.Now() @@ -449,9 +491,18 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + if err := setTaskInProgress(ctx, db, task.TaskID, task.Filename); err != nil { + slog.Warn("set in_progress failed", "worker", "analyse", "task_id", task.TaskID, "error", err) + } + publishStatus(ctx, ch, statusQueue, task.TaskID, task.Filename, + pipelinestatus.StatusInProgress, pipelinestatus.StageAnalysing, "") + result, stats, err := runAnalysis(ctx, apiURL, model, task.TaskID, task.Transcription, task.Prompts) if err != nil { cancel() + _ = setTaskError(context.Background(), db, task.TaskID) + publishStatus(context.Background(), ch, statusQueue, task.TaskID, task.Filename, + pipelinestatus.StatusError, pipelinestatus.StageAnalysing, err.Error()) slog.Warn("task failed, discarded", "worker", "analyse", "task_id", task.TaskID, "llm_calls_done", stats.LLMCalls, @@ -482,7 +533,7 @@ 
func main() { } if complete { - notifyFinal(ctx, ch, db, finalQueue, task.TaskID, "analyse") + notifyFinal(ctx, ch, db, finalQueue, statusQueue, task.TaskID, task.Filename, "analyse") slog.Info("task complete", append(taskAttrs, "was_last", "analyse")...) } else { slog.Info("task partial", append(taskAttrs, "waiting_for", "tagging")...) @@ -549,12 +600,14 @@ func loadFinalPayload(ctx context.Context, db *sql.DB, taskID string) ([]byte, e return json.Marshal(msg) } -func notifyFinal(ctx context.Context, ch *amqp.Channel, db *sql.DB, queue, taskID, worker string) { +func notifyFinal(ctx context.Context, ch *amqp.Channel, db *sql.DB, queue, statusQueue, taskID, filename, worker string) { body, err := loadFinalPayload(ctx, db, taskID) if err != nil { slog.Warn("load final payload failed", "worker", worker, "task_id", taskID, "error", err) return } + publishStatus(ctx, ch, statusQueue, taskID, filename, + pipelinestatus.StatusDone, pipelinestatus.StageCompleted, "") if err := ch.PublishWithContext(ctx, "", queue, false, false, amqp.Publishing{ ContentType: "application/json", diff --git a/workers/analyse/cmd/analyse/main_test.go b/workers/analyse/cmd/analyse/main_test.go new file mode 100644 index 0000000..aece73e --- /dev/null +++ b/workers/analyse/cmd/analyse/main_test.go @@ -0,0 +1,164 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestTruncate(t *testing.T) { + if got := truncate("short", 10); got != "short" { + t.Fatalf("got %q", got) + } + long := strings.Repeat("a", 20) + got := truncate(long, 10) + if len(got) != 13 || !strings.HasSuffix(got, "...") { + t.Fatalf("got %q", got) + } +} + +func TestTokenFingerprint(t *testing.T) { + if tokenFingerprint("short") != "***" { + t.Fatal("short token should be masked") + } + fp := tokenFingerprint("abcdefghijklmnop") + if !strings.HasPrefix(fp, "abcdefgh") || !strings.HasSuffix(fp, "mnop") { + t.Fatalf("got %q", fp) 
+ } +} + +func TestBuildPromptQuery(t *testing.T) { + p := Prompt{Name: "behavioral", Prompt: "Оцени звонок"} + q := buildPromptQuery("текст транскрипции", p) + if !strings.Contains(q, "Оцени звонок") { + t.Fatal("missing prompt text") + } + if !strings.Contains(q, "текст транскрипции") { + t.Fatal("missing transcription") + } + if !strings.Contains(q, "=== ТРАНСКРИПЦИЯ ===") { + t.Fatal("missing section header") + } +} + +func TestRunAnalysis(t *testing.T) { + t.Setenv("YANDEX_API_KEY", "test-token") + + var calls int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls++ + if auth := r.Header.Get("Authorization"); auth != "Bearer test-token" { + t.Fatalf("auth: got %q", auth) + } + _ = json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{ + {"message": map[string]any{"content": `{"ok":true}`}}, + }, + "usage": map[string]any{ + "prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15, + }, + }) + })) + defer srv.Close() + + prompts := []Prompt{ + {Name: "behavioral", Prompt: "p1"}, + {Name: "client_data", Prompt: "p2"}, + } + result, stats, err := runAnalysis(context.Background(), srv.URL, "model", "task-1", "транскрипт", prompts) + if err != nil { + t.Fatal(err) + } + if calls != 2 { + t.Fatalf("llm calls: got %d, want 2", calls) + } + if len(result) != 2 { + t.Fatalf("result keys: %d", len(result)) + } + if stats.LLMCalls != 2 || stats.TotalTokens != 30 { + t.Fatalf("stats: %+v", stats) + } +} + +func TestRunAnalysisSkipsEmptyNames(t *testing.T) { + t.Setenv("YANDEX_API_KEY", "k") + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{{"message": map[string]any{"content": `{"x":1}`}}}, + }) + })) + defer srv.Close() + + result, stats, err := runAnalysis(context.Background(), srv.URL, "m", "t", "text", []Prompt{ + {Name: "", Prompt: "skip"}, + {Name: "valid", Prompt: "go"}, + 
}) + if err != nil { + t.Fatal(err) + } + if len(result) != 1 { + t.Fatalf("want 1 result, got %d", len(result)) + } + if stats.LLMCalls != 1 { + t.Fatalf("llm calls: %d", stats.LLMCalls) + } +} + +func TestExtractFilePath(t *testing.T) { + body, _ := json.Marshal(map[string]string{ + "task_id": "01HX", + "file_path": "/data/storage/processing/01HX.wav", + }) + if got := extractFilePath(body); got != "/data/storage/processing/01HX.wav" { + t.Fatalf("got %q", got) + } + if extractFilePath([]byte("not json")) != "" { + t.Fatal("invalid json should return empty") + } +} + +func TestDeleteProcessingFile(t *testing.T) { + dir := t.TempDir() + processingDir := filepath.Join(dir, "processing") + if err := os.MkdirAll(processingDir, 0o755); err != nil { + t.Fatal(err) + } + path := filepath.Join(processingDir, "01TASK.wav") + if err := os.WriteFile(path, []byte("audio"), 0o644); err != nil { + t.Fatal(err) + } + + deleteProcessingFile(path, "01TASK", "analyse") + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Fatal("file should be deleted") + } +} + +func TestDeleteProcessingFileRejectsOutsideProcessing(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "incoming", "file.wav") + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(path, []byte("x"), 0o644); err != nil { + t.Fatal(err) + } + deleteProcessingFile(path, "t", "analyse") + if _, err := os.Stat(path); err != nil { + t.Fatal("file outside processing must not be deleted") + } +} + +func TestAccumulateUsage(t *testing.T) { + stats := &analysisStats{} + accumulateUsage(stats, &llmCallResult{Usage: &tokenUsage{TotalTokens: 10, PromptTokens: 6, CompletionTokens: 4}}) + accumulateUsage(stats, &llmCallResult{Usage: &tokenUsage{TotalTokens: 5, PromptTokens: 3, CompletionTokens: 2}}) + if stats.LLMCalls != 2 || stats.TotalTokens != 15 { + t.Fatalf("stats: %+v", stats) + } +} diff --git a/workers/analyse/internal/pipelinestatus/status.go 
b/workers/analyse/internal/pipelinestatus/status.go new file mode 100644 index 0000000..b4b7b79 --- /dev/null +++ b/workers/analyse/internal/pipelinestatus/status.go @@ -0,0 +1,57 @@ +package pipelinestatus + +import ( + "context" + "encoding/json" + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + StatusPending = "pending" + StatusInProgress = "in_progress" + StatusDone = "done" + StatusError = "error" +) + +const ( + StageQueued = "queued" + StageTranscribing = "transcribing" + StageAnalysing = "analysing" + StageTagging = "tagging" + StageCompleted = "completed" +) + +type Event struct { + TaskID string `json:"task_id"` + Filename string `json:"filename,omitempty"` + Status string `json:"status"` + Stage string `json:"stage"` + Error string `json:"error,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +func DeclareQueue(ch *amqp.Channel, queue string) error { + _, err := ch.QueueDeclare(queue, true, false, false, false, nil) + return err +} + +func Publish(ctx context.Context, ch *amqp.Channel, queue string, ev Event) error { + if ev.Timestamp == 0 { + ev.Timestamp = time.Now().Unix() + } + body, err := json.Marshal(ev) + if err != nil { + return err + } + if err := ch.PublishWithContext(ctx, "", queue, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + return fmt.Errorf("publish status: %w", err) + } + return nil +} diff --git a/workers/tagging/cmd/tagging/main.go b/workers/tagging/cmd/tagging/main.go index 3c8fb65..2242a76 100644 --- a/workers/tagging/cmd/tagging/main.go +++ b/workers/tagging/cmd/tagging/main.go @@ -18,6 +18,8 @@ import ( "github.com/joho/godotenv" _ "github.com/jackc/pgx/v5/stdlib" amqp "github.com/rabbitmq/amqp091-go" + + "github.com/postmet/tagging/internal/pipelinestatus" ) func init() { @@ -381,7 +383,7 @@ func saveTagging(ctx context.Context, db *sql.DB, taskID, filename, transcriptio filename = COALESCE(NULLIF($3, ''), 
filename), transcription = COALESCE(NULLIF($4, ''), transcription), updated_at = now(), - status = CASE WHEN analysis IS NOT NULL THEN 'done' ELSE status END + status = CASE WHEN analysis IS NOT NULL THEN 'done' ELSE 'in_progress' END WHERE task_id = $1 RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL) `, taskID, string(tagging), filename, transcription).Scan(&complete) @@ -391,6 +393,41 @@ func saveTagging(ctx context.Context, db *sql.DB, taskID, filename, transcriptio return complete, nil } +func setTaskInProgress(ctx context.Context, db *sql.DB, taskID, filename string) error { + _, err := db.ExecContext(ctx, ` + INSERT INTO results (task_id, filename, status) + VALUES ($1, NULLIF($2, ''), 'in_progress') + ON CONFLICT (task_id) DO UPDATE SET + status = 'in_progress', + filename = COALESCE(NULLIF(EXCLUDED.filename, ''), results.filename), + updated_at = now() + `, taskID, filename) + return err +} + +func setTaskError(ctx context.Context, db *sql.DB, taskID string) error { + _, err := db.ExecContext(ctx, ` + INSERT INTO results (task_id, status) VALUES ($1, 'error') + ON CONFLICT (task_id) DO UPDATE SET status = 'error', updated_at = now() + `, taskID) + return err +} + +func publishStatus(ctx context.Context, ch *amqp.Channel, queue, taskID, filename, status, stage, errMsg string) { + ev := pipelinestatus.Event{ + TaskID: taskID, + Filename: filename, + Status: status, + Stage: stage, + } + if errMsg != "" { + ev.Error = errMsg + } + if err := pipelinestatus.Publish(ctx, ch, queue, ev); err != nil { + slog.Warn("status publish failed", "worker", "tagging", "task_id", taskID, "stage", stage, "error", err) + } +} + // ===================== MAIN ===================== func loadDotenv() { @@ -414,6 +451,7 @@ func main() { model := os.Getenv("YANDEX_MODEL") inputQueue := getenv("TAGGING_QUEUE", "tagging") finalQueue := getenv("FINAL_QUEUE", "final") + statusQueue := getenv("STATUS_QUEUE", "pipeline.status") if token == "" { slog.Error("YANDEX_API_KEY is 
required") @@ -447,6 +485,10 @@ func main() { slog.Error("declare queue failed", "queue", finalQueue, "error", err) os.Exit(1) } + if err := pipelinestatus.DeclareQueue(ch, statusQueue); err != nil { + slog.Error("declare status queue failed", "queue", statusQueue, "error", err) + os.Exit(1) + } ch.Qos(1, 0, false) msgs, err := ch.Consume(inputQueue, "", false, false, false, false, nil) @@ -454,7 +496,7 @@ func main() { slog.Error("consume failed", "error", err) os.Exit(1) } - slog.Info("worker started", "worker", "tagging", "queue", inputQueue, "model", model) + slog.Info("worker started", "worker", "tagging", "queue", inputQueue, "status_queue", statusQueue, "model", model) for d := range msgs { taskStart := time.Now() @@ -485,9 +527,18 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + if err := setTaskInProgress(ctx, db, task.TaskID, task.Filename); err != nil { + slog.Warn("set in_progress failed", "worker", "tagging", "task_id", task.TaskID, "error", err) + } + publishStatus(ctx, ch, statusQueue, task.TaskID, task.Filename, + pipelinestatus.StatusInProgress, pipelinestatus.StageTagging, "") + result, err := classify(ctx, task.TaskID, model, task.Transcription) if err != nil { cancel() + _ = setTaskError(context.Background(), db, task.TaskID) + publishStatus(context.Background(), ch, statusQueue, task.TaskID, task.Filename, + pipelinestatus.StatusError, pipelinestatus.StageTagging, err.Error()) slog.Warn("task failed, discarded", "worker", "tagging", "task_id", task.TaskID, "llm_calls", 1, "error", err) @@ -506,7 +557,7 @@ func main() { } if complete { - notifyFinal(ctx, ch, db, finalQueue, task.TaskID, "tagging") + notifyFinal(ctx, ch, db, finalQueue, statusQueue, task.TaskID, task.Filename, "tagging") slog.Info("task complete", "worker", "tagging", "task_id", task.TaskID, "was_last", "tagging", "L1", result.L1, "llm_calls", 1, "duration_ms", time.Since(taskStart).Milliseconds()) @@ -591,12 +642,14 @@ func 
loadFinalPayload(ctx context.Context, db *sql.DB, taskID string) ([]byte, e return json.Marshal(msg) } -func notifyFinal(ctx context.Context, ch *amqp.Channel, db *sql.DB, queue, taskID, worker string) { +func notifyFinal(ctx context.Context, ch *amqp.Channel, db *sql.DB, queue, statusQueue, taskID, filename, worker string) { body, err := loadFinalPayload(ctx, db, taskID) if err != nil { slog.Warn("load final payload failed", "worker", worker, "task_id", taskID, "error", err) return } + publishStatus(ctx, ch, statusQueue, taskID, filename, + pipelinestatus.StatusDone, pipelinestatus.StageCompleted, "") if err := ch.PublishWithContext(ctx, "", queue, false, false, amqp.Publishing{ ContentType: "application/json", diff --git a/workers/tagging/cmd/tagging/main_test.go b/workers/tagging/cmd/tagging/main_test.go new file mode 100644 index 0000000..897997e --- /dev/null +++ b/workers/tagging/cmd/tagging/main_test.go @@ -0,0 +1,123 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestTruncate(t *testing.T) { + if got := truncate("ok", 5); got != "ok" { + t.Fatalf("got %q", got) + } + got := truncate(strings.Repeat("x", 30), 5) + if len(got) != 8 { + t.Fatalf("got %q", got) + } +} + +func TestTokenFingerprint(t *testing.T) { + if tokenFingerprint("x") != "***" { + t.Fatal("expected mask for short token") + } +} + +func TestBuildPromptContainsTranscription(t *testing.T) { + text := "клиент спрашивает где груз" + p := buildPrompt(text) + if !strings.Contains(p, text) { + t.Fatal("prompt must include transcription") + } + if !strings.Contains(p, `"L1"`) { + t.Fatal("prompt must describe JSON output") + } +} + +func TestClassify(t *testing.T) { + t.Setenv("YANDEX_API_KEY", "test-key") + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if auth := r.Header.Get("Authorization"); auth != "Bearer test-key" { + t.Fatalf("auth: got %q", 
auth) + } + _ = json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{ + {"message": map[string]any{"content": `{ + "L1":"tracking", + "L2":"location_request", + "L3":"", + "risk_level":"low", + "has_action_items":false, + "has_deadline":false + }`}}, + }, + }) + })) + defer srv.Close() + + t.Setenv("YANDEX_API_URL", srv.URL) + + result, err := classify(context.Background(), "task-1", "model", "где мой груз") + if err != nil { + t.Fatal(err) + } + if result.L1 != "tracking" || result.L2 != "location_request" { + t.Fatalf("unexpected result: %+v", result) + } + if result.RiskLevel != "low" { + t.Fatalf("risk_level: %q", result.RiskLevel) + } +} + +func TestClassifyInvalidJSON(t *testing.T) { + t.Setenv("YANDEX_API_KEY", "k") + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _ = json.NewEncoder(w).Encode(map[string]any{ + "choices": []map[string]any{{"message": map[string]any{"content": "not-json"}}}, + }) + })) + defer srv.Close() + t.Setenv("YANDEX_API_URL", srv.URL) + + _, err := classify(context.Background(), "t", "m", "text") + if err == nil { + t.Fatal("expected parse error") + } +} + +func TestExtractFilePath(t *testing.T) { + body := []byte(`{"file_path":"/data/storage/processing/x.wav"}`) + if got := extractFilePath(body); got != "/data/storage/processing/x.wav" { + t.Fatalf("got %q", got) + } +} + +func TestDeleteProcessingFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "processing", "01.wav") + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(path, []byte("a"), 0o644); err != nil { + t.Fatal(err) + } + deleteProcessingFile(path, "01", "tagging") + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Fatal("expected file removed") + } +} + +func TestGetenv(t *testing.T) { + t.Setenv("TAGGING_QUEUE", "custom") + if got := getenv("TAGGING_QUEUE", "tagging"); got != "custom" { + t.Fatalf("got %q", got) + } + if got := 
getenv("UNSET_VAR_XYZ", "default"); got != "default" { + t.Fatalf("got %q", got) + } +} diff --git a/workers/tagging/internal/pipelinestatus/status.go b/workers/tagging/internal/pipelinestatus/status.go new file mode 100644 index 0000000..b4b7b79 --- /dev/null +++ b/workers/tagging/internal/pipelinestatus/status.go @@ -0,0 +1,57 @@ +package pipelinestatus + +import ( + "context" + "encoding/json" + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + StatusPending = "pending" + StatusInProgress = "in_progress" + StatusDone = "done" + StatusError = "error" +) + +const ( + StageQueued = "queued" + StageTranscribing = "transcribing" + StageAnalysing = "analysing" + StageTagging = "tagging" + StageCompleted = "completed" +) + +type Event struct { + TaskID string `json:"task_id"` + Filename string `json:"filename,omitempty"` + Status string `json:"status"` + Stage string `json:"stage"` + Error string `json:"error,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +func DeclareQueue(ch *amqp.Channel, queue string) error { + _, err := ch.QueueDeclare(queue, true, false, false, false, nil) + return err +} + +func Publish(ctx context.Context, ch *amqp.Channel, queue string, ev Event) error { + if ev.Timestamp == 0 { + ev.Timestamp = time.Now().Unix() + } + body, err := json.Marshal(ev) + if err != nil { + return err + } + if err := ch.PublishWithContext(ctx, "", queue, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + return fmt.Errorf("publish status: %w", err) + } + return nil +} diff --git a/workers/transcribe/internal/config/config.go b/workers/transcribe/internal/config/config.go index 87a3096..cf27ecd 100644 --- a/workers/transcribe/internal/config/config.go +++ b/workers/transcribe/internal/config/config.go @@ -15,6 +15,7 @@ type Config struct { InputExchange string InputRoutingKey string Prefetch int + StatusQueue string NexaraBaseURL string NexaraAPIKey string 
@@ -38,6 +39,7 @@ func Load() Config { InputExchange: getEnv("RABBITMQ_EXCHANGE", "audio_pipeline"), InputRoutingKey: getEnv("RABBITMQ_ROUTING_KEY", "audio.new"), Prefetch: getInt("PREFETCH", 1), + StatusQueue: getEnv("STATUS_QUEUE", "pipeline.status"), NexaraBaseURL: getEnv("NEXARA_BASE_URL", "https://api.nexara.ru"), NexaraAPIKey: os.Getenv("NEXARA_API_KEY"), diff --git a/workers/transcribe/internal/config/config_test.go b/workers/transcribe/internal/config/config_test.go new file mode 100644 index 0000000..385045f --- /dev/null +++ b/workers/transcribe/internal/config/config_test.go @@ -0,0 +1,51 @@ +package config + +import ( + "testing" + "time" +) + +func TestLoadDefaults(t *testing.T) { + t.Setenv("RABBITMQ_URL", "") + t.Setenv("STATUS_QUEUE", "") + t.Setenv("NEXARA_TIMEOUT", "") + + cfg := Load() + if cfg.InputQueue != "transcribe.tasks" { + t.Fatalf("InputQueue: got %q", cfg.InputQueue) + } + if cfg.StatusQueue != "pipeline.status" { + t.Fatalf("StatusQueue: got %q", cfg.StatusQueue) + } + if cfg.NexaraTimeout != 10*time.Minute { + t.Fatalf("NexaraTimeout: got %v", cfg.NexaraTimeout) + } + if cfg.PromptsSection != 1 { + t.Fatalf("PromptsSection: got %d", cfg.PromptsSection) + } +} + +func TestLoadFromEnv(t *testing.T) { + t.Setenv("INPUT_QUEUE", "custom.tasks") + t.Setenv("STATUS_QUEUE", "status.events") + t.Setenv("PREFETCH", "4") + t.Setenv("NEXARA_TIMEOUT", "2m") + t.Setenv("PROMPTS_SECTION", "2") + + cfg := Load() + if cfg.InputQueue != "custom.tasks" { + t.Fatalf("InputQueue: got %q", cfg.InputQueue) + } + if cfg.StatusQueue != "status.events" { + t.Fatalf("StatusQueue: got %q", cfg.StatusQueue) + } + if cfg.Prefetch != 4 { + t.Fatalf("Prefetch: got %d", cfg.Prefetch) + } + if cfg.NexaraTimeout != 2*time.Minute { + t.Fatalf("NexaraTimeout: got %v", cfg.NexaraTimeout) + } + if cfg.PromptsSection != 2 { + t.Fatalf("PromptsSection: got %d", cfg.PromptsSection) + } +} diff --git a/workers/transcribe/internal/consumer/consumer.go 
b/workers/transcribe/internal/consumer/consumer.go index 09366f1..b8390c3 100644 --- a/workers/transcribe/internal/consumer/consumer.go +++ b/workers/transcribe/internal/consumer/consumer.go @@ -12,6 +12,7 @@ import ( "github.com/postmet/transcribe/internal/config" "github.com/postmet/transcribe/internal/models" "github.com/postmet/transcribe/internal/nexara" + "github.com/postmet/transcribe/internal/pipelinestatus" "github.com/postmet/transcribe/internal/prompts" ) @@ -70,6 +71,9 @@ func setupTopology(ch *amqp.Channel, cfg config.Config) error { return fmt.Errorf("bind queue %s: %w", q, err) } } + if err := pipelinestatus.DeclareQueue(ch, cfg.StatusQueue); err != nil { + return fmt.Errorf("declare status queue: %w", err) + } return ch.Qos(cfg.Prefetch, 0, false) } @@ -84,7 +88,11 @@ func (c *Consumer) Run(ctx context.Context) error { return err } - slog.Info("transcribe worker started", "queue", c.cfg.InputQueue, "output_exchange", c.cfg.OutputExchange) + slog.Info("transcribe worker started", + "queue", c.cfg.InputQueue, + "output_exchange", c.cfg.OutputExchange, + "status_queue", c.cfg.StatusQueue, + ) for { select { @@ -112,9 +120,12 @@ func (c *Consumer) handle(ctx context.Context, d amqp.Delivery) { txCtx, cancel := context.WithTimeout(ctx, c.cfg.NexaraTimeout+30*time.Second) defer cancel() + c.publishStatus(txCtx, task.TaskID, task.Filename, pipelinestatus.StatusInProgress, pipelinestatus.StageTranscribing, "") + text, lang, segments, err := c.nexara.TranscribeFile(txCtx, task.FilePath) if err != nil { slog.Warn("transcription failed", "task_id", task.TaskID, "error", err) + c.publishStatus(txCtx, task.TaskID, task.Filename, pipelinestatus.StatusError, pipelinestatus.StageTranscribing, err.Error()) _ = d.Nack(false, false) return } @@ -122,6 +133,7 @@ func (c *Consumer) handle(ctx context.Context, d amqp.Delivery) { promptList, err := c.prompts.Load(txCtx) if err != nil { slog.Warn("prompts load failed", "task_id", task.TaskID, "error", err) + 
c.publishStatus(txCtx, task.TaskID, task.Filename, pipelinestatus.StatusError, pipelinestatus.StageTranscribing, err.Error()) _ = d.Nack(false, false) return } @@ -170,3 +182,18 @@ func (c *Consumer) handle(ctx context.Context, d amqp.Delivery) { slog.Info("transcribed", "task_id", task.TaskID, "language", lang, "chars", len(text), "segments", len(segments), "prompts", len(promptList)) _ = d.Ack(false) } + +func (c *Consumer) publishStatus(ctx context.Context, taskID, filename, status, stage, errMsg string) { + ev := pipelinestatus.Event{ + TaskID: taskID, + Filename: filename, + Status: status, + Stage: stage, + } + if errMsg != "" { + ev.Error = errMsg + } + if err := pipelinestatus.Publish(ctx, c.ch, c.cfg.StatusQueue, ev); err != nil { + slog.Warn("status publish failed", "task_id", taskID, "stage", stage, "error", err) + } +} diff --git a/workers/transcribe/internal/models/models_test.go b/workers/transcribe/internal/models/models_test.go new file mode 100644 index 0000000..2bb1770 --- /dev/null +++ b/workers/transcribe/internal/models/models_test.go @@ -0,0 +1,58 @@ +package models + +import ( + "encoding/json" + "testing" +) + +func TestAudioTaskRoundTrip(t *testing.T) { + src := AudioTask{ + TaskID: "01HX", + FilePath: "/data/processing/01HX.wav", + Filename: "call.wav", + Size: 2048, + CreatedAt: 1717843200, + } + body, err := json.Marshal(src) + if err != nil { + t.Fatal(err) + } + var got AudioTask + if err := json.Unmarshal(body, &got); err != nil { + t.Fatal(err) + } + if got != src { + t.Fatalf("round-trip mismatch: %+v vs %+v", got, src) + } +} + +func TestTranscriptionResultRoundTrip(t *testing.T) { + src := TranscriptionResult{ + TaskID: "01HX", + Filename: "call.wav", + FilePath: "/data/processing/01HX.wav", + Transcription: "текст звонка", + Language: "ru", + Segments: []Segment{ + {Start: 0, End: 1.2, Text: "текст"}, + }, + Prompts: []Prompt{ + {ID: 1, IDSection: 1, Name: "behavioral", Prompt: "analyze", DtCreate: "2026-01-01"}, + }, + 
TranscribedAt: 1717843200, + } + body, err := json.Marshal(src) + if err != nil { + t.Fatal(err) + } + var got TranscriptionResult + if err := json.Unmarshal(body, &got); err != nil { + t.Fatal(err) + } + if got.TaskID != src.TaskID || got.Transcription != src.Transcription { + t.Fatalf("mismatch: %+v", got) + } + if len(got.Segments) != 1 || len(got.Prompts) != 1 { + t.Fatalf("nested fields lost: %+v", got) + } +} diff --git a/workers/transcribe/internal/nexara/nexara_test.go b/workers/transcribe/internal/nexara/nexara_test.go new file mode 100644 index 0000000..f83cdb2 --- /dev/null +++ b/workers/transcribe/internal/nexara/nexara_test.go @@ -0,0 +1,118 @@ +package nexara + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestTranscribeFileSuccess(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + t.Fatalf("method: got %s", r.Method) + } + if !strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data") { + t.Fatalf("content-type: got %s", r.Header.Get("Content-Type")) + } + if auth := r.Header.Get("Authorization"); auth != "Bearer nexara-key" { + t.Fatalf("auth: got %q", auth) + } + if err := r.ParseMultipartForm(1 << 20); err != nil { + t.Fatal(err) + } + if r.FormValue("model") != "whisper-1" { + t.Fatalf("model: got %q", r.FormValue("model")) + } + if r.FormValue("response_format") != "json" { + t.Fatalf("response_format: got %q", r.FormValue("response_format")) + } + file, _, err := r.FormFile("file") + if err != nil { + t.Fatal(err) + } + defer file.Close() + buf := make([]byte, 64) + n, _ := file.Read(buf) + if string(buf[:n]) != "audio-bytes" { + t.Fatalf("file content: got %q", string(buf[:n])) + } + + _ = json.NewEncoder(w).Encode(map[string]any{ + "text": "привет мир", + "language": "ru", + "segments": []map[string]any{ + {"start": 0.0, "end": 1.5, "text": 
"привет"}, + {"start": 1.5, "end": 3.0, "text": "мир"}, + }, + }) + })) + defer srv.Close() + + dir := t.TempDir() + audioPath := filepath.Join(dir, "test.wav") + if err := os.WriteFile(audioPath, []byte("audio-bytes"), 0o644); err != nil { + t.Fatal(err) + } + + client := New(srv.URL, "nexara-key", "whisper-1", 5*time.Second) + text, lang, segments, err := client.TranscribeFile(context.Background(), audioPath) + if err != nil { + t.Fatal(err) + } + if text != "привет мир" { + t.Fatalf("text: got %q", text) + } + if lang != "ru" { + t.Fatalf("language: got %q", lang) + } + if len(segments) != 2 { + t.Fatalf("segments: got %d", len(segments)) + } + if segments[0].Text != "привет" || segments[1].End != 3.0 { + t.Fatalf("unexpected segments: %+v", segments) + } +} + +func TestTranscribeFileAPIError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte("invalid key")) + })) + defer srv.Close() + + dir := t.TempDir() + audioPath := filepath.Join(dir, "test.wav") + if err := os.WriteFile(audioPath, []byte("x"), 0o644); err != nil { + t.Fatal(err) + } + + client := New(srv.URL, "bad", "", 5*time.Second) + _, _, _, err := client.TranscribeFile(context.Background(), audioPath) + if err == nil { + t.Fatal("expected error on 401") + } + if !strings.Contains(err.Error(), "401") { + t.Fatalf("error should mention status: %v", err) + } +} + +func TestTranscribeFileMissingFile(t *testing.T) { + client := New("http://localhost", "key", "", time.Second) + _, _, _, err := client.TranscribeFile(context.Background(), filepath.Join(t.TempDir(), "missing.wav")) + if err == nil { + t.Fatal("expected error for missing file") + } +} + +func TestNewTrimsBaseURL(t *testing.T) { + c := New("https://api.example.com/", "k", "m", time.Second) + if !strings.HasSuffix(c.apiURL, "/api/v1/audio/transcriptions") { + t.Fatalf("apiURL: got %s", c.apiURL) + } +} diff --git 
a/workers/transcribe/internal/pipelinestatus/status.go b/workers/transcribe/internal/pipelinestatus/status.go new file mode 100644 index 0000000..b4b7b79 --- /dev/null +++ b/workers/transcribe/internal/pipelinestatus/status.go @@ -0,0 +1,57 @@ +package pipelinestatus + +import ( + "context" + "encoding/json" + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +const ( + StatusPending = "pending" + StatusInProgress = "in_progress" + StatusDone = "done" + StatusError = "error" +) + +const ( + StageQueued = "queued" + StageTranscribing = "transcribing" + StageAnalysing = "analysing" + StageTagging = "tagging" + StageCompleted = "completed" +) + +type Event struct { + TaskID string `json:"task_id"` + Filename string `json:"filename,omitempty"` + Status string `json:"status"` + Stage string `json:"stage"` + Error string `json:"error,omitempty"` + Timestamp int64 `json:"timestamp"` +} + +func DeclareQueue(ch *amqp.Channel, queue string) error { + _, err := ch.QueueDeclare(queue, true, false, false, false, nil) + return err +} + +func Publish(ctx context.Context, ch *amqp.Channel, queue string, ev Event) error { + if ev.Timestamp == 0 { + ev.Timestamp = time.Now().Unix() + } + body, err := json.Marshal(ev) + if err != nil { + return err + } + if err := ch.PublishWithContext(ctx, "", queue, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + return fmt.Errorf("publish status: %w", err) + } + return nil +} diff --git a/workers/transcribe/internal/prompts/prompts_test.go b/workers/transcribe/internal/prompts/prompts_test.go new file mode 100644 index 0000000..c0edbc7 --- /dev/null +++ b/workers/transcribe/internal/prompts/prompts_test.go @@ -0,0 +1,116 @@ +package prompts + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" +) + +func samplePrompts() []byte { + data, err := json.Marshal([]map[string]any{ + {"id": 1, 
"id_section": 1, "name": "behavioral", "prompt": "p1", "dt_create": "2026-01-01"}, + {"id": 2, "id_section": 1, "name": "client_data", "prompt": "p2", "dt_create": "2026-01-01"}, + {"id": 3, "id_section": 2, "name": "other", "prompt": "p3", "dt_create": "2026-01-01"}, + }) + if err != nil { + panic(err) + } + return data +} + +func TestLoadStatic(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "prompts.json") + if err := os.WriteFile(path, samplePrompts(), 0o644); err != nil { + t.Fatal(err) + } + + l := New("static", path, "", "", 1) + got, err := l.Load(context.Background()) + if err != nil { + t.Fatal(err) + } + if len(got) != 2 { + t.Fatalf("want 2 prompts for section 1, got %d", len(got)) + } + if got[0].Name != "behavioral" || got[1].Name != "client_data" { + t.Fatalf("unexpected names: %v, %v", got[0].Name, got[1].Name) + } +} + +func TestLoadStaticAllSections(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "prompts.json") + if err := os.WriteFile(path, samplePrompts(), 0o644); err != nil { + t.Fatal(err) + } + + l := New("static", path, "", "", 0) + got, err := l.Load(context.Background()) + if err != nil { + t.Fatal(err) + } + if len(got) != 3 { + t.Fatalf("want 3 prompts without filter, got %d", len(got)) + } +} + +func TestLoadStaticMissingFile(t *testing.T) { + l := New("static", filepath.Join(t.TempDir(), "missing.json"), "", "", 1) + _, err := l.Load(context.Background()) + if err == nil { + t.Fatal("expected error for missing file") + } +} + +func TestLoadHTTP(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/metrics/" { + t.Fatalf("path: got %s", r.URL.Path) + } + if r.URL.Query().Get("id_section") != "1" { + t.Fatalf("id_section: got %s", r.URL.Query().Get("id_section")) + } + if auth := r.Header.Get("Authorization"); auth != "Bearer test-key" { + t.Fatalf("auth: got %q", auth) + } + w.Header().Set("Content-Type", "application/json") + 
_, _ = w.Write(samplePrompts()) + })) + defer srv.Close() + + l := New("http", "", srv.URL, "test-key", 1) + got, err := l.Load(context.Background()) + if err != nil { + t.Fatal(err) + } + if len(got) != 2 { + t.Fatalf("want 2 prompts, got %d", len(got)) + } +} + +func TestLoadHTTPErrorStatus(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("boom")) + })) + defer srv.Close() + + l := New("http", "", srv.URL, "", 1) + _, err := l.Load(context.Background()) + if err == nil { + t.Fatal("expected error on 500") + } +} + +func TestLoadHTTPMissingBaseURL(t *testing.T) { + l := New("http", "", "", "", 1) + _, err := l.Load(context.Background()) + if err == nil { + t.Fatal("expected error without base URL") + } +}