package pubsub

import (
	"context"
	"sync"
	"time"

	nc "github.com/nats-io/nats.go"
	"go.infratographer.com/x/events"
	"go.infratographer.com/x/gidx"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"

	"go.equinixmetal.net/infra9-metal-bridge/internal/metal/models"
	"go.equinixmetal.net/infra9-metal-bridge/internal/service"
)

// defaultNakDelay is the redelivery delay requested when a message fails processing.
const defaultNakDelay = 10 * time.Second

var tracer = otel.Tracer("go.equinixmetal.net/infra9-metal-bridge")

// Subscriber is the subscriber client
type Subscriber struct {
	ctx            context.Context
	changeChannels []<-chan *changeEvent
	logger         *zap.SugaredLogger
	subscriber     *subscriber
	subOpts        []nc.SubOpt
	svc            service.Service
}

// SubscriberOption is a functional option for the Subscriber
type SubscriberOption func(s *Subscriber)

// WithLogger sets the logger for the Subscriber
func WithLogger(l *zap.SugaredLogger) SubscriberOption {
	return func(s *Subscriber) {
		s.logger = l
	}
}

// WithNatsSubOpts appends NATS subscription options that are handed to the
// underlying subscriber when it is created.
func WithNatsSubOpts(options ...nc.SubOpt) SubscriberOption {
	return func(s *Subscriber) {
		s.subOpts = append(s.subOpts, options...)
	}
}

// NewSubscriber creates a new Subscriber.
//
// The Subscriber starts with a no-op logger; opts are applied in order before
// the underlying subscriber is built from cfg, so WithLogger and
// WithNatsSubOpts both take effect for it. An error from building the
// underlying subscriber is returned unchanged.
func NewSubscriber(ctx context.Context, cfg events.SubscriberConfig, service service.Service, opts ...SubscriberOption) (*Subscriber, error) {
	s := &Subscriber{
		ctx:    ctx,
		logger: zap.NewNop().Sugar(),
		svc:    service,
	}

	for _, opt := range opts {
		opt(s)
	}

	sub, err := newSubscriber(ctx, cfg, s.logger, s.subOpts...)
	if err != nil {
		return nil, err
	}

	s.subscriber = sub

	s.logger.Debugw("subscriber configuration", "config", cfg)

	return s, nil
}

// Subscribe subscribes to a nats subject and registers the resulting change
// channel for Listen.
//
// Listen only iterates the channels registered at the moment it is called, so
// Subscribe must be called before Listen for a topic to be consumed.
func (s *Subscriber) Subscribe(topic string) error {
	msgChan, err := s.subscriber.SubscribeChanges(s.ctx, topic)
	if err != nil {
		return err
	}

	s.changeChannels = append(s.changeChannels, msgChan)

	s.logger.Infof("Subscribing to topic %s", topic)

	return nil
}

// Listen starts one goroutine per registered change channel and blocks until
// every one of them has finished, which happens once its channel is closed.
// It currently always returns nil.
func (s *Subscriber) Listen() error {
	var wg sync.WaitGroup

	// goroutine for each change channel
	for _, ch := range s.changeChannels {
		wg.Add(1)

		go s.listen(ch, &wg)
	}

	wg.Wait()

	return nil
}

// listen drains messages until the channel is closed, processing each event
// and then acking it on success or naking it (with defaultNakDelay) on failure.
func (s *Subscriber) listen(messages <-chan *changeEvent, wg *sync.WaitGroup) {
	defer wg.Done()

	for msg := range messages {
		mlogger := s.eventLogger(msg)

		mlogger.Infow("processing event")

		if err := s.processEvent(msg); err != nil {
			mlogger.Errorw("Failed to process msg: ", "error", err)

			// Ask for redelivery after a delay so the event is retried later.
			if nakErr := msg.NakWithDelay(defaultNakDelay); nakErr != nil {
				mlogger.Errorw("error naking failed message", "error", nakErr)
			}

			continue
		}

		if err := msg.Ack(); err != nil {
			mlogger.Warnw("error acking message", "error", err)
		}
	}
}

// Close closes the underlying subscriber.
func (s *Subscriber) Close() {
	s.subscriber.Close()
}

// eventLogger returns the Subscriber's logger annotated with the identifying
// fields of msg.
func (s *Subscriber) eventLogger(msg *changeEvent) *zap.SugaredLogger {
	return s.logger.With(
		"nats.subject", msg.Subject,
		"event.subject.id", msg.SubjectID,
		"event.type", msg.EventType,
	)
}

// userUUIDFromSubject returns the part of a prefixed ID string that follows
// the prefix and its one-character separator. ok is false when the string is
// too short to contain anything after the separator, in which case slicing it
// directly would panic or yield an empty ID.
func userUUIDFromSubject(id string) (uuid string, ok bool) {
	offset := gidx.PrefixPartLength + 1
	if len(id) <= offset {
		return "", false
	}

	return id[offset:], true
}

// processEvent is the event message handler. It starts a "pubsub.receive"
// span using the trace context carried by the message and dispatches on the
// event type: create and update events go to handleTouchEvent, delete events
// to handleDeleteEvent. Any other event type is logged and ignored (nil is
// returned, so the caller acks it). A handler error is recorded on the span
// and returned.
func (s *Subscriber) processEvent(msg *changeEvent) error {
	ctx := events.TraceContextFromChangeMessage(context.Background(), msg.ChangeMessage)

	ctx, span := tracer.Start(ctx, "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", msg.SubjectID.String())))
	defer span.End()

	var err error

	switch events.ChangeType(msg.EventType) {
	case events.CreateChangeType, events.UpdateChangeType:
		err = s.handleTouchEvent(ctx, msg)
	case events.DeleteChangeType:
		err = s.handleDeleteEvent(ctx, msg)
	default:
		s.eventLogger(msg).Warn("ignoring msg, not a create, update or delete event")
	}

	if err != nil {
		span.RecordError(err)
	}

	return err
}

// handleTouchEvent handles create and update events. Depending on what kind
// of ID the event subject is, it touches the organization, touches the
// project, or assigns the user to the event's additional subject IDs.
//
// Service errors are returned so the message is retried. A user ID that
// cannot be converted to an identity ID, or a subject of unknown kind, is
// logged and dropped by returning nil.
func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *changeEvent) error {
	mlogger := s.eventLogger(msg)

	switch {
	case s.svc.IsOrganizationID(msg.SubjectID):
		if err := s.svc.TouchOrganization(ctx, msg.SubjectID); err != nil {
			// TODO: only return errors on retryable errors
			return err
		}

		return nil
	case s.svc.IsProjectID(msg.SubjectID):
		if err := s.svc.TouchProject(ctx, msg.SubjectID); err != nil {
			// TODO: only return errors on retryable errors
			return err
		}

		return nil
	case s.svc.IsUser(msg.SubjectID):
		userUUID, ok := userUUIDFromSubject(msg.SubjectID.String())
		if !ok {
			// Retrying cannot fix a malformed ID, so drop the message.
			mlogger.Errorw("malformed user subject id")

			return nil
		}

		subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID)
		if err != nil {
			mlogger.Errorw("failed to convert user id to identity id", "error", err)

			return nil
		}

		if err := s.svc.AssignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil {
			// TODO: only return errors on retryable errors
			return err
		}

		return nil
	}

	mlogger.Warnw("unknown subject id")

	return nil
}

// handleDeleteEvent handles delete events. Depending on what kind of ID the
// event subject is, it deletes the organization, deletes the project, or
// unassigns the user from the event's additional subject IDs.
//
// Service errors are returned so the message is retried. A user ID that
// cannot be converted to an identity ID, or a subject of unknown kind, is
// logged and dropped by returning nil.
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *changeEvent) error {
	mlogger := s.eventLogger(msg)

	switch {
	case s.svc.IsOrganizationID(msg.SubjectID):
		if err := s.svc.DeleteOrganization(ctx, msg.SubjectID); err != nil {
			// TODO: only return errors on retryable errors
			return err
		}

		return nil
	case s.svc.IsProjectID(msg.SubjectID):
		if err := s.svc.DeleteProject(ctx, msg.SubjectID); err != nil {
			// TODO: only return errors on retryable errors
			return err
		}

		return nil
	case s.svc.IsUser(msg.SubjectID):
		userUUID, ok := userUUIDFromSubject(msg.SubjectID.String())
		if !ok {
			// Retrying cannot fix a malformed ID, so drop the message.
			mlogger.Errorw("malformed user subject id")

			return nil
		}

		subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID)
		if err != nil {
			mlogger.Errorw("failed to convert user id to identity id", "error", err)

			return nil
		}

		if err := s.svc.UnassignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil {
			// TODO: only return errors on retryable errors
			return err
		}

		return nil
	}

	mlogger.Warnw("unknown subject id")

	return nil
}