Files

59 lines
1.3 KiB
Go
Raw Permalink Normal View History

2026-06-10 17:12:58 +03:00
package publisher
import (
"context"
"encoding/json"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// AudioTask is the message payload describing one audio file to process.
// It is serialized to JSON using the snake_case keys given in the field tags.
type AudioTask struct {
	// TaskID identifies the task.
	TaskID string `json:"task_id"`
	// FilePath is the location of the audio file.
	FilePath string `json:"file_path"`
	// Filename is the name of the audio file.
	Filename string `json:"filename"`
	// Size is the file size (presumably in bytes; confirm against producers).
	Size int64 `json:"size"`
	// CreatedAt is the creation time as Unix seconds; Publish fills it in
	// with the current time when it is left at zero.
	CreatedAt int64 `json:"created_at"`
}
// Publisher publishes AudioTask messages over a single AMQP channel to a
// fixed exchange and routing key.
type Publisher struct {
	// ch is the AMQP channel every message is published on.
	ch *amqp.Channel
	// exchange and routingKey are the destination used for every message.
	exchange, routingKey string
}
// New switches ch into publisher-confirm mode and returns a Publisher that
// sends messages to the given exchange with the given routing key.
//
// It returns an error, wrapping the underlying cause, if confirm mode cannot
// be enabled on ch.
func New(ch *amqp.Channel, exchange, routingKey string) (*Publisher, error) {
	err := ch.Confirm(false)
	if err != nil {
		return nil, fmt.Errorf("confirm mode: %w", err)
	}

	pub := &Publisher{
		ch:         ch,
		exchange:   exchange,
		routingKey: routingKey,
	}
	return pub, nil
}
// Publish encodes task as JSON, publishes it as a persistent message to the
// publisher's exchange and routing key, and then waits until the broker
// confirms the message or ctx is done.
//
// If task.CreatedAt is zero it is set to the current Unix time before
// encoding. task is received by value, so the caller's copy is not modified.
//
// It returns an error if encoding or publishing fails, if the channel is not
// in confirm mode, or if the broker negatively acknowledges the message. If
// ctx ends before a confirmation arrives, ctx.Err() is returned.
func (p *Publisher) Publish(ctx context.Context, task AudioTask) error {
	if task.CreatedAt == 0 {
		task.CreatedAt = time.Now().Unix()
	}

	body, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task: %w", err)
	}

	// Use a per-message deferred confirmation rather than registering a new
	// NotifyPublish listener on every call. Listeners are never removed from
	// the channel and each one receives every confirmation, so per-call
	// listeners with a one-slot buffer fill up and eventually block
	// confirmation delivery for the whole channel.
	dc, err := p.ch.PublishWithDeferredConfirmWithContext(ctx, p.exchange, p.routingKey, false, false, amqp.Publishing{
		ContentType:  "application/json",
		Body:         body,
		DeliveryMode: amqp.Persistent,
	})
	if err != nil {
		return fmt.Errorf("publish: %w", err)
	}
	if dc == nil {
		// The library returns no deferred confirmation when the channel is
		// not in confirm mode (e.g. a Publisher not built through New).
		return fmt.Errorf("publish: channel is not in confirm mode")
	}

	acked, err := dc.WaitContext(ctx)
	if err != nil {
		// ctx ended first; the error is returned unwrapped, as before.
		return err
	}
	if !acked {
		return fmt.Errorf("publish not confirmed")
	}
	return nil
}