Files
bridge/internal/pubsub/subscriber.go
2023-07-01 00:04:52 +00:00

221 lines
5.4 KiB
Go

package pubsub
import (
"context"
"sync"
nc "github.com/nats-io/nats.go"
"go.infratographer.com/x/events"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.equinixmetal.net/infra9-metal-bridge/internal/service"
"github.com/ThreeDotsLabs/watermill/message"
)
var tracer = otel.Tracer("go.infratographer.com/permissions-api/internal/pubsub")
// Subscriber is the subscriber client
type Subscriber struct {
ctx context.Context
changeChannels []<-chan *message.Message
logger *zap.SugaredLogger
subscriber *events.Subscriber
subOpts []nc.SubOpt
svc service.Service
}
// SubscriberOption is a functional option for the Subscriber
type SubscriberOption func(s *Subscriber)
// WithLogger sets the logger for the Subscriber
func WithLogger(l *zap.SugaredLogger) SubscriberOption {
return func(s *Subscriber) {
s.logger = l
}
}
// WithNatsSubOpts sets the logger for the Subscriber
func WithNatsSubOpts(options ...nc.SubOpt) SubscriberOption {
return func(s *Subscriber) {
s.subOpts = append(s.subOpts, options...)
}
}
// NewSubscriber creates a new Subscriber
func NewSubscriber(ctx context.Context, cfg events.SubscriberConfig, service service.Service, opts ...SubscriberOption) (*Subscriber, error) {
s := &Subscriber{
ctx: ctx,
logger: zap.NewNop().Sugar(),
svc: service,
}
for _, opt := range opts {
opt(s)
}
sub, err := events.NewSubscriber(cfg, s.subOpts...)
if err != nil {
return nil, err
}
s.subscriber = sub
s.logger.Debugw("subscriber configuration", "config", cfg)
return s, nil
}
// Subscribe subscribes to a nats subject
func (s *Subscriber) Subscribe(topic string) error {
msgChan, err := s.subscriber.SubscribeChanges(s.ctx, topic)
if err != nil {
return err
}
s.changeChannels = append(s.changeChannels, msgChan)
s.logger.Infof("Subscribing to topic %s", topic)
return nil
}
// Listen start listening for messages on registered subjects and calls the registered message handler
func (s Subscriber) Listen() error {
wg := &sync.WaitGroup{}
// goroutine for each change channel
for _, ch := range s.changeChannels {
wg.Add(1)
go s.listen(ch, wg)
}
wg.Wait()
return nil
}
// listen listens for messages on a channel and calls the registered message handler
func (s Subscriber) listen(messages <-chan *message.Message, wg *sync.WaitGroup) {
defer wg.Done()
for msg := range messages {
s.logger.Infow("processing event", "event.id", msg.UUID)
if err := s.processEvent(msg); err != nil {
s.logger.Warn("Failed to process msg: ", err)
s.logger.Infow("message nacked", "event.id", msg.UUID)
msg.Nack()
} else {
s.logger.Infow("message acked", "event.id", msg.UUID)
msg.Ack()
}
}
}
// Close closes the subscriber connection and unsubscribes from all subscriptions
func (s *Subscriber) Close() error {
return s.subscriber.Close()
}
// processEvent event message handler
func (s *Subscriber) processEvent(msg *message.Message) error {
changeMsg, err := events.UnmarshalChangeMessage(msg.Payload)
if err != nil {
s.logger.Errorw("failed to process data in msg", zap.Error(err))
return err
}
ctx, span := tracer.Start(context.Background(), "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", changeMsg.SubjectID.String())))
defer span.End()
switch events.ChangeType(changeMsg.EventType) {
case events.CreateChangeType:
err = s.handleTouchEvent(ctx, msg, changeMsg)
case events.UpdateChangeType:
err = s.handleTouchEvent(ctx, msg, changeMsg)
case events.DeleteChangeType:
err = s.handleDeleteEvent(ctx, msg, changeMsg)
default:
s.logger.Warnw("ignoring msg, not a create, update or delete event", "event_type", changeMsg.EventType)
}
if err != nil {
return err
}
return nil
}
func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error {
if s.svc.IsOrganizationID(changeMsg.SubjectID) {
if err := s.svc.TouchOrganization(ctx, changeMsg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
return nil
}
if s.svc.IsProjectID(changeMsg.SubjectID) {
if err := s.svc.TouchProject(ctx, changeMsg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
return nil
}
if s.svc.IsUser(changeMsg.SubjectID) {
if err := s.svc.TouchUser(ctx, changeMsg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
return nil
}
s.logger.Warnw("unknown subject id", "subject.id", changeMsg.SubjectID)
return nil
}
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error {
if s.svc.IsOrganizationID(changeMsg.SubjectID) {
if err := s.svc.DeleteOrganization(ctx, changeMsg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
return nil
}
if s.svc.IsProjectID(changeMsg.SubjectID) {
if err := s.svc.DeleteProject(ctx, changeMsg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
return nil
}
if s.svc.IsUser(changeMsg.SubjectID) {
if err := s.svc.DeleteUser(ctx, changeMsg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
return nil
}
s.logger.Warnw("unknown subject id", "subject.id", changeMsg.SubjectID)
return nil
}