Files
pipeline_backend/README.md
2026-06-24 18:58:35 +03:00

743 lines
29 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Audio Pipeline
Пайплайн обработки аудиозаписей звонков: от появления файла на диске до классификации, анализа по промптам и публикации итогового результата в очередь `final`.
Система построена на **файловом триггере** + **RabbitMQ** + **PostgreSQL** + внешних API (Nexara STT, Yandex LLM).
---
## Содержание
1. [Общая схема](#общая-схема)
2. [Структура проекта](#структура-проекта)
3. [Инфраструктура](#инфраструктура)
4. [Этапы обработки](#этапы-обработки)
5. [Файловое хранилище](#файловое-хранилище)
6. [RabbitMQ](#rabbitmq)
7. [PostgreSQL](#postgresql)
8. [Воркеры](#воркеры)
9. [Форматы сообщений](#форматы-сообщений)
10. [Промпты для анализа](#промпты-для-анализа)
11. [Агрегация и очередь final](#агрегация-и-очередь-final)
12. [Конфигурация (.env)](#конфигурация-env)
13. [Запуск и управление](#запуск-и-управление)
14. [Логирование](#логирование)
15. [Обработка ошибок](#обработка-ошибок)
16. [Переключение промптов на API](#переключение-промптов-на-api)
17. [Что не реализовано](#что-не-реализовано)
---
## Общая схема
```
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ incoming/ │────▶│ watcher │────▶│ RabbitMQ │
│ (файлы) │ │ (сканер) │ │ audio.new │
└─────────────┘ └──────────────┘ └──────┬──────┘
┌──────────────┐ ▼
│ processing/ │ ┌─────────────┐
│ (аудио) │◀────│ transcribe │── Nexara API (STT)
└──────────────┘ └──────┬──────┘
fanout transcription_done
┌───────────┴───────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ analyse │ │ tagging │
│ Yandex LLM │ │ Yandex LLM │
└──────┬──────┘ └──────┬──────┘
│ │
└───────────┬───────────┘
┌─────────────┐
│ PostgreSQL │
│ results │
└──────┬──────┘
оба готовы ───────┤
┌─────────────┐
│ final │ (RabbitMQ)
│ + удаление │
│ файла │
└─────────────┘
```
**Ключевые принципы:**
- Каждый этап — отдельный Docker-сервис (воркер).
- Связь между этапами — через RabbitMQ (асинхронно).
- Промежуточные и итоговые результаты LLM — в PostgreSQL.
- `analyse` и `tagging` работают **параллельно** после транскрипции.
- В `final` публикует тот воркер, который завершился **последним**.
---
## Структура проекта
```
audio-pipeline/
├── docker-compose.yml # инфраструктура и все сервисы
├── .env # конфигурация (не в git)
├── db/
│ └── init.sql # схема PostgreSQL
├── storage/ # аудиофайлы на хосте (монтируется в контейнеры)
│ ├── incoming/ # сюда кладут новые файлы
│ ├── processing/ # файлы в обработке
│ └── failed/ # файлы при критических ошибках watcher
├── watcher/ # сканер файловой системы
│ ├── cmd/watcher/main.go
│ └── internal/
│ ├── config/
│ ├── scanner/
│ └── publisher/
└── workers/
├── transcribe/ # STT + загрузка промптов + fanout
│ ├── cmd/transcribe/main.go
│ ├── configs/prompts.json
│ └── internal/
│ ├── consumer/
│ ├── nexara/
│ ├── prompts/
│ └── models/
├── analyse/ # анализ по промптам (Yandex LLM)
│ └── cmd/analyse/main.go
└── tagging/ # классификация диалога (Yandex LLM)
└── cmd/tagging/main.go
```
---
## Инфраструктура
| Сервис | Контейнер | Порты | Назначение |
|-------------|-------------|----------------|-------------------------------|
| `rabbit` | rabbit | 5672, 15672 | RabbitMQ + Management UI |
| `postgres` | postgres | 5432 | Хранение результатов |
| `watcher` | watcher | — | Мониторинг `incoming/` |
| `transcribe`| transcribe | — | Транскрипция + fanout |
| `analyse` | analyse | — | Анализ по промптам |
| `tagging` | tagging | — | Классификация диалога |
**RabbitMQ UI:** http://localhost:15672 (логин/пароль из `.env`)
---
## Этапы обработки
### 1. Watcher — обнаружение файла
**Триггер:** появление аудиофайла в `{STORAGE_ROOT}/incoming/`.
**Алгоритм:**
1. Каждые `POLL_INTERVAL` (по умолчанию 5 с) сканирует `incoming/`.
2. Пропускает скрытые файлы (`.`) и временные (`.tmp`).
3. Проверяет расширение: `.mp3`, `.wav`, `.m4a`, `.ogg`, `.flac`, `.webm`.
4. Ждёт стабилизации размера файла (`STABLE_WINDOW` / `STABLE_CHECKS`) — защита от незавершённой загрузки.
5. Генерирует ULID `task_id`.
6. Атомарно переименовывает: `incoming/name.wav``processing/{task_id}.wav`.
7. Публикует задачу в RabbitMQ (`audio_pipeline` / `audio.new`).
**При ошибке публикации:** файл возвращается в `incoming/`. Если откат невозможен — перемещается в `failed/`.
### 2. Transcribe — транскрипция
**Вход:** очередь `transcribe.tasks`.
**Алгоритм:**
1. Читает аудиофайл по `file_path` из сообщения.
2. Отправляет в **Nexara API** (Speech-to-Text).
3. Загружает промпты (`prompts.json` или HTTP API).
4. Формирует `TranscriptionResult` и публикует в fanout-exchange `transcription_done`.
5. Сообщение доставляется **одновременно** в очереди `analyse` и `tagging`.
### 3. Tagging — классификация диалога
**Вход:** очередь `tagging`.
**Алгоритм:**
1. Получает транскрипцию из сообщения.
2. Отправляет **один** запрос в **Yandex LLM** с промптом классификации (L1/L2/L3, risk_level и т.д.).
3. Сохраняет результат в `results.tagging` (PostgreSQL).
4. Если `results.analysis` уже заполнен — публикует в `final` и удаляет файл.
### 4. Analyse — анализ по промптам
**Вход:** очередь `analyse`.
**Алгоритм:**
1. Получает транскрипцию и массив `prompts` из сообщения.
2. Для **каждого** промпта — отдельный запрос в **Yandex LLM** (сейчас 3 промпта: behavioral, client_data, cargo_data).
3. Сохраняет результат в `results.analysis`, метаданные — в `results.metadata`.
4. Если `results.tagging` уже заполнен — публикует в `final` и удаляет файл.
### 5. Final — итоговое сообщение
**Выход:** очередь `final` (default exchange, routing key = `final`).
Публикуется **полный JSON** из таблицы `results`: транскрипция, analysis, tagging, metadata, статус, timestamps.
После успешной публикации аудиофайл удаляется из `processing/`.
> **Consumer для очереди `final` пока не реализован** — сообщения накапливаются в очереди до подключения обработчика.
---
## Файловое хранилище
Путь на хосте задаётся `STORAGE_ROOT` (по умолчанию `./storage`).
| Директория | Назначение |
|---------------|-------------------------------------------------|
| `incoming/` | Новые файлы для обработки |
| `processing/` | Файлы в работе (после claim watcher) |
| `failed/` | Файлы при невосстановимых ошибках watcher |
**Жизненный цикл файла:**
```
incoming/recording.wav
→ processing/01KTN....wav (watcher)
→ остаётся до завершения (transcribe читает по пути)
→ удаляется (после публикации в final)
```
Внутри контейнеров путь: `/data/storage/...` (volume mount).
---
## RabbitMQ
### Топология
```
exchange: audio_pipeline (direct)
└── queue: transcribe.tasks ← routing key: audio.new
exchange: transcription_done (fanout)
├── queue: analyse
└── queue: tagging
queue: final (default exchange, без binding)
queue: pipeline.status (default exchange — события стадий для фронта)
```
### Очереди
| Очередь | Producer | Consumer | Описание |
|--------------------|------------|------------|-----------------------------|
| `transcribe.tasks` | watcher | transcribe | Новые аудиозадачи |
| `analyse` | transcribe | analyse | Результат транскрипции |
| `tagging` | transcribe | tagging | Результат транскрипции |
| `final` | analyse/tagging | — | Итоговый результат |
| `pipeline.status` | все воркеры | backend | Стадии обработки (см. ниже) |
### Очередь `pipeline.status` (стадии)
Каждый воркер публикует JSON-события в очередь `pipeline.status` (переменная `STATUS_QUEUE`).
**Формат сообщения:**
```json
{
"task_id": "01KTN...",
"filename": "recording.wav",
"status": "pending",
"stage": "queued",
"error": "",
"timestamp": 1717843200
}
```
**Значения `status`:**
| status | Описание |
|----------------|-----------------|
| `pending` | ожидает |
| `in_progress` | в процессе |
| `done` | готово |
| `error` | ошибка |
**Значения `stage`:**
| stage | Кто публикует | Когда |
|-----------------|---------------|--------------------------------|
| `queued` | watcher | Задача отправлена в RabbitMQ |
| `transcribing` | transcribe | STT начат / ошибка Nexara |
| `analysing` | analyse | LLM-анализ начат / ошибка |
| `tagging` | tagging | Классификация начата / ошибка |
| `completed` | analyse/tagging | Оба воркера завершились |
**Типичная последовательность:**
```
pending/queued
→ in_progress/transcribing
→ in_progress/analysing (параллельно)
→ in_progress/tagging (параллельно)
→ done/completed
```
> Браузер не может подписаться на RabbitMQ напрямую. Нужен backend-consumer, который читает `pipeline.status` и отдаёт события через REST или WebSocket.
### Dead Letter
Очередь `transcribe.tasks` настроена с DLX (`dlx`). Сообщения с `Nack(requeue=false)` уходят в dead-letter. Отдельная DLQ-очередь может потребовать дополнительной настройки.
---
## PostgreSQL
### Таблица `results`
```sql
CREATE TABLE results (
task_id TEXT PRIMARY KEY,
filename TEXT,
transcription TEXT,
analysis JSONB, -- результат analyse (Yandex LLM)
tagging JSONB, -- результат tagging (Yandex LLM)
metadata JSONB, -- file_path, segments, prompts, language, transcribed_at
status TEXT NOT NULL DEFAULT 'pending', -- pending | in_progress | done | error
created_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ
);
```
### Кто что пишет
| Поле | Пишет | Когда |
|-----------------|----------|--------------------------------|
| `tagging` | tagging | После классификации |
| `analysis` | analyse | После анализа |
| `filename` | оба | При сохранении своей части |
| `transcription` | analyse | При сохранении analysis |
| `metadata` | analyse | file_path, segments, prompts… |
| `status` | оба | `in_progress` при старте; `done` когда оба поля заполнены; `error` при сбое |
### Просмотр данных
```bash
docker exec -it postgres psql -U pipeline -d pipeline
```
```sql
SELECT task_id, filename, status, updated_at FROM results ORDER BY updated_at DESC;
SELECT task_id, jsonb_pretty(analysis), jsonb_pretty(tagging)
FROM results WHERE task_id = 'ВАШ_TASK_ID';
```
---
## Воркеры
### Watcher
- **Язык:** Go
- **Зависимости:** RabbitMQ
- **Volume:** `${STORAGE_ROOT}:/data/storage`
- **Не использует:** Postgres, Yandex, Nexara
### Transcribe
- **Язык:** Go
- **API:** Nexara (STT)
- **Промпты:** static file или HTTP
- **Volume:** storage + `configs/prompts.json`
### Tagging
- **Язык:** Go
- **API:** Yandex Cloud LLM (`YANDEX_API_KEY`, `YANDEX_MODEL`)
- **При старте:** тестовый запрос к Yandex API (проверка подключения)
- **Volume:** storage (для удаления файлов) + `.env` (hot-reload токена)
### Analyse
- **Язык:** Go
- **API:** Yandex Cloud LLM
- **Вызовов LLM на задачу:** по числу промптов (сейчас 3)
- **Без повторов:** один вызов на промпт, при ошибке — сообщение отбрасывается
- **Volume:** storage + `.env`
---
## Форматы сообщений
### 1. Watcher → Transcribe
**Exchange:** `audio_pipeline` (direct)
**Routing key:** `audio.new`
**Queue:** `transcribe.tasks`
```json
{
"task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W",
"file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav",
"filename": "1.wav",
"size": 1234567,
"created_at": 1780914907
}
```
### 2. Transcribe → Analyse + Tagging
**Exchange:** `transcription_done` (fanout)
**Queues:** `analyse`, `tagging` (одинаковое тело)
```json
{
"task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W",
"filename": "1.wav",
"file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav",
"transcription": "полный текст транскрипции...",
"language": "ru",
"segments": [
{"start": 0.0, "end": 27.96, "text": "..."}
],
"prompts": [
{
"id": 1,
"id_section": 1,
"name": "behavioral",
"prompt": "Ты — строгий классификатор звонков...",
"dt_create": "2026-06-09T09:00:00"
}
],
"transcribed_at": 1780914907
}
```
### 3. Final — итоговое сообщение
**Queue:** `final`
```json
{
"task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W",
"filename": "1.wav",
"transcription": "полный текст...",
"analysis": {
"behavioral": {
"greeting": {"value": true, "evidence": "Здравствуйте", "confidence": 0.95},
"initiative": {"value": true, "evidence": "...", "confidence": 0.8},
"questions_check": {"value": false, "evidence": null, "confidence": 0.0},
"closing": {"value": true, "evidence": "всего доброго", "confidence": 0.9}
},
"client_data": { "...": "..." },
"cargo_data": { "...": "..." }
},
"tagging": {
"L1": "tracking",
"L2": "location_request",
"L3": "delay",
"risk_level": "medium",
"has_action_items": true,
"has_deadline": false
},
"file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav",
"language": "ru",
"segments": [...],
"prompts": [...],
"transcribed_at": 1780914907,
"status": "done",
"created_at": "2026-06-09T09:00:00Z",
"updated_at": "2026-06-09T09:05:00Z"
}
```
---
## Промпты для анализа
### Источник
Файл: `workers/transcribe/configs/prompts.json`
Или HTTP API (см. [переключение на API](#переключение-промптов-на-api)).
Transcribe загружает промпты и **вкладывает их в сообщение** для analyse. Analyse не знает, откуда они пришли.
### Текущие промпты (3 штуки)
| name | Назначение |
|----------------|---------------------------------------------------------|
| `behavioral` | Приветствие, инициативность, вопросы, прощание |
| `client_data` | Первый раз, город, тип клиента, контакты, источник |
| `cargo_data` | Характер груза, параметры, стоимость |
Каждый промпт — **полный текст инструкции** с форматом JSON-ответа. Analyse отправляет:
```
<текст промпта из конфига>
=== ТРАНСКРИПЦИЯ ===
"""
<текст звонка>
"""
```
Ответ LLM сохраняется целиком под ключом `name` промпта в `analysis`.
### Tagging — отдельный промпт
Tagging **не использует** `prompts.json`. У него встроенный промпт классификации логистических диалогов (L1/L2/L3, risk_level, has_action_items, has_deadline).
---
## Агрегация и очередь final
Оба воркера (`analyse`, `tagging`) пишут в одну строку `results` по `task_id`.
**Атомарная проверка готовности** (SQL):
```sql
UPDATE results SET ...
RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL)
```
- Если `RETURNING = false` — воркер ждёт второго.
- Если `RETURNING = true` — этот воркер:
1. Читает полную строку из БД
2. Публикует JSON в очередь `final`
3. Удаляет файл из `processing/` (только пути с `/processing/`)
---
## Конфигурация (.env)
Пример ключевых переменных:
```env
# Storage
STORAGE_ROOT=./storage
# Watcher
POLL_INTERVAL=5s
STABLE_WINDOW=2s
STABLE_CHECKS=3
# RabbitMQ
RABBITMQ_URL=amqp://admin:secret123@rabbit:5672/
RABBITMQ_EXCHANGE=audio_pipeline
RABBITMQ_ROUTING_KEY=audio.new
# Transcribe
INPUT_QUEUE=transcribe.tasks
OUTPUT_EXCHANGE=transcription_done
ANALYSE_QUEUE=analyse
TAGGING_QUEUE=tagging
FINAL_QUEUE=final
STATUS_QUEUE=pipeline.status
PREFETCH=1
# Nexara (STT)
NEXARA_BASE_URL=https://api.nexara.ru
NEXARA_API_KEY=...
NEXARA_MODEL=whisper-1
NEXARA_TIMEOUT=10m
# Prompts
PROMPTS_SOURCE=static
PROMPTS_FILE=/app/configs/prompts.json
PROMPTS_SECTION=1
# Postgres
POSTGRES_USER=pipeline
POSTGRES_PASSWORD=pipeline_secret
POSTGRES_DB=pipeline
DATABASE_URL=postgres://pipeline:pipeline_secret@postgres:5432/pipeline?sslmode=disable
# Yandex LLM (tagging + analyse)
YANDEX_API_KEY=t1....
YANDEX_MODEL=gpt://folder_id/model_name
YANDEX_API_URL=https://ai.api.cloud.yandex.net/v1/chat/completions
```
### Hot-reload токена Yandex
Воркеры `tagging` и `analyse` монтируют `.env` как `/config/.env` и перечитывают при **каждом старте** контейнера:
```bash
docker compose restart tagging analyse
```
> `docker compose restart` не пересоздаёт контейнер, но перезапускает процесс, который читает свежий `.env`.
---
## Запуск и управление
### Первый запуск
```bash
cd audio-pipeline
docker compose up -d --build
```
### Полный сброс (очереди + БД)
```bash
docker compose down -v
docker compose up -d --build
```
### Пересборка отдельных воркеров
```bash
docker compose up -d --build transcribe analyse tagging
```
### Обработка нового файла
```bash
cp recording.wav storage/incoming/
```
### Проверка статуса
```bash
docker compose ps
docker compose logs -f watcher transcribe analyse tagging
```
### RabbitMQ — просмотр очереди final
UI: http://localhost:15672 → Queues → `final`
### Развёртывание в k3s
Полная инструкция: **[k8s/README.md](k8s/README.md)**
Кратко:
```bash
# 1. секреты из .env
./k8s/prepare-secret.sh
# 2. каталог хранилища на ноде k3s
sudo mkdir -p /var/lib/audio-pipeline/storage/{incoming,processing,failed}
# 3. сборка образов и импорт в k3s
./k8s/build-images.sh
# 4. деплой
kubectl apply -k k8s/
# 5. проверка
kubectl -n audio-pipeline get pods
```
Загрузка файла на ноду:
```bash
sudo cp recording.wav /var/lib/audio-pipeline/storage/incoming/
```
---
## Логирование
Все воркеры пишут **структурированные JSON-логи** в stdout.
### Ключевые события
| Событие | Воркер | Описание |
|--------------------------|-----------|---------------------------------------|
| `claimed file` | watcher | Файл взят в обработку |
| `transcribed` | transcribe| STT завершён |
| `llm call ok` | analyse/tagging | Вызов Yandex API |
| `task complete` | analyse/tagging | Оба результата готовы |
| `published final` | analyse/tagging | Сообщение в final |
| `processing file deleted` | analyse/tagging | Файл удалён из processing |
| `yandex api check ok` | analyse/tagging | Проверка API при старте |
### Поиск в логах
```bash
# все LLM-вызовы
docker compose logs analyse 2>&1 | grep '"llm call'
# завершённые задачи
docker compose logs 2>&1 | grep '"task complete"'
# ошибки
docker compose logs 2>&1 | grep '"level":"WARN"'
```
---
## Обработка ошибок
| Ситуация | Поведение |
|----------------------------------|------------------------------------------------|
| Битое JSON в очереди | `Nack(requeue=false)` — в DLQ |
| Ошибка Nexara STT | `Nack(requeue=false)` — в DLQ |
| Ошибка Yandex LLM | `Nack(requeue=false)` — сообщение отбрасывается |
| Ошибка сохранения в Postgres | `Nack(requeue=false)` — отбрасывается |
| Redelivered сообщение | Пропускается (без повторного вызова LLM) |
| Ошибка публикации в final | Файл **не** удаляется |
| Yandex API недоступен при старте | Воркер не запускается, контейнер рестартит |
**Политика без повторов:** каждый промпт / классификация — ровно один вызов LLM. Повторные доставки RabbitMQ игнорируются.
---
## Переключение промптов на API
Уже реализовано в transcribe. Достаточно изменить `.env`:
```env
PROMPTS_SOURCE=http
PROMPTS_BASE_URL=https://your-api.example.com
PROMPTS_API_KEY=your_token
PROMPTS_SECTION=1
```
**Запрос:** `GET {PROMPTS_BASE_URL}/metrics/?id_section=1`
**Ожидаемый ответ** — массив в том же формате, что `prompts.json`:
```json
[
{
"id": 1,
"id_section": 1,
"name": "behavioral",
"prompt": "полный текст промпта...",
"dt_create": "2026-06-09T09:00:00"
}
]
```
Analyse менять не нужно — промпты приходят в сообщении RabbitMQ.
---
## Что не реализовано
- **Consumer очереди `final`** — нет воркера, который читает итоговые сообщения
- **DLQ-очередь** `transcribe.tasks.failed` — exchange `dlx` объявлен, но отдельная очередь может не быть привязана
- **Повторная обработка** при ошибках LLM — намеренно отключена
- **Архивация** удалённых файлов — файлы удаляются без бэкапа
---
## Быстрый troubleshooting
| Проблема | Решение |
|---------------------------------------|--------------------------------------------------------|
| Файл не обрабатывается | Проверить `incoming/`, логи watcher |
| TLS timeout к Yandex из контейнера | MTU Docker, VPN, `docker compose restart` |
| Старый Yandex токен | Обновить `.env`, `docker compose restart tagging analyse` |
| `context canceled` в final | Исправлено — пересобрать analyse/tagging |
| `metadata` column does not exist | `ALTER TABLE results ADD COLUMN IF NOT EXISTS metadata JSONB;` |
| Файл остаётся в processing | Проверить, дошли ли оба воркера до `published final` |