2026-06-24 18:58:35 +03:00
2026-06-24 18:58:35 +03:00
2026-06-24 18:58:35 +03:00
2026-06-24 18:58:35 +03:00
2026-06-24 18:58:35 +03:00
2026-06-24 18:58:35 +03:00
2026-06-10 17:12:58 +03:00
2026-06-24 18:58:35 +03:00
2026-06-24 18:58:35 +03:00

Audio Pipeline

Пайплайн обработки аудиозаписей звонков: от появления файла на диске до классификации, анализа по промптам и публикации итогового результата в очередь final.

Система построена на файловом триггере + RabbitMQ + PostgreSQL + внешних API (Nexara STT, Yandex LLM).


Содержание

  1. Общая схема
  2. Структура проекта
  3. Инфраструктура
  4. Этапы обработки
  5. Файловое хранилище
  6. RabbitMQ
  7. PostgreSQL
  8. Воркеры
  9. Форматы сообщений
  10. Промпты для анализа
  11. Агрегация и очередь final
  12. Конфигурация (.env)
  13. Запуск и управление
  14. Логирование
  15. Обработка ошибок
  16. Переключение промптов на API
  17. Что не реализовано

Общая схема

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  incoming/  │────▶│   watcher    │────▶│  RabbitMQ   │
│  (файлы)    │     │  (сканер)    │     │ audio.new   │
└─────────────┘     └──────────────┘     └──────┬──────┘
                                                 │
                    ┌──────────────┐             ▼
                    │ processing/  │     ┌─────────────┐
                    │  (аудио)     │◀────│ transcribe  │── Nexara API (STT)
                    └──────────────┘     └──────┬──────┘
                                                │
                                    fanout transcription_done
                                    ┌───────────┴───────────┐
                                    ▼                       ▼
                             ┌─────────────┐         ┌─────────────┐
                             │   analyse   │         │   tagging   │
                             │ Yandex LLM  │         │ Yandex LLM  │
                             └──────┬──────┘         └──────┬──────┘
                                    │                       │
                                    └───────────┬───────────┘
                                                ▼
                                         ┌─────────────┐
                                         │  PostgreSQL │
                                         │   results   │
                                         └──────┬──────┘
                                                │
                              оба готовы ───────┤
                                                ▼
                                         ┌─────────────┐
                                         │   final     │  (RabbitMQ)
                                         │  + удаление │
                                         │  файла      │
                                         └─────────────┘

Ключевые принципы:

  • Каждый этап — отдельный Docker-сервис (воркер).
  • Связь между этапами — через RabbitMQ (асинхронно).
  • Промежуточные и итоговые результаты LLM — в PostgreSQL.
  • analyse и tagging работают параллельно после транскрипции.
  • В final публикует тот воркер, который завершился последним.

Структура проекта

audio-pipeline/
├── docker-compose.yml          # инфраструктура и все сервисы
├── .env                        # конфигурация (не в git)
├── db/
│   └── init.sql                # схема PostgreSQL
├── storage/                    # аудиофайлы на хосте (монтируется в контейнеры)
│   ├── incoming/               # сюда кладут новые файлы
│   ├── processing/             # файлы в обработке
│   └── failed/                 # файлы при критических ошибках watcher
├── watcher/                    # сканер файловой системы
│   ├── cmd/watcher/main.go
│   └── internal/
│       ├── config/
│       ├── scanner/
│       └── publisher/
└── workers/
    ├── transcribe/             # STT + загрузка промптов + fanout
    │   ├── cmd/transcribe/main.go
    │   ├── configs/prompts.json
    │   └── internal/
    │       ├── consumer/
    │       ├── nexara/
    │       ├── prompts/
    │       └── models/
    ├── analyse/                # анализ по промптам (Yandex LLM)
    │   └── cmd/analyse/main.go
    └── tagging/                # классификация диалога (Yandex LLM)
        └── cmd/tagging/main.go

Инфраструктура

Сервис Контейнер Порты Назначение
rabbit rabbit 5672, 15672 RabbitMQ + Management UI
postgres postgres 5432 Хранение результатов
watcher watcher Мониторинг incoming/
transcribe transcribe Транскрипция + fanout
analyse analyse Анализ по промптам
tagging tagging Классификация диалога

RabbitMQ UI: http://localhost:15672 (логин/пароль из .env)


Этапы обработки

1. Watcher — обнаружение файла

Триггер: появление аудиофайла в {STORAGE_ROOT}/incoming/.

Алгоритм:

  1. Каждые POLL_INTERVAL (по умолчанию 5 с) сканирует incoming/.
  2. Пропускает скрытые файлы (.) и временные (.tmp).
  3. Проверяет расширение: .mp3, .wav, .m4a, .ogg, .flac, .webm.
  4. Ждёт стабилизации размера файла (STABLE_WINDOW / STABLE_CHECKS) — защита от незавершённой загрузки.
  5. Генерирует ULID task_id.
  6. Атомарно переименовывает: incoming/name.wavprocessing/{task_id}.wav.
  7. Публикует задачу в RabbitMQ (audio_pipeline / audio.new).

При ошибке публикации: файл возвращается в incoming/. Если откат невозможен — перемещается в failed/.

2. Transcribe — транскрипция

Вход: очередь transcribe.tasks.

Алгоритм:

  1. Читает аудиофайл по file_path из сообщения.
  2. Отправляет в Nexara API (Speech-to-Text).
  3. Загружает промпты (prompts.json или HTTP API).
  4. Формирует TranscriptionResult и публикует в fanout-exchange transcription_done.
  5. Сообщение доставляется одновременно в очереди analyse и tagging.

3. Tagging — классификация диалога

Вход: очередь tagging.

Алгоритм:

  1. Получает транскрипцию из сообщения.
  2. Отправляет один запрос в Yandex LLM с промптом классификации (L1/L2/L3, risk_level и т.д.).
  3. Сохраняет результат в results.tagging (PostgreSQL).
  4. Если results.analysis уже заполнен — публикует в final и удаляет файл.

4. Analyse — анализ по промптам

Вход: очередь analyse.

Алгоритм:

  1. Получает транскрипцию и массив prompts из сообщения.
  2. Для каждого промпта — отдельный запрос в Yandex LLM (сейчас 3 промпта: behavioral, client_data, cargo_data).
  3. Сохраняет результат в results.analysis, метаданные — в results.metadata.
  4. Если results.tagging уже заполнен — публикует в final и удаляет файл.

5. Final — итоговое сообщение

Выход: очередь final (default exchange, routing key = final).

Публикуется полный JSON из таблицы results: транскрипция, analysis, tagging, metadata, статус, timestamps.

После успешной публикации аудиофайл удаляется из processing/.

Consumer для очереди final пока не реализован — сообщения накапливаются в очереди до подключения обработчика.


Файловое хранилище

Путь на хосте задаётся STORAGE_ROOT (по умолчанию ./storage).

Директория Назначение
incoming/ Новые файлы для обработки
processing/ Файлы в работе (после claim watcher)
failed/ Файлы при невосстановимых ошибках watcher

Жизненный цикл файла:

incoming/recording.wav
    → processing/01KTN....wav   (watcher)
    → остаётся до завершения    (transcribe читает по пути)
    → удаляется                 (после публикации в final)

Внутри контейнеров путь: /data/storage/... (volume mount).


RabbitMQ

Топология

exchange: audio_pipeline (direct)
    └── queue: transcribe.tasks  ←  routing key: audio.new

exchange: transcription_done (fanout)
    ├── queue: analyse
    └── queue: tagging

queue: final  (default exchange, без binding)

queue: pipeline.status  (default exchange — события стадий для фронта)

Очереди

Очередь Producer Consumer Описание
transcribe.tasks watcher transcribe Новые аудиозадачи
analyse transcribe analyse Результат транскрипции
tagging transcribe tagging Результат транскрипции
final analyse/tagging Итоговый результат
pipeline.status все воркеры backend Стадии обработки (см. ниже)

Очередь pipeline.status (стадии)

Каждый воркер публикует JSON-события в очередь pipeline.status (переменная STATUS_QUEUE).

Формат сообщения:

{
  "task_id": "01KTN...",
  "filename": "recording.wav",
  "status": "pending",
  "stage": "queued",
  "error": "",
  "timestamp": 1717843200
}

Значения status:

status Описание
pending ожидает
in_progress в процессе
done готово
error ошибка

Значения stage:

stage Кто публикует Когда
queued watcher Задача отправлена в RabbitMQ
transcribing transcribe STT начат / ошибка Nexara
analysing analyse LLM-анализ начат / ошибка
tagging tagging Классификация начата / ошибка
completed analyse/tagging Оба воркера завершились

Типичная последовательность:

pending/queued
  → in_progress/transcribing
  → in_progress/analysing  (параллельно)
  → in_progress/tagging    (параллельно)
  → done/completed

Браузер не может подписаться на RabbitMQ напрямую. Нужен backend-consumer, который читает pipeline.status и отдаёт события через REST или WebSocket.

Dead Letter

Очередь transcribe.tasks настроена с DLX (dlx). Сообщения с Nack(requeue=false) уходят в dead-letter. Отдельная DLQ-очередь может потребовать дополнительной настройки.


PostgreSQL

Таблица results

CREATE TABLE results (
    task_id        TEXT PRIMARY KEY,
    filename       TEXT,
    transcription  TEXT,
    analysis       JSONB,      -- результат analyse (Yandex LLM)
    tagging        JSONB,      -- результат tagging (Yandex LLM)
    metadata       JSONB,      -- file_path, segments, prompts, language, transcribed_at
    status         TEXT NOT NULL DEFAULT 'pending',  -- pending | in_progress | done | error
    created_at     TIMESTAMPTZ,
    updated_at     TIMESTAMPTZ
);

Кто что пишет

Поле Пишет Когда
tagging tagging После классификации
analysis analyse После анализа
filename оба При сохранении своей части
transcription analyse При сохранении analysis
metadata analyse file_path, segments, prompts…
status оба in_progress при старте; done когда оба поля заполнены; error при сбое

Просмотр данных

docker exec -it postgres psql -U pipeline -d pipeline
SELECT task_id, filename, status, updated_at FROM results ORDER BY updated_at DESC;

SELECT task_id, jsonb_pretty(analysis), jsonb_pretty(tagging)
FROM results WHERE task_id = 'ВАШ_TASK_ID';

Воркеры

Watcher

  • Язык: Go
  • Зависимости: RabbitMQ
  • Volume: ${STORAGE_ROOT}:/data/storage
  • Не использует: Postgres, Yandex, Nexara

Transcribe

  • Язык: Go
  • API: Nexara (STT)
  • Промпты: static file или HTTP
  • Volume: storage + configs/prompts.json

Tagging

  • Язык: Go
  • API: Yandex Cloud LLM (YANDEX_API_KEY, YANDEX_MODEL)
  • При старте: тестовый запрос к Yandex API (проверка подключения)
  • Volume: storage (для удаления файлов) + .env (hot-reload токена)

Analyse

  • Язык: Go
  • API: Yandex Cloud LLM
  • Вызовов LLM на задачу: по числу промптов (сейчас 3)
  • Без повторов: один вызов на промпт, при ошибке — сообщение отбрасывается
  • Volume: storage + .env

Форматы сообщений

1. Watcher → Transcribe

Exchange: audio_pipeline (direct)
Routing key: audio.new
Queue: transcribe.tasks

{
  "task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W",
  "file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav",
  "filename": "1.wav",
  "size": 1234567,
  "created_at": 1780914907
}

2. Transcribe → Analyse + Tagging

Exchange: transcription_done (fanout)
Queues: analyse, tagging (одинаковое тело)

{
  "task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W",
  "filename": "1.wav",
  "file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav",
  "transcription": "полный текст транскрипции...",
  "language": "ru",
  "segments": [
    {"start": 0.0, "end": 27.96, "text": "..."}
  ],
  "prompts": [
    {
      "id": 1,
      "id_section": 1,
      "name": "behavioral",
      "prompt": "Ты — строгий классификатор звонков...",
      "dt_create": "2026-06-09T09:00:00"
    }
  ],
  "transcribed_at": 1780914907
}

3. Final — итоговое сообщение

Queue: final

{
  "task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W",
  "filename": "1.wav",
  "transcription": "полный текст...",
  "analysis": {
    "behavioral": {
      "greeting": {"value": true, "evidence": "Здравствуйте", "confidence": 0.95},
      "initiative": {"value": true, "evidence": "...", "confidence": 0.8},
      "questions_check": {"value": false, "evidence": null, "confidence": 0.0},
      "closing": {"value": true, "evidence": "всего доброго", "confidence": 0.9}
    },
    "client_data": { "...": "..." },
    "cargo_data": { "...": "..." }
  },
  "tagging": {
    "L1": "tracking",
    "L2": "location_request",
    "L3": "delay",
    "risk_level": "medium",
    "has_action_items": true,
    "has_deadline": false
  },
  "file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav",
  "language": "ru",
  "segments": [...],
  "prompts": [...],
  "transcribed_at": 1780914907,
  "status": "done",
  "created_at": "2026-06-09T09:00:00Z",
  "updated_at": "2026-06-09T09:05:00Z"
}

Промпты для анализа

Источник

Файл: workers/transcribe/configs/prompts.json
Или HTTP API (см. переключение на API).

Transcribe загружает промпты и вкладывает их в сообщение для analyse. Analyse не знает, откуда они пришли.

Текущие промпты (3 штуки)

name Назначение
behavioral Приветствие, инициативность, вопросы, прощание
client_data Первый раз, город, тип клиента, контакты, источник
cargo_data Характер груза, параметры, стоимость

Каждый промпт — полный текст инструкции с форматом JSON-ответа. Analyse отправляет:

<текст промпта из конфига>

=== ТРАНСКРИПЦИЯ ===
"""
<текст звонка>
"""

Ответ LLM сохраняется целиком под ключом name промпта в analysis.

Tagging — отдельный промпт

Tagging не использует prompts.json. У него встроенный промпт классификации логистических диалогов (L1/L2/L3, risk_level, has_action_items, has_deadline).


Агрегация и очередь final

Оба воркера (analyse, tagging) пишут в одну строку results по task_id.

Атомарная проверка готовности (SQL):

UPDATE results SET ...
RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL)
  • Если RETURNING = false — воркер ждёт второго.
  • Если RETURNING = true — этот воркер:
    1. Читает полную строку из БД
    2. Публикует JSON в очередь final
    3. Удаляет файл из processing/ (только пути с /processing/)

Конфигурация (.env)

Пример ключевых переменных:

# Storage
STORAGE_ROOT=./storage

# Watcher
POLL_INTERVAL=5s
STABLE_WINDOW=2s
STABLE_CHECKS=3

# RabbitMQ
RABBITMQ_URL=amqp://admin:secret123@rabbit:5672/
RABBITMQ_EXCHANGE=audio_pipeline
RABBITMQ_ROUTING_KEY=audio.new

# Transcribe
INPUT_QUEUE=transcribe.tasks
OUTPUT_EXCHANGE=transcription_done
ANALYSE_QUEUE=analyse
TAGGING_QUEUE=tagging
FINAL_QUEUE=final
STATUS_QUEUE=pipeline.status
PREFETCH=1

# Nexara (STT)
NEXARA_BASE_URL=https://api.nexara.ru
NEXARA_API_KEY=...
NEXARA_MODEL=whisper-1
NEXARA_TIMEOUT=10m

# Prompts
PROMPTS_SOURCE=static
PROMPTS_FILE=/app/configs/prompts.json
PROMPTS_SECTION=1

# Postgres
POSTGRES_USER=pipeline
POSTGRES_PASSWORD=pipeline_secret
POSTGRES_DB=pipeline
DATABASE_URL=postgres://pipeline:pipeline_secret@postgres:5432/pipeline?sslmode=disable

# Yandex LLM (tagging + analyse)
YANDEX_API_KEY=t1....
YANDEX_MODEL=gpt://folder_id/model_name
YANDEX_API_URL=https://ai.api.cloud.yandex.net/v1/chat/completions

Hot-reload токена Yandex

Воркеры tagging и analyse монтируют .env как /config/.env и перечитывают при каждом старте контейнера:

docker compose restart tagging analyse

docker compose restart не пересоздаёт контейнер, но перезапускает процесс, который читает свежий .env.


Запуск и управление

Первый запуск

cd audio-pipeline
docker compose up -d --build

Полный сброс (очереди + БД)

docker compose down -v
docker compose up -d --build

Пересборка отдельных воркеров

docker compose up -d --build transcribe analyse tagging

Обработка нового файла

cp recording.wav storage/incoming/

Проверка статуса

docker compose ps
docker compose logs -f watcher transcribe analyse tagging

RabbitMQ — просмотр очереди final

UI: http://localhost:15672 → Queues → final

Развёртывание в k3s

Полная инструкция: k8s/README.md

Кратко:

# 1. секреты из .env
./k8s/prepare-secret.sh

# 2. каталог хранилища на ноде k3s
sudo mkdir -p /var/lib/audio-pipeline/storage/{incoming,processing,failed}

# 3. сборка образов и импорт в k3s
./k8s/build-images.sh

# 4. деплой
kubectl apply -k k8s/

# 5. проверка
kubectl -n audio-pipeline get pods

Загрузка файла на ноду:

sudo cp recording.wav /var/lib/audio-pipeline/storage/incoming/

Логирование

Все воркеры пишут структурированные JSON-логи в stdout.

Ключевые события

Событие Воркер Описание
claimed file watcher Файл взят в обработку
transcribed transcribe STT завершён
llm call ok analyse/tagging Вызов Yandex API
task complete analyse/tagging Оба результата готовы
published final analyse/tagging Сообщение в final
processing file deleted analyse/tagging Файл удалён из processing
yandex api check ok analyse/tagging Проверка API при старте

Поиск в логах

# все LLM-вызовы
docker compose logs analyse 2>&1 | grep '"llm call'

# завершённые задачи
docker compose logs 2>&1 | grep '"task complete"'

# ошибки
docker compose logs 2>&1 | grep '"level":"WARN"'

Обработка ошибок

Ситуация Поведение
Битое JSON в очереди Nack(requeue=false) — в DLQ
Ошибка Nexara STT Nack(requeue=false) — в DLQ
Ошибка Yandex LLM Nack(requeue=false) — сообщение отбрасывается
Ошибка сохранения в Postgres Nack(requeue=false) — отбрасывается
Redelivered сообщение Пропускается (без повторного вызова LLM)
Ошибка публикации в final Файл не удаляется
Yandex API недоступен при старте Воркер не запускается, контейнер рестартит

Политика без повторов: каждый промпт / классификация — ровно один вызов LLM. Повторные доставки RabbitMQ игнорируются.


Переключение промптов на API

Уже реализовано в transcribe. Достаточно изменить .env:

PROMPTS_SOURCE=http
PROMPTS_BASE_URL=https://your-api.example.com
PROMPTS_API_KEY=your_token
PROMPTS_SECTION=1

Запрос: GET {PROMPTS_BASE_URL}/metrics/?id_section=1

Ожидаемый ответ — массив в том же формате, что prompts.json:

[
  {
    "id": 1,
    "id_section": 1,
    "name": "behavioral",
    "prompt": "полный текст промпта...",
    "dt_create": "2026-06-09T09:00:00"
  }
]

Analyse менять не нужно — промпты приходят в сообщении RabbitMQ.


Что не реализовано

  • Consumer очереди final — нет воркера, который читает итоговые сообщения
  • DLQ-очередь transcribe.tasks.failed — exchange dlx объявлен, но отдельная очередь может не быть привязана
  • Повторная обработка при ошибках LLM — намеренно отключена
  • Архивация удалённых файлов — файлы удаляются без бэкапа

Быстрый troubleshooting

Проблема Решение
Файл не обрабатывается Проверить incoming/, логи watcher
TLS timeout к Yandex из контейнера MTU Docker, VPN, docker compose restart
Старый Yandex токен Обновить .env, docker compose restart tagging analyse
context canceled в final Исправлено — пересобрать analyse/tagging
metadata column does not exist ALTER TABLE results ADD COLUMN IF NOT EXISTS metadata JSONB;
Файл остаётся в processing Проверить, дошли ли оба воркера до published final
Description
No description provided
Readme 124 KiB
Languages
Go 96.9%
Shell 1.5%
Dockerfile 1.2%
Makefile 0.4%