59 lines
1.3 KiB
Go
59 lines
1.3 KiB
Go
package publisher
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
// AudioTask is the message payload that Publisher.Publish encodes as JSON.
// Fields are emitted in declaration order under the snake_case keys given by
// the struct tags.
type AudioTask struct {
	// TaskID is encoded as "task_id".
	TaskID string `json:"task_id"`

	// FilePath is encoded as "file_path".
	// NOTE(review): presumably where the audio file is stored — confirm with the consumer.
	FilePath string `json:"file_path"`

	// Filename is encoded as "filename".
	Filename string `json:"filename"`

	// Size is encoded as "size".
	// NOTE(review): presumably the file size in bytes — confirm.
	Size int64 `json:"size"`

	// CreatedAt is encoded as "created_at". Publisher.Publish fills a zero
	// value with the current Unix time in seconds.
	CreatedAt int64 `json:"created_at"`
}
|
|
|
|
type Publisher struct {
|
|
ch *amqp.Channel
|
|
exchange string
|
|
routingKey string
|
|
}
|
|
|
|
func New(ch *amqp.Channel, exchange, routingKey string) (*Publisher, error) {
|
|
if err := ch.Confirm(false); err != nil {
|
|
return nil, fmt.Errorf("confirm mode: %w", err)
|
|
}
|
|
return &Publisher{ch: ch, exchange: exchange, routingKey: routingKey}, nil
|
|
}
|
|
|
|
func (p *Publisher) Publish(ctx context.Context, task AudioTask) error {
|
|
if task.CreatedAt == 0 {
|
|
task.CreatedAt = time.Now().Unix()
|
|
}
|
|
body, err := json.Marshal(task)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
confirms := p.ch.NotifyPublish(make(chan amqp.Confirmation, 1))
|
|
if err := p.ch.PublishWithContext(ctx, p.exchange, p.routingKey, false, false, amqp.Publishing{
|
|
ContentType: "application/json",
|
|
Body: body,
|
|
DeliveryMode: amqp.Persistent,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case confirm := <-confirms:
|
|
if !confirm.Ack {
|
|
return fmt.Errorf("publish not confirmed")
|
|
}
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|