From 00ddac5af7c0d3c993929617fa60c79f00ceaf89 Mon Sep 17 00:00:00 2001 From: sanek5g Date: Wed, 10 Jun 2026 17:12:58 +0300 Subject: [PATCH] audio_pipeline --- .gitignore | 1 + README.md | 661 +++++++++++++++++ db/init.sql | 13 + docker-compose.yml | 100 +++ watcher/Dockerfile | 11 + watcher/cmd/watcher/main.go | 115 +++ watcher/go.mod | 8 + watcher/go.sum | 21 + watcher/internal/config/config.go | 60 ++ watcher/internal/publisher/publisher.go | 58 ++ watcher/internal/scanner/scanner.go | 144 ++++ workers/analyse/Dockerfile | 12 + workers/analyse/cmd/analyse/main.go | 657 +++++++++++++++++ workers/analyse/go.mod | 18 + workers/analyse/go.sum | 41 ++ workers/tagging/Dockerfile | 11 + workers/tagging/cmd/tagging/main.go | 685 ++++++++++++++++++ workers/tagging/go.mod | 18 + workers/tagging/go.sum | 41 ++ workers/transcribe/Dockerfile | 11 + workers/transcribe/cmd/transcribe/main.go | 64 ++ workers/transcribe/configs/prompts.json | 23 + workers/transcribe/go.mod | 5 + workers/transcribe/go.sum | 18 + workers/transcribe/internal/config/config.go | 78 ++ .../transcribe/internal/consumer/consumer.go | 172 +++++ workers/transcribe/internal/models/models.go | 34 + workers/transcribe/internal/nexara/nexara.go | 117 +++ .../transcribe/internal/prompts/prompts.go | 100 +++ 29 files changed, 3297 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 db/init.sql create mode 100644 docker-compose.yml create mode 100644 watcher/Dockerfile create mode 100644 watcher/cmd/watcher/main.go create mode 100644 watcher/go.mod create mode 100644 watcher/go.sum create mode 100644 watcher/internal/config/config.go create mode 100644 watcher/internal/publisher/publisher.go create mode 100644 watcher/internal/scanner/scanner.go create mode 100644 workers/analyse/Dockerfile create mode 100644 workers/analyse/cmd/analyse/main.go create mode 100644 workers/analyse/go.mod create mode 100644 workers/analyse/go.sum create mode 100644 workers/tagging/Dockerfile create mode 100644 workers/tagging/cmd/tagging/main.go create mode 100644 workers/tagging/go.mod create mode 100644 workers/tagging/go.sum create mode 100644 workers/transcribe/Dockerfile create mode 100644 workers/transcribe/cmd/transcribe/main.go create mode 100644 workers/transcribe/configs/prompts.json create mode 100644 workers/transcribe/go.mod create mode 100644 workers/transcribe/go.sum create mode 100644 workers/transcribe/internal/config/config.go create mode 100644 workers/transcribe/internal/consumer/consumer.go create mode 100644 workers/transcribe/internal/models/models.go create mode 100644 workers/transcribe/internal/nexara/nexara.go create mode 100644 workers/transcribe/internal/prompts/prompts.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c49bd7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.env diff --git a/README.md b/README.md new file mode 100644 index 0000000..f6bcfe6 --- /dev/null +++ b/README.md @@ -0,0 +1,661 @@ +# Audio Pipeline + +Пайплайн обработки аудиозаписей звонков: от появления файла на диске до классификации, анализа по промптам и публикации итогового результата в очередь `final`. + +Система построена на **файловом триггере** + **RabbitMQ** + **PostgreSQL** + внешних API (Nexara STT, Yandex LLM). + +--- + +## Содержание + +1. [Общая схема](#общая-схема) +2. [Структура проекта](#структура-проекта) +3. [Инфраструктура](#инфраструктура) +4. [Этапы обработки](#этапы-обработки) +5. [Файловое хранилище](#файловое-хранилище) +6. [RabbitMQ](#rabbitmq) +7. [PostgreSQL](#postgresql) +8. [Воркеры](#воркеры) +9. [Форматы сообщений](#форматы-сообщений) +10. [Промпты для анализа](#промпты-для-анализа) +11. [Агрегация и очередь final](#агрегация-и-очередь-final) +12. [Конфигурация (.env)](#конфигурация-env) +13. [Запуск и управление](#запуск-и-управление) +14. [Логирование](#логирование) +15. [Обработка ошибок](#обработка-ошибок) +16. [Переключение промптов на API](#переключение-промптов-на-api) +17. [Что не реализовано](#что-не-реализовано) + +--- + +## Общая схема + +``` +┌─────────────┐ ┌──────────────┐ ┌─────────────┐ +│ incoming/ │────▶│ watcher │────▶│ RabbitMQ │ +│ (файлы) │ │ (сканер) │ │ audio.new │ +└─────────────┘ └──────────────┘ └──────┬──────┘ + │ + ┌──────────────┐ ▼ + │ processing/ │ ┌─────────────┐ + │ (аудио) │◀────│ transcribe │── Nexara API (STT) + └──────────────┘ └──────┬──────┘ + │ + fanout transcription_done + ┌───────────┴───────────┐ + ▼ ▼ + ┌─────────────┐ ┌─────────────┐ + │ analyse │ │ tagging │ + │ Yandex LLM │ │ Yandex LLM │ + └──────┬──────┘ └──────┬──────┘ + │ │ + └───────────┬───────────┘ + ▼ + ┌─────────────┐ + │ PostgreSQL │ + │ results │ + └──────┬──────┘ + │ + оба готовы ───────┤ + ▼ + ┌─────────────┐ + │ final │ (RabbitMQ) + │ + удаление │ + │ файла │ + └─────────────┘ +``` + +**Ключевые принципы:** + +- Каждый этап — отдельный Docker-сервис (воркер). +- Связь между этапами — через RabbitMQ (асинхронно). +- Промежуточные и итоговые результаты LLM — в PostgreSQL. +- `analyse` и `tagging` работают **параллельно** после транскрипции. +- В `final` публикует тот воркер, который завершился **последним**. + +--- + +## Структура проекта + +``` +audio-pipeline/ +├── docker-compose.yml # инфраструктура и все сервисы +├── .env # конфигурация (не в git) +├── db/ +│ └── init.sql # схема PostgreSQL +├── storage/ # аудиофайлы на хосте (монтируется в контейнеры) +│ ├── incoming/ # сюда кладут новые файлы +│ ├── processing/ # файлы в обработке +│ └── failed/ # файлы при критических ошибках watcher +├── watcher/ # сканер файловой системы +│ ├── cmd/watcher/main.go +│ └── internal/ +│ ├── config/ +│ ├── scanner/ +│ └── publisher/ +└── workers/ + ├── transcribe/ # STT + загрузка промптов + fanout + │ ├── cmd/transcribe/main.go + │ ├── configs/prompts.json + │ └── internal/ + │ ├── consumer/ + │ ├── nexara/ + │ ├── prompts/ + │ └── models/ + ├── analyse/ # анализ по промптам (Yandex LLM) + │ └── cmd/analyse/main.go + └── tagging/ # классификация диалога (Yandex LLM) + └── cmd/tagging/main.go +``` + +--- + +## Инфраструктура + +| Сервис | Контейнер | Порты | Назначение | +|-------------|-------------|----------------|-------------------------------| +| `rabbit` | rabbit | 5672, 15672 | RabbitMQ + Management UI | +| `postgres` | postgres | 5432 | Хранение результатов | +| `watcher` | watcher | — | Мониторинг `incoming/` | +| `transcribe`| transcribe | — | Транскрипция + fanout | +| `analyse` | analyse | — | Анализ по промптам | +| `tagging` | tagging | — | Классификация диалога | + +**RabbitMQ UI:** http://localhost:15672 (логин/пароль из `.env`) + +--- + +## Этапы обработки + +### 1. Watcher — обнаружение файла + +**Триггер:** появление аудиофайла в `{STORAGE_ROOT}/incoming/`. + +**Алгоритм:** + +1. Каждые `POLL_INTERVAL` (по умолчанию 5 с) сканирует `incoming/`. +2. Пропускает скрытые файлы (`.`) и временные (`.tmp`). +3. Проверяет расширение: `.mp3`, `.wav`, `.m4a`, `.ogg`, `.flac`, `.webm`. +4. Ждёт стабилизации размера файла (`STABLE_WINDOW` / `STABLE_CHECKS`) — защита от незавершённой загрузки. +5. Генерирует ULID `task_id`. +6. Атомарно переименовывает: `incoming/name.wav` → `processing/{task_id}.wav`. +7. Публикует задачу в RabbitMQ (`audio_pipeline` / `audio.new`). + +**При ошибке публикации:** файл возвращается в `incoming/`. Если откат невозможен — перемещается в `failed/`. + +### 2. Transcribe — транскрипция + +**Вход:** очередь `transcribe.tasks`. + +**Алгоритм:** + +1. Читает аудиофайл по `file_path` из сообщения. +2. Отправляет в **Nexara API** (Speech-to-Text). +3. Загружает промпты (`prompts.json` или HTTP API). +4. Формирует `TranscriptionResult` и публикует в fanout-exchange `transcription_done`. +5. Сообщение доставляется **одновременно** в очереди `analyse` и `tagging`. + +### 3. Tagging — классификация диалога + +**Вход:** очередь `tagging`. + +**Алгоритм:** + +1. Получает транскрипцию из сообщения. +2. Отправляет **один** запрос в **Yandex LLM** с промптом классификации (L1/L2/L3, risk_level и т.д.). +3. Сохраняет результат в `results.tagging` (PostgreSQL). +4. Если `results.analysis` уже заполнен — публикует в `final` и удаляет файл. + +### 4. Analyse — анализ по промптам + +**Вход:** очередь `analyse`. + +**Алгоритм:** + +1. Получает транскрипцию и массив `prompts` из сообщения. +2. Для **каждого** промпта — отдельный запрос в **Yandex LLM** (сейчас 3 промпта: behavioral, client_data, cargo_data). +3. Сохраняет результат в `results.analysis`, метаданные — в `results.metadata`. +4. Если `results.tagging` уже заполнен — публикует в `final` и удаляет файл. + +### 5. Final — итоговое сообщение + +**Выход:** очередь `final` (default exchange, routing key = `final`). + +Публикуется **полный JSON** из таблицы `results`: транскрипция, analysis, tagging, metadata, статус, timestamps. + +После успешной публикации аудиофайл удаляется из `processing/`. + +> **Consumer для очереди `final` пока не реализован** — сообщения накапливаются в очереди до подключения обработчика. + +--- + +## Файловое хранилище + +Путь на хосте задаётся `STORAGE_ROOT` (по умолчанию `./storage`). + +| Директория | Назначение | +|---------------|-------------------------------------------------| +| `incoming/` | Новые файлы для обработки | +| `processing/` | Файлы в работе (после claim watcher) | +| `failed/` | Файлы при невосстановимых ошибках watcher | + +**Жизненный цикл файла:** + +``` +incoming/recording.wav + → processing/01KTN....wav (watcher) + → остаётся до завершения (transcribe читает по пути) + → удаляется (после публикации в final) +``` + +Внутри контейнеров путь: `/data/storage/...` (volume mount). + +--- + +## RabbitMQ + +### Топология + +``` +exchange: audio_pipeline (direct) + └── queue: transcribe.tasks ← routing key: audio.new + +exchange: transcription_done (fanout) + ├── queue: analyse + └── queue: tagging + +queue: final (default exchange, без binding) +``` + +### Очереди + +| Очередь | Producer | Consumer | Описание | +|--------------------|------------|------------|-----------------------------| +| `transcribe.tasks` | watcher | transcribe | Новые аудиозадачи | +| `analyse` | transcribe | analyse | Результат транскрипции | +| `tagging` | transcribe | tagging | Результат транскрипции | +| `final` | analyse/tagging | — | Итоговый результат | + +### Dead Letter + +Очередь `transcribe.tasks` настроена с DLX (`dlx`). Сообщения с `Nack(requeue=false)` уходят в dead-letter. Отдельная DLQ-очередь может потребовать дополнительной настройки. + +--- + +## PostgreSQL + +### Таблица `results` + +```sql +CREATE TABLE results ( + task_id TEXT PRIMARY KEY, + filename TEXT, + transcription TEXT, + analysis JSONB, -- результат analyse (Yandex LLM) + tagging JSONB, -- результат tagging (Yandex LLM) + metadata JSONB, -- file_path, segments, prompts, language, transcribed_at + status TEXT DEFAULT 'pending', -- pending | done + created_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ +); +``` + +### Кто что пишет + +| Поле | Пишет | Когда | +|-----------------|----------|--------------------------------| +| `tagging` | tagging | После классификации | +| `analysis` | analyse | После анализа | +| `filename` | оба | При сохранении своей части | +| `transcription` | analyse | При сохранении analysis | +| `metadata` | analyse | file_path, segments, prompts… | +| `status` | оба | `done` когда оба поля заполнены| + +### Просмотр данных + +```bash +docker exec -it postgres psql -U pipeline -d pipeline +``` + +```sql +SELECT task_id, filename, status, updated_at FROM results ORDER BY updated_at DESC; + +SELECT task_id, jsonb_pretty(analysis), jsonb_pretty(tagging) +FROM results WHERE task_id = 'ВАШ_TASK_ID'; +``` + +--- + +## Воркеры + +### Watcher + +- **Язык:** Go +- **Зависимости:** RabbitMQ +- **Volume:** `${STORAGE_ROOT}:/data/storage` +- **Не использует:** Postgres, Yandex, Nexara + +### Transcribe + +- **Язык:** Go +- **API:** Nexara (STT) +- **Промпты:** static file или HTTP +- **Volume:** storage + `configs/prompts.json` + +### Tagging + +- **Язык:** Go +- **API:** Yandex Cloud LLM (`YANDEX_API_KEY`, `YANDEX_MODEL`) +- **При старте:** тестовый запрос к Yandex API (проверка подключения) +- **Volume:** storage (для удаления файлов) + `.env` (hot-reload токена) + +### Analyse + +- **Язык:** Go +- **API:** Yandex Cloud LLM +- **Вызовов LLM на задачу:** по числу промптов (сейчас 3) +- **Без повторов:** один вызов на промпт, при ошибке — сообщение отбрасывается +- **Volume:** storage + `.env` + +--- + +## Форматы сообщений + +### 1. Watcher → Transcribe + +**Exchange:** `audio_pipeline` (direct) +**Routing key:** `audio.new` +**Queue:** `transcribe.tasks` + +```json +{ + "task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W", + "file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav", + "filename": "1.wav", + "size": 1234567, + "created_at": 1780914907 +} +``` + +### 2. Transcribe → Analyse + Tagging + +**Exchange:** `transcription_done` (fanout) +**Queues:** `analyse`, `tagging` (одинаковое тело) + +```json +{ + "task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W", + "filename": "1.wav", + "file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav", + "transcription": "полный текст транскрипции...", + "language": "ru", + "segments": [ + {"start": 0.0, "end": 27.96, "text": "..."} + ], + "prompts": [ + { + "id": 1, + "id_section": 1, + "name": "behavioral", + "prompt": "Ты — строгий классификатор звонков...", + "dt_create": "2026-06-09T09:00:00" + } + ], + "transcribed_at": 1780914907 +} +``` + +### 3. Final — итоговое сообщение + +**Queue:** `final` + +```json +{ + "task_id": "01KTNVA3EKW8CY2QDAYKKVZ40W", + "filename": "1.wav", + "transcription": "полный текст...", + "analysis": { + "behavioral": { + "greeting": {"value": true, "evidence": "Здравствуйте", "confidence": 0.95}, + "initiative": {"value": true, "evidence": "...", "confidence": 0.8}, + "questions_check": {"value": false, "evidence": null, "confidence": 0.0}, + "closing": {"value": true, "evidence": "всего доброго", "confidence": 0.9} + }, + "client_data": { "...": "..." }, + "cargo_data": { "...": "..." } + }, + "tagging": { + "L1": "tracking", + "L2": "location_request", + "L3": "delay", + "risk_level": "medium", + "has_action_items": true, + "has_deadline": false + }, + "file_path": "/data/storage/processing/01KTNVA3EKW8CY2QDAYKKVZ40W.wav", + "language": "ru", + "segments": [...], + "prompts": [...], + "transcribed_at": 1780914907, + "status": "done", + "created_at": "2026-06-09T09:00:00Z", + "updated_at": "2026-06-09T09:05:00Z" +} +``` + +--- + +## Промпты для анализа + +### Источник + +Файл: `workers/transcribe/configs/prompts.json` +Или HTTP API (см. [переключение на API](#переключение-промптов-на-api)). + +Transcribe загружает промпты и **вкладывает их в сообщение** для analyse. Analyse не знает, откуда они пришли. + +### Текущие промпты (3 штуки) + +| name | Назначение | +|----------------|---------------------------------------------------------| +| `behavioral` | Приветствие, инициативность, вопросы, прощание | +| `client_data` | Первый раз, город, тип клиента, контакты, источник | +| `cargo_data` | Характер груза, параметры, стоимость | + +Каждый промпт — **полный текст инструкции** с форматом JSON-ответа. Analyse отправляет: + +``` +<текст промпта из конфига> + +=== ТРАНСКРИПЦИЯ === +""" +<текст звонка> +""" +``` + +Ответ LLM сохраняется целиком под ключом `name` промпта в `analysis`. + +### Tagging — отдельный промпт + +Tagging **не использует** `prompts.json`. У него встроенный промпт классификации логистических диалогов (L1/L2/L3, risk_level, has_action_items, has_deadline). + +--- + +## Агрегация и очередь final + +Оба воркера (`analyse`, `tagging`) пишут в одну строку `results` по `task_id`. + +**Атомарная проверка готовности** (SQL): + +```sql +UPDATE results SET ... +RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL) +``` + +- Если `RETURNING = false` — воркер ждёт второго. +- Если `RETURNING = true` — этот воркер: + 1. Читает полную строку из БД + 2. Публикует JSON в очередь `final` + 3. Удаляет файл из `processing/` (только пути с `/processing/`) + +--- + +## Конфигурация (.env) + +Пример ключевых переменных: + +```env +# Storage +STORAGE_ROOT=./storage + +# Watcher +POLL_INTERVAL=5s +STABLE_WINDOW=2s +STABLE_CHECKS=3 + +# RabbitMQ +RABBITMQ_URL=amqp://admin:secret123@rabbit:5672/ +RABBITMQ_EXCHANGE=audio_pipeline +RABBITMQ_ROUTING_KEY=audio.new + +# Transcribe +INPUT_QUEUE=transcribe.tasks +OUTPUT_EXCHANGE=transcription_done +ANALYSE_QUEUE=analyse +TAGGING_QUEUE=tagging +FINAL_QUEUE=final +PREFETCH=1 + +# Nexara (STT) +NEXARA_BASE_URL=https://api.nexara.ru +NEXARA_API_KEY=... +NEXARA_MODEL=whisper-1 +NEXARA_TIMEOUT=10m + +# Prompts +PROMPTS_SOURCE=static +PROMPTS_FILE=/app/configs/prompts.json +PROMPTS_SECTION=1 + +# Postgres +POSTGRES_USER=pipeline +POSTGRES_PASSWORD=pipeline_secret +POSTGRES_DB=pipeline +DATABASE_URL=postgres://pipeline:pipeline_secret@postgres:5432/pipeline?sslmode=disable + +# Yandex LLM (tagging + analyse) +YANDEX_API_KEY=t1.... +YANDEX_MODEL=gpt://folder_id/model_name +YANDEX_API_URL=https://ai.api.cloud.yandex.net/v1/chat/completions +``` + +### Hot-reload токена Yandex + +Воркеры `tagging` и `analyse` монтируют `.env` как `/config/.env` и перечитывают при **каждом старте** контейнера: + +```bash +docker compose restart tagging analyse +``` + +> `docker compose restart` не пересоздаёт контейнер, но перезапускает процесс, который читает свежий `.env`. + +--- + +## Запуск и управление + +### Первый запуск + +```bash +cd audio-pipeline +docker compose up -d --build +``` + +### Полный сброс (очереди + БД) + +```bash +docker compose down -v +docker compose up -d --build +``` + +### Пересборка отдельных воркеров + +```bash +docker compose up -d --build transcribe analyse tagging +``` + +### Обработка нового файла + +```bash +cp recording.wav storage/incoming/ +``` + +### Проверка статуса + +```bash +docker compose ps +docker compose logs -f watcher transcribe analyse tagging +``` + +### RabbitMQ — просмотр очереди final + +UI: http://localhost:15672 → Queues → `final` + +--- + +## Логирование + +Все воркеры пишут **структурированные JSON-логи** в stdout. + +### Ключевые события + +| Событие | Воркер | Описание | +|--------------------------|-----------|---------------------------------------| +| `claimed file` | watcher | Файл взят в обработку | +| `transcribed` | transcribe| STT завершён | +| `llm call ok` | analyse/tagging | Вызов Yandex API | +| `task complete` | analyse/tagging | Оба результата готовы | +| `published final` | analyse/tagging | Сообщение в final | +| `processing file deleted` | analyse/tagging | Файл удалён из processing | +| `yandex api check ok` | analyse/tagging | Проверка API при старте | + +### Поиск в логах + +```bash +# все LLM-вызовы +docker compose logs analyse 2>&1 | grep '"llm call' + +# завершённые задачи +docker compose logs 2>&1 | grep '"task complete"' + +# ошибки +docker compose logs 2>&1 | grep '"level":"WARN"' +``` + +--- + +## Обработка ошибок + +| Ситуация | Поведение | +|----------------------------------|------------------------------------------------| +| Битое JSON в очереди | `Nack(requeue=false)` — в DLQ | +| Ошибка Nexara STT | `Nack(requeue=false)` — в DLQ | +| Ошибка Yandex LLM | `Nack(requeue=false)` — сообщение отбрасывается | +| Ошибка сохранения в Postgres | `Nack(requeue=false)` — отбрасывается | +| Redelivered сообщение | Пропускается (без повторного вызова LLM) | +| Ошибка публикации в final | Файл **не** удаляется | +| Yandex API недоступен при старте | Воркер не запускается, контейнер рестартит | + +**Политика без повторов:** каждый промпт / классификация — ровно один вызов LLM. Повторные доставки RabbitMQ игнорируются. + +--- + +## Переключение промптов на API + +Уже реализовано в transcribe. Достаточно изменить `.env`: + +```env +PROMPTS_SOURCE=http +PROMPTS_BASE_URL=https://your-api.example.com +PROMPTS_API_KEY=your_token +PROMPTS_SECTION=1 +``` + +**Запрос:** `GET {PROMPTS_BASE_URL}/metrics/?id_section=1` + +**Ожидаемый ответ** — массив в том же формате, что `prompts.json`: + +```json +[ + { + "id": 1, + "id_section": 1, + "name": "behavioral", + "prompt": "полный текст промпта...", + "dt_create": "2026-06-09T09:00:00" + } +] +``` + +Analyse менять не нужно — промпты приходят в сообщении RabbitMQ. + +--- + +## Что не реализовано + +- **Consumer очереди `final`** — нет воркера, который читает итоговые сообщения +- **DLQ-очередь** `transcribe.tasks.failed` — exchange `dlx` объявлен, но отдельная очередь может не быть привязана +- **Повторная обработка** при ошибках LLM — намеренно отключена +- **Архивация** удалённых файлов — файлы удаляются без бэкапа + +--- + +## Быстрый troubleshooting + +| Проблема | Решение | +|---------------------------------------|--------------------------------------------------------| +| Файл не обрабатывается | Проверить `incoming/`, логи watcher | +| TLS timeout к Yandex из контейнера | MTU Docker, VPN, `docker compose restart` | +| Старый Yandex токен | Обновить `.env`, `docker compose restart tagging analyse` | +| `context canceled` в final | Исправлено — пересобрать analyse/tagging | +| `metadata` column does not exist | `ALTER TABLE results ADD COLUMN IF NOT EXISTS metadata JSONB;` | +| Файл остаётся в processing | Проверить, дошли ли оба воркера до `published final` | diff --git a/db/init.sql b/db/init.sql new file mode 100644 index 0000000..00630da --- /dev/null +++ b/db/init.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS results ( + task_id TEXT PRIMARY KEY, + filename TEXT, + transcription TEXT, + analysis JSONB, + tagging JSONB, + metadata JSONB, + status TEXT NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +ALTER TABLE results ADD COLUMN IF NOT EXISTS metadata JSONB; diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d1a1670 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,100 @@ +services: + rabbit: + image: rabbitmq:3-management-alpine + container_name: rabbit + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER:-admin} + RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS:-secret123} + healthcheck: + test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + + postgres: + image: postgres:16-alpine + container_name: postgres + ports: + - "5432:5432" + environment: + POSTGRES_USER: ${POSTGRES_USER:-pipeline} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-pipeline_secret} + POSTGRES_DB: ${POSTGRES_DB:-pipeline} + volumes: + - postgres_data:/var/lib/postgresql/data + - ./db/init.sql:/docker-entrypoint-initdb.d/init.sql:ro + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-pipeline} -d ${POSTGRES_DB:-pipeline}"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 15s + restart: unless-stopped + + watcher: + build: ./watcher + container_name: watcher + env_file: .env + volumes: + - ${STORAGE_ROOT:-./storage}:/data/storage + environment: + STORAGE_ROOT: /data/storage + depends_on: + rabbit: + condition: service_healthy + restart: unless-stopped + + transcribe: + build: ./workers/transcribe + container_name: transcribe + env_file: .env + volumes: + - ${STORAGE_ROOT:-./storage}:/data/storage + - ./workers/transcribe/configs:/app/configs:ro + environment: + STORAGE_ROOT: /data/storage + depends_on: + rabbit: + condition: service_healthy + restart: unless-stopped + + tagging: + build: ./workers/tagging + container_name: tagging + env_file: .env + volumes: + - ${STORAGE_ROOT:-./storage}:/data/storage + - ./.env:/config/.env:ro + environment: + STORAGE_ROOT: /data/storage + DOTENV_PATH: /config/.env + depends_on: + rabbit: + condition: service_healthy + postgres: + condition: service_healthy + restart: unless-stopped + + analyse: + build: ./workers/analyse + container_name: analyse + env_file: .env + volumes: + - ${STORAGE_ROOT:-./storage}:/data/storage + - ./.env:/config/.env:ro + environment: + STORAGE_ROOT: /data/storage + DOTENV_PATH: /config/.env + depends_on: + rabbit: + condition: service_healthy + postgres: + condition: service_healthy + restart: unless-stopped + +volumes: + postgres_data: diff --git a/watcher/Dockerfile b/watcher/Dockerfile new file mode 100644 index 0000000..155b416 --- /dev/null +++ b/watcher/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.22-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -o /watcher ./cmd/watcher + +FROM alpine:3.19 +RUN apk add --no-cache ca-certificates +COPY --from=build /watcher /watcher +ENTRYPOINT ["/watcher"] diff --git a/watcher/cmd/watcher/main.go b/watcher/cmd/watcher/main.go new file mode 100644 index 0000000..67862dd --- /dev/null +++ b/watcher/cmd/watcher/main.go @@ -0,0 +1,115 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/yourorg/watcher/internal/config" + "github.com/yourorg/watcher/internal/publisher" + "github.com/yourorg/watcher/internal/scanner" +) + +func main() { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))) + + cfg := config.Load() + sc := scanner.New(scanner.Config{ + StorageRoot: cfg.StorageRoot, + IncomingDir: cfg.IncomingDir, + ProcessingDir: cfg.ProcessingDir, + FailedDir: cfg.FailedDir, + StableWindow: cfg.StableWindow, + StableChecks: cfg.StableChecks, + }) + if err := sc.EnsureDirs(); err != nil { + slog.Error("ensure dirs failed", "error", err) + os.Exit(1) + } + + ch := mustRabbit(cfg.RabbitURL) + if err := ch.ExchangeDeclare(cfg.Exchange, "direct", true, false, false, false, nil); err != nil { + slog.Error("declare exchange failed", "error", err) + os.Exit(1) + } + pub, err := publisher.New(ch, cfg.Exchange, cfg.RoutingKey) + if err != nil { + slog.Error("publisher init failed", "error", err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + slog.Info("watcher started", + "storage_root", cfg.StorageRoot, + "poll_interval", cfg.PollInterval.String(), + "exchange", cfg.Exchange, + "routing_key", cfg.RoutingKey, + ) + + ticker := time.NewTicker(cfg.PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + slog.Info("watcher stopping") + return + case <-ticker.C: + claimed, err := sc.ScanOnce() + if err != nil { + slog.Warn("scan failed", "error", err) + continue + } + for _, cf := range claimed { + task := publisher.AudioTask{ + TaskID: cf.TaskID, + FilePath: cf.FilePath, + Filename: cf.Filename, + Size: cf.Size, + } + pubCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + err := pub.Publish(pubCtx, task) + cancel() + if err != nil { + slog.Warn("publish failed, rolling back", "task_id", cf.TaskID, "error", err) + if rbErr := sc.RollbackToIncoming(cf.FilePath, cf.Filename); rbErr != nil { + slog.Error("rollback failed, moving to failed", "task_id", cf.TaskID, "error", rbErr) + _ = sc.MoveToFailed(cf.FilePath, cf.Filename) + } + continue + } + slog.Info("task published", "task_id", cf.TaskID, "filename", cf.Filename) + } + } + } +} + +func mustRabbit(url string) *amqp.Channel { + var conn *amqp.Connection + var err error + for i := 0; i < 30; i++ { + conn, err = amqp.Dial(url) + if err == nil { + break + } + slog.Info("waiting for rabbit", "attempt", i+1, "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + slog.Error("rabbit unreachable", "error", err) + os.Exit(1) + } + ch, err := conn.Channel() + if err != nil { + slog.Error("rabbit channel failed", "error", err) + os.Exit(1) + } + return ch +} diff --git a/watcher/go.mod b/watcher/go.mod new file mode 100644 index 0000000..26fa9ce --- /dev/null +++ b/watcher/go.mod @@ -0,0 +1,8 @@ +module github.com/yourorg/watcher + +go 1.22 + +require ( + github.com/oklog/ulid/v2 v2.1.0 + github.com/rabbitmq/amqp091-go v1.9.0 +) diff --git a/watcher/go.sum b/watcher/go.sum new file mode 100644 index 0000000..f50c5b4 --- /dev/null +++ b/watcher/go.sum @@ -0,0 +1,21 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= +github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= +github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/watcher/internal/config/config.go b/watcher/internal/config/config.go new file mode 100644 index 0000000..c1f3d44 --- /dev/null +++ b/watcher/internal/config/config.go @@ -0,0 +1,60 @@ +package config + +import ( + "os" + "strconv" + "time" +) + +type Config struct { + StorageRoot string + IncomingDir string + ProcessingDir string + FailedDir string + PollInterval time.Duration + StableWindow time.Duration + StableChecks int + RabbitURL string + Exchange string + RoutingKey string +} + +func Load() Config { + return Config{ + StorageRoot: getEnv("STORAGE_ROOT", "/data/storage"), + IncomingDir: getEnv("INCOMING_DIR", "incoming"), + ProcessingDir: getEnv("PROCESSING_DIR", "processing"), + FailedDir: getEnv("FAILED_DIR", "failed"), + PollInterval: getDuration("POLL_INTERVAL", 5*time.Second), + StableWindow: getDuration("STABLE_WINDOW", 2*time.Second), + StableChecks: getInt("STABLE_CHECKS", 3), + RabbitURL: getEnv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/"), + Exchange: getEnv("RABBITMQ_EXCHANGE", "audio_pipeline"), + RoutingKey: getEnv("RABBITMQ_ROUTING_KEY", "audio.new"), + } +} + +func getEnv(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func getInt(key string, def int) int { + if v := os.Getenv(key); v != "" { + if i, err := strconv.Atoi(v); err == nil { + return i + } + } + return def +} + +func getDuration(key string, def time.Duration) time.Duration { + if v := os.Getenv(key); v != "" { + if d, err := time.ParseDuration(v); err == nil { + return d + } + } + return def +} diff --git a/watcher/internal/publisher/publisher.go b/watcher/internal/publisher/publisher.go new file mode 100644 index 0000000..b4c6536 --- /dev/null +++ b/watcher/internal/publisher/publisher.go @@ -0,0 +1,58 @@ +package publisher + +import ( + "context" + "encoding/json" + "fmt" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type AudioTask struct { + TaskID string `json:"task_id"` + FilePath string `json:"file_path"` + Filename string `json:"filename"` + Size int64 `json:"size"` + CreatedAt int64 `json:"created_at"` +} + +type Publisher struct { + ch *amqp.Channel + exchange string + routingKey string +} + +func New(ch *amqp.Channel, exchange, routingKey string) (*Publisher, error) { + if err := ch.Confirm(false); err != nil { + return nil, fmt.Errorf("confirm mode: %w", err) + } + return &Publisher{ch: ch, exchange: exchange, routingKey: routingKey}, nil +} + +func (p *Publisher) Publish(ctx context.Context, task AudioTask) error { + if task.CreatedAt == 0 { + task.CreatedAt = time.Now().Unix() + } + body, err := json.Marshal(task) + if err != nil { + return err + } + confirms := p.ch.NotifyPublish(make(chan amqp.Confirmation, 1)) + if err := p.ch.PublishWithContext(ctx, p.exchange, p.routingKey, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + return err + } + select { + case confirm := <-confirms: + if !confirm.Ack { + return fmt.Errorf("publish not confirmed") + } + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/watcher/internal/scanner/scanner.go b/watcher/internal/scanner/scanner.go new file mode 100644 index 0000000..dad91bc --- /dev/null +++ b/watcher/internal/scanner/scanner.go @@ -0,0 +1,144 @@ +package scanner + +import ( + "fmt" + "log/slog" + "os" + "path/filepath" + "strings" + "time" + + "github.com/oklog/ulid/v2" +) + +var allowedExts = map[string]bool{ + ".mp3": true, ".wav": true, ".m4a": true, + ".ogg": true, ".flac": true, ".webm": true, +} + +type ClaimedFile struct { + TaskID string + FilePath string + Filename string + Size int64 +} + +type Config struct { + StorageRoot string + IncomingDir string + ProcessingDir string + FailedDir string + StableWindow time.Duration + StableChecks int +} + +type Scanner struct { + cfg Config +} + +func New(cfg Config) *Scanner { + return &Scanner{cfg: cfg} +} + +func (s *Scanner) EnsureDirs() error { + for _, dir := range []string{s.cfg.IncomingDir, s.cfg.ProcessingDir, s.cfg.FailedDir} { + if err := os.MkdirAll(filepath.Join(s.cfg.StorageRoot, dir), 0o755); err != nil { + return err + } + } + return nil +} + +func (s *Scanner) ScanOnce() ([]ClaimedFile, error) { + incoming := filepath.Join(s.cfg.StorageRoot, s.cfg.IncomingDir) + entries, err := os.ReadDir(incoming) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + var claimed []ClaimedFile + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if strings.HasPrefix(name, ".") || strings.HasSuffix(strings.ToLower(name), ".tmp") { + continue + } + ext := strings.ToLower(filepath.Ext(name)) + if !allowedExts[ext] { + continue + } + src := filepath.Join(incoming, name) + if !s.isStable(src) { + continue + } + cf, err := s.claim(src, name, ext) + if err != nil { + slog.Warn("claim failed", "file", name, "error", err) + continue + } + claimed = append(claimed, cf) + } + return claimed, nil +} + +func (s *Scanner) isStable(path string) bool { + var lastSize int64 = -1 + for i := 0; i < s.cfg.StableChecks; i++ { + info, err := os.Stat(path) + if err != nil { + return false + } + size := info.Size() + if lastSize >= 0 && size != lastSize { + return false + } + lastSize = size + if i < s.cfg.StableChecks-1 { + time.Sleep(s.cfg.StableWindow) + } + } + return true +} + +func (s *Scanner) claim(src, originalName, ext string) (ClaimedFile, error) { + info, err := os.Stat(src) + if err != nil { + return ClaimedFile{}, err + } + taskID := ulid.Make().String() + processing := filepath.Join(s.cfg.StorageRoot, s.cfg.ProcessingDir) + dst := filepath.Join(processing, taskID+ext) + if err := os.Rename(src, dst); err != nil { + return ClaimedFile{}, fmt.Errorf("rename: %w", err) + } + slog.Info("claimed file", "task_id", taskID, "filename", originalName, "path", dst, "size", info.Size()) + return ClaimedFile{ + TaskID: taskID, + FilePath: dst, + Filename: originalName, + Size: info.Size(), + }, nil +} + +func (s *Scanner) RollbackToIncoming(filePath, originalName string) error { + incoming := filepath.Join(s.cfg.StorageRoot, s.cfg.IncomingDir) + dst := filepath.Join(incoming, originalName) + if err := os.Rename(filePath, dst); err != nil { + return s.MoveToFailed(filePath, originalName) + } + return nil +} + +func (s *Scanner) MoveToFailed(filePath, originalName string) error { + failed := filepath.Join(s.cfg.StorageRoot, s.cfg.FailedDir) + if err := os.MkdirAll(failed, 0o755); err != nil { + return err + } + dst := filepath.Join(failed, originalName) + return os.Rename(filePath, dst) +} diff --git a/workers/analyse/Dockerfile b/workers/analyse/Dockerfile new file mode 100644 index 0000000..395fccf --- /dev/null +++ b/workers/analyse/Dockerfile @@ -0,0 +1,12 @@ +FROM golang:1.22-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /analyse ./cmd/analyse + +FROM alpine:3.20 +RUN apk add --no-cache ca-certificates +WORKDIR /app +COPY --from=build /analyse /app/analyse +ENTRYPOINT ["/app/analyse"] diff --git a/workers/analyse/cmd/analyse/main.go b/workers/analyse/cmd/analyse/main.go new file mode 100644 index 0000000..4af726f --- /dev/null +++ b/workers/analyse/cmd/analyse/main.go @@ -0,0 +1,657 @@ +package main + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "os" + "strings" + "time" + "unicode/utf8" + + "github.com/joho/godotenv" + _ "github.com/jackc/pgx/v5/stdlib" + amqp "github.com/rabbitmq/amqp091-go" +) + +func init() { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))) +} + +// ── входящее сообщение из очереди analyse (TranscriptionResult от transcribe) ── + +type WorkerMessage struct { + TaskID string `json:"task_id"` + Filename string `json:"filename"` + FilePath string `json:"file_path"` + Transcription string `json:"transcription"` + Language string `json:"language"` + Segments []Segment `json:"segments,omitempty"` + Prompts []Prompt `json:"prompts"` + TranscribedAt int64 `json:"transcribed_at"` +} + +type Segment struct { + Start float64 `json:"start"` + End float64 `json:"end"` + Text string `json:"text"` +} + +type Prompt struct { + ID int `json:"id"` + IDSection int `json:"id_section"` + Name string `json:"name"` + Prompt string `json:"prompt"` + DtCreate string `json:"dt_create"` +} + +// AnalysisResult — ключ = name промпта, значение = полный JSON-ответ LLM. +type AnalysisResult map[string]any + +// ── LLM request/response ── + +type chatMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} +type chatRequest struct { + Model string `json:"model"` + Temperature float64 `json:"temperature"` + ResponseFormat struct { + Type string `json:"type"` + } `json:"response_format"` + Messages []chatMessage `json:"messages"` +} +type tokenUsage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` +} +type chatResponse struct { + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } `json:"choices"` + Usage *tokenUsage `json:"usage"` +} + +type llmCallResult struct { + Content string + RequestBytes int + ResponseBytes int + Usage *tokenUsage + Duration time.Duration +} + +type analysisStats struct { + LLMCalls int + TotalTokens int + PromptTokens int + OutputTokens int +} + +// ===================== LLM ===================== + +var llmHTTPClient = newLLMHTTPClient(150 * time.Second) + +func newLLMHTTPClient(totalTimeout time.Duration) *http.Client { + return &http.Client{ + Timeout: totalTimeout, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 60 * time.Second, + ResponseHeaderTimeout: 90 * time.Second, + ExpectContinueTimeout: 5 * time.Second, + IdleConnTimeout: 90 * time.Second, + }, + } +} + +func callLLM(ctx context.Context, apiURL, model, prompt string) (*llmCallResult, error) { + const systemPrompt = "Ты — строгий классификатор звонков. Отвечай только JSON, без пояснений." + + reqBody := chatRequest{ + Model: model, + Temperature: 0.1, + Messages: []chatMessage{ + {Role: "system", Content: systemPrompt}, + {Role: "user", Content: prompt}, + }, + } + reqBody.ResponseFormat.Type = "json_object" + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "POST", apiURL, bytes.NewBuffer(jsonData)) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+os.Getenv("YANDEX_API_KEY")) + req.Header.Set("Content-Type", "application/json") + + start := time.Now() + resp, err := llmHTTPClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + duration := time.Since(start) + + if resp.StatusCode != http.StatusOK { + return &llmCallResult{ + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Duration: duration, + }, fmt.Errorf("status %d: %s", resp.StatusCode, truncate(string(body), 500)) + } + + var result chatResponse + if err := json.Unmarshal(body, &result); err != nil { + return &llmCallResult{ + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Duration: duration, + }, err + } + if len(result.Choices) == 0 { + return &llmCallResult{ + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Duration: duration, + }, fmt.Errorf("empty response") + } + + return &llmCallResult{ + Content: result.Choices[0].Message.Content, + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Usage: result.Usage, + Duration: duration, + }, nil +} + +func checkYandexAPI(ctx context.Context, apiURL, model string) error { + slog.Info("yandex api check started", "worker", "analyse", "url", apiURL, "model", model) + + res, err := callLLM(ctx, apiURL, model, `Ответь только JSON: {"ok":true}`) + if err != nil { + return err + } + + attrs := []any{ + "worker", "analyse", + "duration_ms", res.Duration.Milliseconds(), + "response_chars", utf8.RuneCountInString(res.Content), + } + if res.Usage != nil { + attrs = append(attrs, + "prompt_tokens", res.Usage.PromptTokens, + "completion_tokens", res.Usage.CompletionTokens, + "total_tokens", res.Usage.TotalTokens, + ) + } + slog.Info("yandex api check ok", attrs...) + return nil +} + +func logLLMCall(taskID, model, promptName string, promptIndex, promptTotal, attempt, inputChars int, res *llmCallResult, err error) { + attrs := []any{ + "worker", "analyse", + "task_id", taskID, + "model", model, + "call_type", "analyse_prompt", + "prompt_name", promptName, + "prompt_index", promptIndex, + "prompt_total", promptTotal, + "attempt", attempt, + "input_chars", inputChars, + } + if res != nil { + attrs = append(attrs, + "duration_ms", res.Duration.Milliseconds(), + "request_bytes", res.RequestBytes, + "response_bytes", res.ResponseBytes, + "response_chars", utf8.RuneCountInString(res.Content), + ) + if res.Usage != nil { + attrs = append(attrs, + "prompt_tokens", res.Usage.PromptTokens, + "completion_tokens", res.Usage.CompletionTokens, + "total_tokens", res.Usage.TotalTokens, + ) + } + } + if err != nil { + slog.Warn("llm call failed", append(attrs, "error", err)...) + return + } + slog.Info("llm call ok", attrs...) +} + +func accumulateUsage(stats *analysisStats, res *llmCallResult) { + stats.LLMCalls++ + if res != nil && res.Usage != nil { + stats.TotalTokens += res.Usage.TotalTokens + stats.PromptTokens += res.Usage.PromptTokens + stats.OutputTokens += res.Usage.CompletionTokens + } +} + +func buildPromptQuery(transcription string, p Prompt) string { + var b strings.Builder + b.WriteString(p.Prompt) + b.WriteString("\n\n=== ТРАНСКРИПЦИЯ ===\n\"\"\"\n") + b.WriteString(transcription) + b.WriteString("\n\"\"\"") + return b.String() +} + +func analysePrompt(ctx context.Context, apiURL, model, transcription string, p Prompt, index, total int, taskID string, stats *analysisStats) (any, error) { + query := buildPromptQuery(transcription, p) + inputChars := utf8.RuneCountInString(query) + + res, err := callLLM(ctx, apiURL, model, query) + logLLMCall(taskID, model, p.Name, index, total, 1, inputChars, res, err) + accumulateUsage(stats, res) + if err != nil { + return nil, err + } + + var parsed any + if err := json.Unmarshal([]byte(res.Content), &parsed); err != nil { + return nil, fmt.Errorf("parse: %w, resp: %s", err, truncate(res.Content, 300)) + } + return parsed, nil +} + +func runAnalysis(ctx context.Context, apiURL, model, taskID, transcription string, prompts []Prompt) (AnalysisResult, analysisStats, error) { + stats := analysisStats{} + result := make(AnalysisResult, len(prompts)) + + valid := make([]Prompt, 0, len(prompts)) + for _, p := range prompts { + if p.Name != "" { + valid = append(valid, p) + } + } + total := len(valid) + + for i, p := range valid { + value, err := analysePrompt(ctx, apiURL, model, transcription, p, i+1, total, taskID, &stats) + if err != nil { + return nil, stats, fmt.Errorf("%s: %w", p.Name, err) + } + result[p.Name] = value + } + return result, stats, nil +} + +// ===================== DB ===================== + +func saveAnalysis(ctx context.Context, db *sql.DB, task WorkerMessage, analysis []byte) (complete bool, err error) { + metadata, _ := json.Marshal(map[string]any{ + "file_path": task.FilePath, + "language": task.Language, + "segments": task.Segments, + "prompts": task.Prompts, + "transcribed_at": task.TranscribedAt, + }) + + _, err = db.ExecContext(ctx, + `INSERT INTO results (task_id) VALUES ($1) ON CONFLICT (task_id) DO NOTHING`, task.TaskID) + if err != nil { + return false, fmt.Errorf("ensure row: %w", err) + } + + err = db.QueryRowContext(ctx, ` + UPDATE results + SET analysis = $2::jsonb, + filename = COALESCE(NULLIF($3, ''), filename), + transcription = COALESCE(NULLIF($4, ''), transcription), + metadata = COALESCE($5::jsonb, metadata), + updated_at = now(), + status = CASE WHEN tagging IS NOT NULL THEN 'done' ELSE status END + WHERE task_id = $1 + RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL) + `, task.TaskID, string(analysis), task.Filename, task.Transcription, string(metadata)).Scan(&complete) + if err != nil { + return false, fmt.Errorf("update analysis: %w", err) + } + return complete, nil +} + +// ===================== MAIN ===================== + +func loadDotenv() { + path := os.Getenv("DOTENV_PATH") + if path == "" { + return + } + if err := godotenv.Overload(path); err != nil { + slog.Warn("dotenv load failed", "path", path, "error", err) + return + } + slog.Info("dotenv loaded", "path", path) +} + +func main() { + loadDotenv() + + amqpURL := getEnv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/") + dbURL := getEnv("DATABASE_URL", "") + token := os.Getenv("YANDEX_API_KEY") + model := os.Getenv("YANDEX_MODEL") + apiURL := getEnv("YANDEX_API_URL", "https://ai.api.cloud.yandex.net/v1/chat/completions") + inputQueue := getEnv("ANALYSE_QUEUE", "analyse") + finalQueue := getEnv("FINAL_QUEUE", "final") + + if token == "" { + slog.Error("YANDEX_API_KEY is required") + os.Exit(1) + } + if model == "" { + slog.Error("YANDEX_MODEL is required") + os.Exit(1) + } + if dbURL == "" { + slog.Error("DATABASE_URL is required") + os.Exit(1) + } + slog.Info("config loaded", "worker", "analyse", + "yandex_token", tokenFingerprint(token), "model", model, "api_url", apiURL) + + db := mustDB(dbURL) + defer db.Close() + + checkCtx, checkCancel := context.WithTimeout(context.Background(), 90*time.Second) + if err := checkYandexAPI(checkCtx, apiURL, model); err != nil { + checkCancel() + slog.Error("yandex api check failed — worker will not start", "worker", "analyse", "error", err) + os.Exit(1) + } + checkCancel() + + ch := mustRabbit(amqpURL) + + if _, err := ch.QueueDeclare(inputQueue, true, false, false, false, nil); err != nil { + slog.Error("declare queue failed", "queue", inputQueue, "error", err) + os.Exit(1) + } + if _, err := ch.QueueDeclare(finalQueue, true, false, false, false, nil); err != nil { + slog.Error("declare queue failed", "queue", finalQueue, "error", err) + os.Exit(1) + } + ch.Qos(1, 0, false) + + msgs, err := ch.Consume(inputQueue, "", false, false, false, false, nil) + if err != nil { + slog.Error("consume failed", "error", err) + os.Exit(1) + } + slog.Info("worker started", "worker", "analyse", "queue", inputQueue, "model", model) + + for d := range msgs { + taskStart := time.Now() + var task WorkerMessage + if err := json.Unmarshal(d.Body, &task); err != nil { + slog.Warn("bad message", "worker", "analyse", "delivery_tag", d.DeliveryTag, + "body_bytes", len(d.Body), "error", err) + d.Nack(false, false) + continue + } + + promptNames := make([]string, 0, len(task.Prompts)) + promptTextChars := 0 + for _, p := range task.Prompts { + if p.Name != "" { + promptNames = append(promptNames, p.Name) + promptTextChars += utf8.RuneCountInString(p.Prompt) + } + } + transcriptionChars := utf8.RuneCountInString(task.Transcription) + + slog.Info("message received", "worker", "analyse", + "task_id", task.TaskID, + "filename", task.Filename, + "delivery_tag", d.DeliveryTag, + "redelivered", d.Redelivered, + "body_bytes", len(d.Body), + "transcription_chars", transcriptionChars, + "segments", len(task.Segments), + "prompts", len(promptNames), + "prompt_names", promptNames, + "prompt_text_chars", promptTextChars, + "llm_calls_expected", len(promptNames), + ) + if d.Redelivered { + slog.Warn("redelivered message skipped — no llm call", + "worker", "analyse", "task_id", task.TaskID, + "delivery_tag", d.DeliveryTag, "prompts", len(promptNames)) + d.Nack(false, false) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + + result, stats, err := runAnalysis(ctx, apiURL, model, task.TaskID, task.Transcription, task.Prompts) + if err != nil { + cancel() + slog.Warn("task failed, discarded", + "worker", "analyse", "task_id", task.TaskID, + "llm_calls_done", stats.LLMCalls, + "total_tokens_so_far", stats.TotalTokens, + "error", err) + d.Nack(false, false) + continue + } + + analysisJSON, _ := json.Marshal(result) + complete, err := saveAnalysis(ctx, db, task, analysisJSON) + if err != nil { + cancel() + slog.Warn("db save failed, discarded", + "worker", "analyse", "task_id", task.TaskID, "error", err) + d.Nack(false, false) + continue + } + + taskAttrs := []any{ + "worker", "analyse", + "task_id", task.TaskID, + "llm_calls", stats.LLMCalls, + "total_tokens", stats.TotalTokens, + "prompt_tokens", stats.PromptTokens, + "completion_tokens", stats.OutputTokens, + "duration_ms", time.Since(taskStart).Milliseconds(), + } + + if complete { + notifyFinal(ctx, ch, db, finalQueue, task.TaskID, "analyse") + slog.Info("task complete", append(taskAttrs, "was_last", "analyse")...) + } else { + slog.Info("task partial", append(taskAttrs, "waiting_for", "tagging")...) + } + cancel() + + d.Ack(false) + } +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + "..." +} + +func loadFinalPayload(ctx context.Context, db *sql.DB, taskID string) ([]byte, error) { + var ( + filename, transcription, status sql.NullString + analysis, tagging, metadata []byte + createdAt, updatedAt time.Time + ) + err := db.QueryRowContext(ctx, ` + SELECT filename, transcription, analysis, tagging, metadata, status, created_at, updated_at + FROM results WHERE task_id = $1 + `, taskID).Scan(&filename, &transcription, &analysis, &tagging, &metadata, &status, &createdAt, &updatedAt) + if err != nil { + return nil, fmt.Errorf("load result: %w", err) + } + + msg := map[string]any{ + "task_id": taskID, + "status": status.String, + "created_at": createdAt, + "updated_at": updatedAt, + } + if filename.Valid { + msg["filename"] = filename.String + } + if transcription.Valid { + msg["transcription"] = transcription.String + } + if len(analysis) > 0 { + var v any + if err := json.Unmarshal(analysis, &v); err == nil { + msg["analysis"] = v + } + } + if len(tagging) > 0 { + var v any + if err := json.Unmarshal(tagging, &v); err == nil { + msg["tagging"] = v + } + } + if len(metadata) > 0 { + var meta map[string]any + if err := json.Unmarshal(metadata, &meta); err == nil { + for k, v := range meta { + msg[k] = v + } + } + } + return json.Marshal(msg) +} + +func notifyFinal(ctx context.Context, ch *amqp.Channel, db *sql.DB, queue, taskID, worker string) { + body, err := loadFinalPayload(ctx, db, taskID) + if err != nil { + slog.Warn("load final payload failed", "worker", worker, "task_id", taskID, "error", err) + return + } + if err := ch.PublishWithContext(ctx, "", queue, false, false, + amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + slog.Warn("publish final failed", "worker", worker, "task_id", taskID, "error", err) + return + } + slog.Info("published final", "worker", worker, "task_id", taskID, "queue", queue, "body_bytes", len(body)) + deleteProcessingFile(extractFilePath(body), taskID, worker) +} + +func extractFilePath(body []byte) string { + var msg map[string]any + if err := json.Unmarshal(body, &msg); err != nil { + return "" + } + fp, _ := msg["file_path"].(string) + return fp +} + +func deleteProcessingFile(filePath, taskID, worker string) { + if filePath == "" { + slog.Warn("processing file not deleted: no file_path", "worker", worker, "task_id", taskID) + return + } + if !strings.Contains(filePath, "/processing/") { + slog.Warn("processing file not deleted: path outside processing", "worker", worker, "task_id", taskID, "path", filePath) + return + } + if err := os.Remove(filePath); err != nil { + if os.IsNotExist(err) { + slog.Info("processing file already removed", "worker", worker, "task_id", taskID, "path", filePath) + return + } + slog.Warn("processing file delete failed", "worker", worker, "task_id", taskID, "path", filePath, "error", err) + return + } + slog.Info("processing file deleted", "worker", worker, "task_id", taskID, "path", filePath) +} + +func getEnv(k, d string) string { + if v := os.Getenv(k); v != "" { + return v + } + return d +} + +func tokenFingerprint(token string) string { + if len(token) <= 12 { + return "***" + } + return token[:8] + "..." + token[len(token)-4:] +} + +func mustDB(url string) *sql.DB { + db, err := sql.Open("pgx", url) + if err != nil { + slog.Error("db open failed", "error", err) + os.Exit(1) + } + db.SetMaxOpenConns(5) + time.Sleep(2 * time.Second) // дать Docker DNS зарегистрировать postgres + for i := 0; i < 60; i++ { + if err = db.Ping(); err == nil { + return db + } + if i < 5 || (i+1)%10 == 0 { + slog.Info("waiting for db", "attempt", i+1, "error", err) + } + time.Sleep(3 * time.Second) + } + slog.Error("db unreachable", "error", err) + os.Exit(1) + return nil +} + +func mustRabbit(url string) *amqp.Channel { + var conn *amqp.Connection + var err error + for i := 0; i < 30; i++ { + conn, err = amqp.Dial(url) + if err == nil { + break + } + slog.Info("waiting for rabbit", "attempt", i+1, "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + slog.Error("rabbit unreachable", "error", err) + os.Exit(1) + } + ch, err := conn.Channel() + if err != nil { + slog.Error("rabbit channel failed", "error", err) + os.Exit(1) + } + return ch +} diff --git a/workers/analyse/go.mod b/workers/analyse/go.mod new file mode 100644 index 0000000..9367c45 --- /dev/null +++ b/workers/analyse/go.mod @@ -0,0 +1,18 @@ +module github.com/yourorg/analyse + +go 1.22 + +require ( + github.com/jackc/pgx/v5 v5.5.5 + github.com/joho/godotenv v1.5.1 + github.com/rabbitmq/amqp091-go v1.9.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/workers/analyse/go.sum b/workers/analyse/go.sum new file mode 100644 index 0000000..a319316 --- /dev/null +++ b/workers/analyse/go.sum @@ -0,0 +1,41 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/workers/tagging/Dockerfile b/workers/tagging/Dockerfile new file mode 100644 index 0000000..60fe604 --- /dev/null +++ b/workers/tagging/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.22-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -o /tagging ./cmd/tagging + +FROM alpine:3.19 +RUN apk add --no-cache ca-certificates +COPY --from=build /tagging /tagging +ENTRYPOINT ["/tagging"] diff --git a/workers/tagging/cmd/tagging/main.go b/workers/tagging/cmd/tagging/main.go new file mode 100644 index 0000000..3c8fb65 --- /dev/null +++ b/workers/tagging/cmd/tagging/main.go @@ -0,0 +1,685 @@ +package main + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "fmt" + "io" + "log/slog" + "net" + "net/http" + "os" + "strings" + "time" + "unicode/utf8" + + "github.com/joho/godotenv" + _ "github.com/jackc/pgx/v5/stdlib" + amqp "github.com/rabbitmq/amqp091-go" +) + +func init() { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))) +} + +func apiURL() string { + if u := os.Getenv("YANDEX_API_URL"); u != "" { + return u + } + return "https://ai.api.cloud.yandex.net/v1/chat/completions" +} + +// ── входящее сообщение из очереди tagging ── +type WorkerMessage struct { + TaskID string `json:"task_id"` + Filename string `json:"filename"` + Transcription string `json:"transcription"` +} + +// ── результат классификации ── +type ClassificationResult struct { + L1 string `json:"L1"` + L2 string `json:"L2"` + L3 string `json:"L3"` + RiskLevel string `json:"risk_level"` + HasActionItems bool `json:"has_action_items"` + HasDeadline bool `json:"has_deadline"` +} + +// ── LLM request/response ── +type chatMessage struct { + Role string `json:"role"` + Content string `json:"content"` +} +type chatRequest struct { + Model string `json:"model"` + Temperature float64 `json:"temperature"` + ResponseFormat struct { + Type string `json:"type"` + } `json:"response_format"` + Messages []chatMessage `json:"messages"` +} +type tokenUsage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` +} +type chatResponse struct { + Choices []struct { + Message struct { + Content string `json:"content"` + } `json:"message"` + } `json:"choices"` + Usage *tokenUsage `json:"usage"` +} + +type llmCallResult struct { + Content string + RequestBytes int + ResponseBytes int + Usage *tokenUsage + Duration time.Duration +} + +// ===================== LLM ===================== + +var llmHTTPClient = newLLMHTTPClient(90 * time.Second) + +func newLLMHTTPClient(totalTimeout time.Duration) *http.Client { + return &http.Client{ + Timeout: totalTimeout, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSHandshakeTimeout: 60 * time.Second, + ResponseHeaderTimeout: 60 * time.Second, + ExpectContinueTimeout: 5 * time.Second, + IdleConnTimeout: 90 * time.Second, + }, + } +} + +func callLLM(ctx context.Context, model, prompt string) (*llmCallResult, error) { + const systemPrompt = "Ты — классификатор диалогов в логистике. Отвечай только JSON, без пояснений." + + reqBody := chatRequest{ + Model: model, + Temperature: 0.1, + Messages: []chatMessage{ + {Role: "system", Content: systemPrompt}, + {Role: "user", Content: prompt}, + }, + } + reqBody.ResponseFormat.Type = "json_object" + + jsonData, err := json.Marshal(reqBody) + if err != nil { + return nil, err + } + + req, err := http.NewRequestWithContext(ctx, "POST", apiURL(), bytes.NewBuffer(jsonData)) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+os.Getenv("YANDEX_API_KEY")) + req.Header.Set("Content-Type", "application/json") + + start := time.Now() + resp, err := llmHTTPClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + duration := time.Since(start) + + if resp.StatusCode != http.StatusOK { + return &llmCallResult{ + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Duration: duration, + }, fmt.Errorf("status %d: %s", resp.StatusCode, truncate(string(body), 500)) + } + + var result chatResponse + if err := json.Unmarshal(body, &result); err != nil { + return &llmCallResult{ + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Duration: duration, + }, err + } + if len(result.Choices) == 0 { + return &llmCallResult{ + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Duration: duration, + }, fmt.Errorf("empty response") + } + + return &llmCallResult{ + Content: result.Choices[0].Message.Content, + RequestBytes: len(jsonData), + ResponseBytes: len(body), + Usage: result.Usage, + Duration: duration, + }, nil +} + +func checkYandexAPI(ctx context.Context, model string) error { + slog.Info("yandex api check started", "worker", "tagging", "url", apiURL(), "model", model) + + res, err := callLLM(ctx, model, `Ответь только JSON: {"ok":true}`) + if err != nil { + return err + } + + attrs := []any{ + "worker", "tagging", + "duration_ms", res.Duration.Milliseconds(), + "response_chars", utf8.RuneCountInString(res.Content), + } + if res.Usage != nil { + attrs = append(attrs, + "prompt_tokens", res.Usage.PromptTokens, + "completion_tokens", res.Usage.CompletionTokens, + "total_tokens", res.Usage.TotalTokens, + ) + } + slog.Info("yandex api check ok", attrs...) + return nil +} + +func logLLMCall(worker, taskID, model, callType string, attempt int, inputChars int, res *llmCallResult, err error) { + attrs := []any{ + "worker", worker, + "task_id", taskID, + "model", model, + "call_type", callType, + "attempt", attempt, + "input_chars", inputChars, + } + if res != nil { + attrs = append(attrs, + "duration_ms", res.Duration.Milliseconds(), + "request_bytes", res.RequestBytes, + "response_bytes", res.ResponseBytes, + "response_chars", utf8.RuneCountInString(res.Content), + ) + if res.Usage != nil { + attrs = append(attrs, + "prompt_tokens", res.Usage.PromptTokens, + "completion_tokens", res.Usage.CompletionTokens, + "total_tokens", res.Usage.TotalTokens, + ) + } + } + if err != nil { + slog.Warn("llm call failed", append(attrs, "error", err)...) + return + } + slog.Info("llm call ok", attrs...) +} + +func buildPrompt(text string) string { + return fmt.Sprintf(`Ты — классификатор диалогов в логистике. + +Тебе даётся НЕструктурированный текст диалога (разговор, звонок, переписка). +Текст может быть неаккуратным, с ошибками, без структуры. + +Твоя задача: +1. Понять смысл диалога +2. Выделить ключевую цель разговора +3. Определить наличие проблемы +4. Классифицировать диалог по правилам ниже + +=== ИЕРАРХИЯ КЛАССОВ === + +L1: +- new_order +- order_change +- tracking +- delivery_coordination +- problem +- claim +- information_request +- internal_communication +- other + +L2: + +Для problem: +- delivery_issue +- cargo_issue +- data_issue +- communication_issue + +Для delivery_coordination: +- delivery_time +- unloading_conditions +- warehouse_rules +- access +- scheduling + +Для tracking: +- location_request +- status_update +- eta + +L3 (опционально): +- wrong_contact +- wrong_address +- missing_info +- delay +- lost +- damage +- cannot_reach +- no_response + +=== ДОПОЛНИТЕЛЬНЫЕ ПОЛЯ === + +risk_level: +- none +- low +- medium +- high + +has_action_items: +- true / false + +has_deadline: +- true / false + +=== ПРАВИЛА === + +1. Определи основную цель разговора: + - заказ → new_order + - изменение → order_change + - узнать статус → tracking + - согласование → delivery_coordination + - ошибка / проблема → problem + +2. Если есть любая ошибка или сбой → ВСЕГДА L1 = problem + +3. Ошибки в email / телефоне / адресе → L2 = data_issue + +4. Если обсуждают условия (время, склад, разгрузка) без проблемы → delivery_coordination + +5. Если спрашивают "где груз?" → tracking + +6. Определи risk_level: + - low → проблема не влияет на доставку + - medium → возможна задержка + - high → срыв сроков / потеря + +7. has_action_items = true если: + - есть договорённости ("перезвоню", "свяжется", "отправлю") + +8. has_deadline = true если: + - есть конкретное время ("в 18:00", "через 10 минут", "завтра") + +--- + +=== ФОРМАТ ОТВЕТА === + +Ответ только JSON, без пояснений: + +{ + "L1": "...", + "L2": "...", + "L3": "...", + "risk_level": "...", + "has_action_items": true/false, + "has_deadline": true/false +} + +--- + +=== ДИАЛОГ === + +Текст: +""" +%s +"""`, text) +} + +func classify(ctx context.Context, taskID, model, text string) (ClassificationResult, error) { + prompt := buildPrompt(text) + inputChars := utf8.RuneCountInString(prompt) + + res, err := callLLM(ctx, model, prompt) + logLLMCall("tagging", taskID, model, "classify", 1, inputChars, res, err) + if err != nil { + return ClassificationResult{}, err + } + + var result ClassificationResult + if err := json.Unmarshal([]byte(res.Content), &result); err != nil { + return ClassificationResult{}, fmt.Errorf("parse: %w, resp: %s", err, truncate(res.Content, 300)) + } + return result, nil +} + +// ===================== DB ===================== + +func saveTagging(ctx context.Context, db *sql.DB, taskID, filename, transcription string, tagging []byte) (complete bool, err error) { + _, err = db.ExecContext(ctx, + `INSERT INTO results (task_id) VALUES ($1) ON CONFLICT (task_id) DO NOTHING`, taskID) + if err != nil { + return false, fmt.Errorf("ensure row: %w", err) + } + + err = db.QueryRowContext(ctx, ` + UPDATE results + SET tagging = $2::jsonb, + filename = COALESCE(NULLIF($3, ''), filename), + transcription = COALESCE(NULLIF($4, ''), transcription), + updated_at = now(), + status = CASE WHEN analysis IS NOT NULL THEN 'done' ELSE status END + WHERE task_id = $1 + RETURNING (analysis IS NOT NULL AND tagging IS NOT NULL) + `, taskID, string(tagging), filename, transcription).Scan(&complete) + if err != nil { + return false, fmt.Errorf("update tagging: %w", err) + } + return complete, nil +} + +// ===================== MAIN ===================== + +func loadDotenv() { + path := os.Getenv("DOTENV_PATH") + if path == "" { + return + } + if err := godotenv.Overload(path); err != nil { + slog.Warn("dotenv load failed", "path", path, "error", err) + return + } + slog.Info("dotenv loaded", "path", path) +} + +func main() { + loadDotenv() + + amqpURL := getenv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/") + dbURL := os.Getenv("DATABASE_URL") + token := os.Getenv("YANDEX_API_KEY") + model := os.Getenv("YANDEX_MODEL") + inputQueue := getenv("TAGGING_QUEUE", "tagging") + finalQueue := getenv("FINAL_QUEUE", "final") + + if token == "" { + slog.Error("YANDEX_API_KEY is required") + os.Exit(1) + } + if model == "" { + slog.Error("YANDEX_MODEL is required") + os.Exit(1) + } + slog.Info("config loaded", "worker", "tagging", + "yandex_token", tokenFingerprint(token), "model", model, "api_url", apiURL()) + + db := mustDB(dbURL) + defer db.Close() + + checkCtx, checkCancel := context.WithTimeout(context.Background(), 90*time.Second) + if err := checkYandexAPI(checkCtx, model); err != nil { + checkCancel() + slog.Error("yandex api check failed — worker will not start", "worker", "tagging", "error", err) + os.Exit(1) + } + checkCancel() + + ch := mustRabbit(amqpURL) + + if _, err := ch.QueueDeclare(inputQueue, true, false, false, false, nil); err != nil { + slog.Error("declare queue failed", "queue", inputQueue, "error", err) + os.Exit(1) + } + if _, err := ch.QueueDeclare(finalQueue, true, false, false, false, nil); err != nil { + slog.Error("declare queue failed", "queue", finalQueue, "error", err) + os.Exit(1) + } + ch.Qos(1, 0, false) + + msgs, err := ch.Consume(inputQueue, "", false, false, false, false, nil) + if err != nil { + slog.Error("consume failed", "error", err) + os.Exit(1) + } + slog.Info("worker started", "worker", "tagging", "queue", inputQueue, "model", model) + + for d := range msgs { + taskStart := time.Now() + var task WorkerMessage + if err := json.Unmarshal(d.Body, &task); err != nil { + slog.Warn("bad message", "worker", "tagging", "delivery_tag", d.DeliveryTag, + "body_bytes", len(d.Body), "error", err) + d.Nack(false, false) + continue + } + + transcriptionChars := utf8.RuneCountInString(task.Transcription) + slog.Info("message received", "worker", "tagging", + "task_id", task.TaskID, + "filename", task.Filename, + "delivery_tag", d.DeliveryTag, + "redelivered", d.Redelivered, + "body_bytes", len(d.Body), + "transcription_chars", transcriptionChars, + "llm_calls_expected", 1, + ) + if d.Redelivered { + slog.Warn("redelivered message skipped — no llm call", + "worker", "tagging", "task_id", task.TaskID, "delivery_tag", d.DeliveryTag) + d.Nack(false, false) + continue + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + + result, err := classify(ctx, task.TaskID, model, task.Transcription) + if err != nil { + cancel() + slog.Warn("task failed, discarded", + "worker", "tagging", "task_id", task.TaskID, + "llm_calls", 1, "error", err) + d.Nack(false, false) + continue + } + + tagJSON, _ := json.Marshal(result) + complete, err := saveTagging(ctx, db, task.TaskID, task.Filename, task.Transcription, tagJSON) + if err != nil { + cancel() + slog.Warn("db save failed, discarded", + "worker", "tagging", "task_id", task.TaskID, "error", err) + d.Nack(false, false) + continue + } + + if complete { + notifyFinal(ctx, ch, db, finalQueue, task.TaskID, "tagging") + slog.Info("task complete", "worker", "tagging", "task_id", task.TaskID, + "was_last", "tagging", "L1", result.L1, + "llm_calls", 1, "duration_ms", time.Since(taskStart).Milliseconds()) + } else { + slog.Info("task partial", "worker", "tagging", "task_id", task.TaskID, + "waiting_for", "analyse", "L1", result.L1, + "llm_calls", 1, "duration_ms", time.Since(taskStart).Milliseconds()) + } + cancel() + + d.Ack(false) + } +} + +func getenv(k, d string) string { + if v := os.Getenv(k); v != "" { + return v + } + return d +} + +func tokenFingerprint(token string) string { + if len(token) <= 12 { + return "***" + } + return token[:8] + "..." + token[len(token)-4:] +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + "..." +} + +func loadFinalPayload(ctx context.Context, db *sql.DB, taskID string) ([]byte, error) { + var ( + filename, transcription, status sql.NullString + analysis, tagging, metadata []byte + createdAt, updatedAt time.Time + ) + err := db.QueryRowContext(ctx, ` + SELECT filename, transcription, analysis, tagging, metadata, status, created_at, updated_at + FROM results WHERE task_id = $1 + `, taskID).Scan(&filename, &transcription, &analysis, &tagging, &metadata, &status, &createdAt, &updatedAt) + if err != nil { + return nil, fmt.Errorf("load result: %w", err) + } + + msg := map[string]any{ + "task_id": taskID, + "status": status.String, + "created_at": createdAt, + "updated_at": updatedAt, + } + if filename.Valid { + msg["filename"] = filename.String + } + if transcription.Valid { + msg["transcription"] = transcription.String + } + if len(analysis) > 0 { + var v any + if err := json.Unmarshal(analysis, &v); err == nil { + msg["analysis"] = v + } + } + if len(tagging) > 0 { + var v any + if err := json.Unmarshal(tagging, &v); err == nil { + msg["tagging"] = v + } + } + if len(metadata) > 0 { + var meta map[string]any + if err := json.Unmarshal(metadata, &meta); err == nil { + for k, v := range meta { + msg[k] = v + } + } + } + return json.Marshal(msg) +} + +func notifyFinal(ctx context.Context, ch *amqp.Channel, db *sql.DB, queue, taskID, worker string) { + body, err := loadFinalPayload(ctx, db, taskID) + if err != nil { + slog.Warn("load final payload failed", "worker", worker, "task_id", taskID, "error", err) + return + } + if err := ch.PublishWithContext(ctx, "", queue, false, false, + amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + slog.Warn("publish final failed", "worker", worker, "task_id", taskID, "error", err) + return + } + slog.Info("published final", "worker", worker, "task_id", taskID, "queue", queue, "body_bytes", len(body)) + deleteProcessingFile(extractFilePath(body), taskID, worker) +} + +func extractFilePath(body []byte) string { + var msg map[string]any + if err := json.Unmarshal(body, &msg); err != nil { + return "" + } + fp, _ := msg["file_path"].(string) + return fp +} + +func deleteProcessingFile(filePath, taskID, worker string) { + if filePath == "" { + slog.Warn("processing file not deleted: no file_path", "worker", worker, "task_id", taskID) + return + } + if !strings.Contains(filePath, "/processing/") { + slog.Warn("processing file not deleted: path outside processing", "worker", worker, "task_id", taskID, "path", filePath) + return + } + if err := os.Remove(filePath); err != nil { + if os.IsNotExist(err) { + slog.Info("processing file already removed", "worker", worker, "task_id", taskID, "path", filePath) + return + } + slog.Warn("processing file delete failed", "worker", worker, "task_id", taskID, "path", filePath, "error", err) + return + } + slog.Info("processing file deleted", "worker", worker, "task_id", taskID, "path", filePath) +} + +func mustDB(url string) *sql.DB { + db, err := sql.Open("pgx", url) + if err != nil { + slog.Error("db open failed", "error", err) + os.Exit(1) + } + db.SetMaxOpenConns(5) + time.Sleep(2 * time.Second) // дать Docker DNS зарегистрировать postgres + for i := 0; i < 60; i++ { + if err = db.Ping(); err == nil { + return db + } + if i < 5 || (i+1)%10 == 0 { + slog.Info("waiting for db", "attempt", i+1, "error", err) + } + time.Sleep(3 * time.Second) + } + slog.Error("db unreachable", "error", err) + os.Exit(1) + return nil +} + +func mustRabbit(url string) *amqp.Channel { + var conn *amqp.Connection + var err error + for i := 0; i < 30; i++ { + conn, err = amqp.Dial(url) + if err == nil { + break + } + slog.Info("waiting for rabbit", "attempt", i+1, "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + slog.Error("rabbit unreachable", "error", err) + os.Exit(1) + } + ch, err := conn.Channel() + if err != nil { + slog.Error("rabbit channel failed", "error", err) + os.Exit(1) + } + return ch +} diff --git a/workers/tagging/go.mod b/workers/tagging/go.mod new file mode 100644 index 0000000..ffb45e6 --- /dev/null +++ b/workers/tagging/go.mod @@ -0,0 +1,18 @@ +module github.com/yourorg/tagging + +go 1.22 + +require ( + github.com/jackc/pgx/v5 v5.5.5 + github.com/joho/godotenv v1.5.1 + github.com/rabbitmq/amqp091-go v1.9.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/workers/tagging/go.sum b/workers/tagging/go.sum new file mode 100644 index 0000000..a319316 --- /dev/null +++ b/workers/tagging/go.sum @@ -0,0 +1,41 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/workers/transcribe/Dockerfile b/workers/transcribe/Dockerfile new file mode 100644 index 0000000..a874b90 --- /dev/null +++ b/workers/transcribe/Dockerfile @@ -0,0 +1,11 @@ +FROM golang:1.22-alpine AS build +WORKDIR /src +COPY go.mod go.sum* ./ +RUN go mod download +COPY . . +RUN CGO_ENABLED=0 go build -o /transcribe ./cmd/transcribe + +FROM alpine:3.19 +RUN apk add --no-cache ca-certificates +COPY --from=build /transcribe /transcribe +ENTRYPOINT ["/transcribe"] diff --git a/workers/transcribe/cmd/transcribe/main.go b/workers/transcribe/cmd/transcribe/main.go new file mode 100644 index 0000000..9c5decd --- /dev/null +++ b/workers/transcribe/cmd/transcribe/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/yourorg/transcribe/internal/config" + "github.com/yourorg/transcribe/internal/consumer" +) + +func main() { + slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))) + + cfg := config.Load() + if cfg.NexaraAPIKey == "" { + slog.Error("NEXARA_API_KEY is required") + os.Exit(1) + } + + ch := mustRabbit(cfg.RabbitURL) + cons, err := consumer.New(cfg, ch) + if err != nil { + slog.Error("consumer init failed", "error", err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + if err := cons.Run(ctx); err != nil && ctx.Err() == nil { + slog.Error("consumer stopped", "error", err) + os.Exit(1) + } + slog.Info("transcribe worker stopping") +} + +func mustRabbit(url string) *amqp.Channel { + var conn *amqp.Connection + var err error + for i := 0; i < 30; i++ { + conn, err = amqp.Dial(url) + if err == nil { + break + } + slog.Info("waiting for rabbit", "attempt", i+1, "error", err) + time.Sleep(2 * time.Second) + } + if err != nil { + slog.Error("rabbit unreachable", "error", err) + os.Exit(1) + } + ch, err := conn.Channel() + if err != nil { + slog.Error("rabbit channel failed", "error", err) + os.Exit(1) + } + return ch +} diff --git a/workers/transcribe/configs/prompts.json b/workers/transcribe/configs/prompts.json new file mode 100644 index 0000000..2a23007 --- /dev/null +++ b/workers/transcribe/configs/prompts.json @@ -0,0 +1,23 @@ +[ + { + "id": 1, + "id_section": 1, + "name": "behavioral", + "prompt": "Ты — строгий классификатор звонков.\n\nЗадача:\nПроанализируй диалог и оцени поведенческие критерии.\n\nКритерии:\n1. Приветствие\n2. Инициативность (выявление цели, попытка развить разговор)\n3. Уточнил, остались ли вопросы\n4. Прощание\n\nИнструкция:\nДля каждого критерия:\n- определи наличие\n- найди ДОСЛОВНУЮ цитату\n- оцени confidence (0.0–1.0)\n\nФормат ответа (строго JSON):\n\n{\n \"greeting\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"initiative\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"questions_check\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"closing\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n }\n}\n\nЖЁСТКИЕ ПРАВИЛА:\n- каждый критерий оценивается независимо\n- не додумывать\n- если нет → value=false, evidence=null, confidence=0.0\n- evidence должен подтверждать вывод", + "dt_create": "2026-06-09T09:00:00.000000" + }, + { + "id": 2, + "id_section": 1, + "name": "client_data", + "prompt": "Ты — строгий классификатор звонков.\n\nЗадача:\nОпредели, какие данные о клиенте были получены.\n\nКритерии:\n1. Первый ли раз обращается\n2. Указан ли город клиента\n3. Тип клиента (физ/юр)\n4. Получены ли контакты\n5. Источник (откуда узнали)\n\nФормат ответа (строго JSON):\n\n{\n \"first_time\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"client_city\": {\n \"value\": true/false,\n \"city\": \"строка или null\",\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"client_type\": {\n \"value\": true/false,\n \"type\": \"physical|legal|null\",\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"contacts\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"source\": {\n \"value\": true/false,\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n }\n}\n\nЖЁСТКИЕ ПРАВИЛА:\n- city/type только если явно сказано\n- не додумывать\n- если нет → value=false, evidence=null, confidence=0.0", + "dt_create": "2026-06-09T09:00:00.000000" + }, + { + "id": 3, + "id_section": 1, + "name": "cargo_data", + "prompt": "Ты — строгий классификатор логистических данных.\n\nКритерии:\n1. Характер груза\n2. Параметры груза (вес, объем, размеры)\n3. Стоимость груза\n\nФормат ответа (строго JSON):\n\n{\n \"cargo_type\": {\n \"value\": true/false,\n \"type\": \"строка или null\",\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"cargo_params\": {\n \"value\": true/false,\n \"params\": \"строка или null\",\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n },\n \"cargo_value\": {\n \"value\": true/false,\n \"amount\": \"строка или null\",\n \"evidence\": \"цитата или null\",\n \"confidence\": number\n }\n}\n\nЖЁСТКИЕ ПРАВИЛА:\n- только явные данные\n- числа/параметры должны быть в evidence\n- если нет → value=false, evidence=null, confidence=0.0", + "dt_create": "2026-06-09T09:00:00.000000" + } +] diff --git a/workers/transcribe/go.mod b/workers/transcribe/go.mod new file mode 100644 index 0000000..c5d54a6 --- /dev/null +++ b/workers/transcribe/go.mod @@ -0,0 +1,5 @@ +module github.com/yourorg/transcribe + +go 1.22 + +require github.com/rabbitmq/amqp091-go v1.9.0 diff --git a/workers/transcribe/go.sum b/workers/transcribe/go.sum new file mode 100644 index 0000000..ab0418f --- /dev/null +++ b/workers/transcribe/go.sum @@ -0,0 +1,18 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= +github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/workers/transcribe/internal/config/config.go b/workers/transcribe/internal/config/config.go new file mode 100644 index 0000000..87a3096 --- /dev/null +++ b/workers/transcribe/internal/config/config.go @@ -0,0 +1,78 @@ +package config + +import ( + "os" + "strconv" + "time" +) + +type Config struct { + RabbitURL string + InputQueue string + OutputExchange string + AnalyseQueue string + TaggingQueue string + InputExchange string + InputRoutingKey string + Prefetch int + + NexaraBaseURL string + NexaraAPIKey string + NexaraModel string + NexaraTimeout time.Duration + + PromptsSource string + PromptsFile string + PromptsBaseURL string + PromptsAPIKey string + PromptsSection int +} + +func Load() Config { + return Config{ + RabbitURL: getEnv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/"), + InputQueue: getEnv("INPUT_QUEUE", "transcribe.tasks"), + OutputExchange: getEnv("OUTPUT_EXCHANGE", "transcription_done"), + AnalyseQueue: getEnv("ANALYSE_QUEUE", "analyse"), + TaggingQueue: getEnv("TAGGING_QUEUE", "tagging"), + InputExchange: getEnv("RABBITMQ_EXCHANGE", "audio_pipeline"), + InputRoutingKey: getEnv("RABBITMQ_ROUTING_KEY", "audio.new"), + Prefetch: getInt("PREFETCH", 1), + + NexaraBaseURL: getEnv("NEXARA_BASE_URL", "https://api.nexara.ru"), + NexaraAPIKey: os.Getenv("NEXARA_API_KEY"), + NexaraModel: getEnv("NEXARA_MODEL", "whisper-1"), + NexaraTimeout: getDuration("NEXARA_TIMEOUT", 10*time.Minute), + + PromptsSource: getEnv("PROMPTS_SOURCE", "static"), + PromptsFile: getEnv("PROMPTS_FILE", "/app/configs/prompts.json"), + PromptsBaseURL: os.Getenv("PROMPTS_BASE_URL"), + PromptsAPIKey: os.Getenv("PROMPTS_API_KEY"), + PromptsSection: getInt("PROMPTS_SECTION", 1), + } +} + +func getEnv(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func getInt(key string, def int) int { + if v := os.Getenv(key); v != "" { + if i, err := strconv.Atoi(v); err == nil { + return i + } + } + return def +} + +func getDuration(key string, def time.Duration) time.Duration { + if v := os.Getenv(key); v != "" { + if d, err := time.ParseDuration(v); err == nil { + return d + } + } + return def +} diff --git a/workers/transcribe/internal/consumer/consumer.go b/workers/transcribe/internal/consumer/consumer.go new file mode 100644 index 0000000..ea0c55a --- /dev/null +++ b/workers/transcribe/internal/consumer/consumer.go @@ -0,0 +1,172 @@ +package consumer + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + + "github.com/yourorg/transcribe/internal/config" + "github.com/yourorg/transcribe/internal/models" + "github.com/yourorg/transcribe/internal/nexara" + "github.com/yourorg/transcribe/internal/prompts" +) + +type Consumer struct { + cfg config.Config + ch *amqp.Channel + nexara *nexara.Client + prompts *prompts.Loader +} + +func New(cfg config.Config, ch *amqp.Channel) (*Consumer, error) { + if err := setupTopology(ch, cfg); err != nil { + return nil, err + } + return &Consumer{ + cfg: cfg, + ch: ch, + nexara: nexara.New(cfg.NexaraBaseURL, cfg.NexaraAPIKey, cfg.NexaraModel, cfg.NexaraTimeout), + prompts: prompts.New(cfg.PromptsSource, cfg.PromptsFile, cfg.PromptsBaseURL, cfg.PromptsAPIKey, cfg.PromptsSection), + }, nil +} + +func setupTopology(ch *amqp.Channel, cfg config.Config) error { + if err := ch.ExchangeDeclare("dlx", "direct", true, false, false, false, nil); err != nil { + return fmt.Errorf("declare dlx: %w", err) + } + if err := ch.ExchangeDeclare(cfg.InputExchange, "direct", true, false, false, false, nil); err != nil { + return fmt.Errorf("declare input exchange: %w", err) + } + if err := ch.ExchangeDeclare(cfg.OutputExchange, "fanout", true, false, false, false, nil); err != nil { + return fmt.Errorf("declare output exchange: %w", err) + } + + dlqArgs := amqp.Table{ + "x-dead-letter-exchange": "dlx", + "x-dead-letter-routing-key": cfg.InputQueue + ".failed", + } + if _, err := ch.QueueDeclare(cfg.InputQueue, true, false, false, false, dlqArgs); err != nil { + return fmt.Errorf("declare input queue: %w", err) + } + if _, err := ch.QueueDeclare(cfg.InputQueue+".failed", true, false, false, false, nil); err != nil { + return fmt.Errorf("declare dlq: %w", err) + } + if err := ch.QueueBind(cfg.InputQueue+".failed", cfg.InputQueue+".failed", "dlx", false, nil); err != nil { + return fmt.Errorf("bind dlq: %w", err) + } + if err := ch.QueueBind(cfg.InputQueue, cfg.InputRoutingKey, cfg.InputExchange, false, nil); err != nil { + return fmt.Errorf("bind input queue: %w", err) + } + + for _, q := range []string{cfg.AnalyseQueue, cfg.TaggingQueue} { + if _, err := ch.QueueDeclare(q, true, false, false, false, nil); err != nil { + return fmt.Errorf("declare queue %s: %w", q, err) + } + if err := ch.QueueBind(q, "", cfg.OutputExchange, false, nil); err != nil { + return fmt.Errorf("bind queue %s: %w", q, err) + } + } + + return ch.Qos(cfg.Prefetch, 0, false) +} + +func (c *Consumer) Run(ctx context.Context) error { + if err := c.ch.Confirm(false); err != nil { + return fmt.Errorf("confirm mode: %w", err) + } + + msgs, err := c.ch.Consume(c.cfg.InputQueue, "", false, false, false, false, nil) + if err != nil { + return err + } + + slog.Info("transcribe worker started", "queue", c.cfg.InputQueue, "output_exchange", c.cfg.OutputExchange) + + for { + select { + case <-ctx.Done(): + return nil + case d, ok := <-msgs: + if !ok { + return fmt.Errorf("delivery channel closed") + } + c.handle(ctx, d) + } + } +} + +func (c *Consumer) handle(ctx context.Context, d amqp.Delivery) { + var task models.AudioTask + if err := json.Unmarshal(d.Body, &task); err != nil { + slog.Warn("bad message", "delivery_tag", d.DeliveryTag, "error", err) + _ = d.Nack(false, false) + return + } + + slog.Info("message received", "task_id", task.TaskID, "file_path", task.FilePath, "filename", task.Filename) + + txCtx, cancel := context.WithTimeout(ctx, c.cfg.NexaraTimeout+30*time.Second) + defer cancel() + + text, lang, segments, err := c.nexara.TranscribeFile(txCtx, task.FilePath) + if err != nil { + slog.Warn("transcription failed", "task_id", task.TaskID, "error", err) + _ = d.Nack(false, false) + return + } + + promptList, err := c.prompts.Load(txCtx) + if err != nil { + slog.Warn("prompts load failed", "task_id", task.TaskID, "error", err) + _ = d.Nack(false, false) + return + } + + result := models.TranscriptionResult{ + TaskID: task.TaskID, + Filename: task.Filename, + FilePath: task.FilePath, + Transcription: text, + Language: lang, + Segments: segments, + Prompts: promptList, + TranscribedAt: time.Now().Unix(), + } + + body, err := json.Marshal(result) + if err != nil { + slog.Warn("marshal failed", "task_id", task.TaskID, "error", err) + _ = d.Nack(false, false) + return + } + + confirms := c.ch.NotifyPublish(make(chan amqp.Confirmation, 1)) + if err := c.ch.PublishWithContext(txCtx, c.cfg.OutputExchange, "", false, false, amqp.Publishing{ + ContentType: "application/json", + Body: body, + DeliveryMode: amqp.Persistent, + }); err != nil { + slog.Warn("publish failed, requeue", "task_id", task.TaskID, "error", err) + _ = d.Nack(false, true) + return + } + select { + case confirm := <-confirms: + if !confirm.Ack { + slog.Warn("publish not confirmed, requeue", "task_id", task.TaskID) + _ = d.Nack(false, true) + return + } + case <-txCtx.Done(): + slog.Warn("publish timeout, requeue", "task_id", task.TaskID) + _ = d.Nack(false, true) + return + } + + slog.Info("transcribed", "task_id", task.TaskID, "language", lang, "chars", len(text), "segments", len(segments), "prompts", len(promptList)) + _ = d.Ack(false) +} diff --git a/workers/transcribe/internal/models/models.go b/workers/transcribe/internal/models/models.go new file mode 100644 index 0000000..4d59890 --- /dev/null +++ b/workers/transcribe/internal/models/models.go @@ -0,0 +1,34 @@ +package models + +type AudioTask struct { + TaskID string `json:"task_id"` + FilePath string `json:"file_path"` + Filename string `json:"filename"` + Size int64 `json:"size"` + CreatedAt int64 `json:"created_at"` +} + +type Segment struct { + Start float64 `json:"start"` + End float64 `json:"end"` + Text string `json:"text"` +} + +type Prompt struct { + ID int `json:"id"` + IDSection int `json:"id_section"` + Name string `json:"name"` + Prompt string `json:"prompt"` + DtCreate string `json:"dt_create"` +} + +type TranscriptionResult struct { + TaskID string `json:"task_id"` + Filename string `json:"filename"` + FilePath string `json:"file_path"` + Transcription string `json:"transcription"` + Language string `json:"language"` + Segments []Segment `json:"segments,omitempty"` + Prompts []Prompt `json:"prompts"` + TranscribedAt int64 `json:"transcribed_at"` +} diff --git a/workers/transcribe/internal/nexara/nexara.go b/workers/transcribe/internal/nexara/nexara.go new file mode 100644 index 0000000..66afc4f --- /dev/null +++ b/workers/transcribe/internal/nexara/nexara.go @@ -0,0 +1,117 @@ +package nexara + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/yourorg/transcribe/internal/models" +) + +type Client struct { + apiURL string + apiKey string + model string + httpClient *http.Client +} + +func New(baseURL, apiKey, model string, timeout time.Duration) *Client { + baseURL = strings.TrimRight(baseURL, "/") + return &Client{ + apiURL: baseURL + "/api/v1/audio/transcriptions", + apiKey: apiKey, + model: model, + httpClient: &http.Client{ + Timeout: timeout, + }, + } +} + +func (c *Client) TranscribeFile(ctx context.Context, path string) (text, language string, segments []models.Segment, err error) { + f, err := os.Open(path) + if err != nil { + return "", "", nil, fmt.Errorf("open file: %w", err) + } + defer f.Close() + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile("file", filepath.Base(path)) + if err != nil { + return "", "", nil, err + } + if _, err := io.Copy(part, f); err != nil { + return "", "", nil, err + } + if c.model != "" { + if err := writer.WriteField("model", c.model); err != nil { + return "", "", nil, err + } + } + if err := writer.WriteField("response_format", "json"); err != nil { + return "", "", nil, err + } + if err := writer.Close(); err != nil { + return "", "", nil, err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.apiURL, body) + if err != nil { + return "", "", nil, err + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + req.Header.Set("Authorization", "Bearer "+c.apiKey) + + resp, err := c.httpClient.Do(req) + if err != nil { + return "", "", nil, fmt.Errorf("request: %w", err) + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return "", "", nil, err + } + if resp.StatusCode != http.StatusOK { + return "", "", nil, fmt.Errorf("status %d: %s", resp.StatusCode, string(respBody)) + } + + var raw map[string]any + if err := json.Unmarshal(respBody, &raw); err != nil { + return "", "", nil, fmt.Errorf("parse: %w", err) + } + if t, ok := raw["text"].(string); ok { + text = t + } + if lang, ok := raw["language"].(string); ok { + language = lang + } + if segs, ok := raw["segments"].([]any); ok { + for _, s := range segs { + m, ok := s.(map[string]any) + if !ok { + continue + } + var seg models.Segment + if v, ok := m["start"].(float64); ok { + seg.Start = v + } + if v, ok := m["end"].(float64); ok { + seg.End = v + } + if v, ok := m["text"].(string); ok { + seg.Text = v + } + segments = append(segments, seg) + } + } + return text, language, segments, nil +} diff --git a/workers/transcribe/internal/prompts/prompts.go b/workers/transcribe/internal/prompts/prompts.go new file mode 100644 index 0000000..9b8cbc1 --- /dev/null +++ b/workers/transcribe/internal/prompts/prompts.go @@ -0,0 +1,100 @@ +package prompts + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/yourorg/transcribe/internal/models" +) + +type Loader struct { + source string + filePath string + baseURL string + apiKey string + sectionID int + client *http.Client +} + +func New(source, filePath, baseURL, apiKey string, sectionID int) *Loader { + return &Loader{ + source: source, + filePath: filePath, + baseURL: strings.TrimRight(baseURL, "/"), + apiKey: apiKey, + sectionID: sectionID, + client: &http.Client{Timeout: 30 * time.Second}, + } +} + +func (l *Loader) Load(ctx context.Context) ([]models.Prompt, error) { + switch strings.ToLower(l.source) { + case "http": + return l.loadHTTP(ctx) + default: + return l.loadStatic() + } +} + +func (l *Loader) loadStatic() ([]models.Prompt, error) { + data, err := os.ReadFile(l.filePath) + if err != nil { + return nil, fmt.Errorf("read prompts file: %w", err) + } + var prompts []models.Prompt + if err := json.Unmarshal(data, &prompts); err != nil { + return nil, fmt.Errorf("parse prompts file: %w", err) + } + return filterSection(prompts, l.sectionID), nil +} + +func (l *Loader) loadHTTP(ctx context.Context) ([]models.Prompt, error) { + if l.baseURL == "" { + return nil, fmt.Errorf("PROMPTS_BASE_URL is required for http source") + } + url := fmt.Sprintf("%s/metrics/?id_section=%s", l.baseURL, strconv.Itoa(l.sectionID)) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + if l.apiKey != "" { + req.Header.Set("Authorization", "Bearer "+l.apiKey) + } + resp, err := l.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("prompts api status %d: %s", resp.StatusCode, string(body)) + } + var prompts []models.Prompt + if err := json.Unmarshal(body, &prompts); err != nil { + return nil, fmt.Errorf("parse prompts response: %w", err) + } + return filterSection(prompts, l.sectionID), nil +} + +func filterSection(prompts []models.Prompt, sectionID int) []models.Prompt { + if sectionID <= 0 { + return prompts + } + out := make([]models.Prompt, 0, len(prompts)) + for _, p := range prompts { + if p.IDSection == sectionID { + out = append(out, p) + } + } + return out +}