Spaces:
Runtime error
Runtime error
| package notification | |
| import ( | |
| "context" | |
| "github.com/apache/pulsar-client-go/pulsar" | |
| "github.com/chroma/chroma-coordinator/internal/model" | |
| "github.com/chroma/chroma-coordinator/internal/proto/coordinatorpb" | |
| "github.com/pingcap/log" | |
| "go.uber.org/zap" | |
| "google.golang.org/protobuf/proto" | |
| ) | |
| type Notifier interface { | |
| Notify(ctx context.Context, notifications []model.Notification) error | |
| } | |
| type PulsarNotifier struct { | |
| producer pulsar.Producer | |
| } | |
| var _ Notifier = &PulsarNotifier{} | |
| func NewPulsarNotifier(producer pulsar.Producer) *PulsarNotifier { | |
| return &PulsarNotifier{ | |
| producer: producer, | |
| } | |
| } | |
| func (p *PulsarNotifier) Notify(ctx context.Context, notifications []model.Notification) error { | |
| for _, notification := range notifications { | |
| notificationPb := coordinatorpb.Notification{ | |
| CollectionId: notification.CollectionID, | |
| Type: notification.Type, | |
| Status: notification.Status, | |
| } | |
| payload, err := proto.Marshal(¬ificationPb) | |
| if err != nil { | |
| log.Error("Failed to marshal notification", zap.Error(err)) | |
| return err | |
| } | |
| message := &pulsar.ProducerMessage{ | |
| Key: notification.CollectionID, | |
| Payload: payload, | |
| } | |
| // Since the number of notifications is small, we can send them synchronously | |
| // for now. This is easy to reason about hte order of notifications. | |
| // | |
| // As follow up optimizations, we can send them asynchronously in batches and | |
| // track failed messages. | |
| _, err = p.producer.Send(ctx, message) | |
| if err != nil { | |
| log.Error("Failed to send message", zap.Error(err)) | |
| return err | |
| } | |
| log.Info("Published message", zap.Any("message", message)) | |
| } | |
| return nil | |
| } | |
| type MemoryNotifier struct { | |
| queue []pulsar.ProducerMessage | |
| } | |
| var _ Notifier = &MemoryNotifier{} | |
| func NewMemoryNotifier() *MemoryNotifier { | |
| return &MemoryNotifier{ | |
| queue: make([]pulsar.ProducerMessage, 0), | |
| } | |
| } | |
| func (m *MemoryNotifier) Notify(ctx context.Context, notifications []model.Notification) error { | |
| for _, notification := range notifications { | |
| notificationPb := coordinatorpb.Notification{ | |
| CollectionId: notification.CollectionID, | |
| Type: notification.Type, | |
| Status: notification.Status, | |
| } | |
| payload, err := proto.Marshal(¬ificationPb) | |
| if err != nil { | |
| log.Error("Failed to marshal notification", zap.Error(err)) | |
| return err | |
| } | |
| message := pulsar.ProducerMessage{ | |
| Key: notification.CollectionID, | |
| Payload: payload, | |
| } | |
| m.queue = append(m.queue, message) | |
| log.Info("Published message", zap.Any("message", message)) | |
| } | |
| return nil | |
| } | |