From 6c5073ba9ba9bbfc0f7e1b5cdadd9ae65c22d1fb Mon Sep 17 00:00:00 2001 From: Mike Mason Date: Wed, 19 Jul 2023 17:14:25 +0000 Subject: [PATCH] switch subscriptions to not use events package due to watermill wrapping --- internal/pubsub/nats.go | 131 ++++++++++++++++++++++++++++++++++ internal/pubsub/subscriber.go | 116 ++++++++++++++++++------------ 2 files changed, 201 insertions(+), 46 deletions(-) create mode 100644 internal/pubsub/nats.go diff --git a/internal/pubsub/nats.go b/internal/pubsub/nats.go new file mode 100644 index 0000000..3c642c7 --- /dev/null +++ b/internal/pubsub/nats.go @@ -0,0 +1,131 @@ +package pubsub + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/hex" + "encoding/json" + "strings" + + "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "go.infratographer.com/x/events" + "go.uber.org/zap" +) + +const subscriptionBufferSize = 10 + +type changeEvent struct { + *nats.Msg + events.ChangeMessage + + Error error +} + +type subscriber struct { + logger *zap.SugaredLogger + + nats *nats.Conn + jetstream nats.JetStreamContext + + topicPrefix string + queueGroup string + subscriptionOptions []nats.SubOpt + + subscriptions []*nats.Subscription + + context context.Context + cancelCtx func() +} + +func (s *subscriber) durableName(topic string) string { + hash := md5.Sum([]byte(topic)) + + return s.queueGroup + hex.EncodeToString(hash[:]) +} + +// SubscribeChanges subscribes to a topic, returning a channel which change events will be sent to. +func (s *subscriber) SubscribeChanges(ctx context.Context, topic string) (<-chan *changeEvent, error) { + subject := strings.Join([]string{s.topicPrefix, "changes", topic}, ".") + + opts := s.subscriptionOptions + + opts = append(opts, nats.Durable(s.durableName(subject))) + + msgCh := make(chan *changeEvent, subscriptionBufferSize) + + sub, err := s.jetstream.QueueSubscribe(subject, s.queueGroup, func(msg *nats.Msg) { + event := &changeEvent{ + Msg: msg, + } + + if err := json.NewDecoder(bytes.NewBuffer(msg.Data)).Decode(&event.ChangeMessage); err != nil { + event.Error = err + } + + msgCh <- event + }, opts...) + if err != nil { + return nil, err + } + + s.subscriptions = append(s.subscriptions, sub) + + go func(subscriber *nats.Subscription) { + select { + case <-ctx.Done(): + case <-s.context.Done(): + } + + if err := sub.Unsubscribe(); err != nil { + s.logger.Errorw("unable to unsubscribe", "error", err, "subject", subject) + } + }(sub) + + return msgCh, nil +} + +// Close closes the underlying nats connection. +func (s *subscriber) Close() { + s.cancelCtx() + + s.nats.Close() +} + +func newSubscriber(ctx context.Context, config events.SubscriberConfig, logger *zap.SugaredLogger, subOptions ...nats.SubOpt) (*subscriber, error) { + options := []nats.Option{ + nats.Timeout(config.Timeout), + } + + switch { + case config.NATSConfig.CredsFile != "": + options = append(options, nats.UserCredentials(config.NATSConfig.CredsFile)) + case config.NATSConfig.Token != "": + options = append(options, nats.Token(config.NATSConfig.Token)) + } + + conn, err := nats.Connect(config.URL, options...) + if err != nil { + return nil, errors.Wrap(err, "cannot connect to NATS") + } + + js, err := conn.JetStream() + if err != nil { + conn.Close() + + return nil, errors.Wrap(err, "cannot initialize JetStream") + } + + ctx, cancel := context.WithCancel(ctx) + + return &subscriber{ + logger: logger, + nats: conn, + jetstream: js, + topicPrefix: config.Prefix, + subscriptionOptions: subOptions, + context: ctx, + cancelCtx: cancel, + }, nil +} diff --git a/internal/pubsub/subscriber.go b/internal/pubsub/subscriber.go index eefb034..1095516 100644 --- a/internal/pubsub/subscriber.go +++ b/internal/pubsub/subscriber.go @@ -3,6 +3,7 @@ package pubsub import ( "context" "sync" + "time" nc "github.com/nats-io/nats.go" "go.infratographer.com/x/events" @@ -14,18 +15,18 @@ import ( "go.equinixmetal.net/infra9-metal-bridge/internal/metal/models" "go.equinixmetal.net/infra9-metal-bridge/internal/service" - - "github.com/ThreeDotsLabs/watermill/message" ) -var tracer = otel.Tracer("go.infratographer.com/permissions-api/internal/pubsub") +const defaultNakDelay = 10 * time.Second + +var tracer = otel.Tracer("go.equinixmetal.net/infra9-metal-bridge") // Subscriber is the subscriber client type Subscriber struct { ctx context.Context - changeChannels []<-chan *message.Message + changeChannels []<-chan *changeEvent logger *zap.SugaredLogger - subscriber *events.Subscriber + subscriber *subscriber subOpts []nc.SubOpt svc service.Service } @@ -59,7 +60,7 @@ func NewSubscriber(ctx context.Context, cfg events.SubscriberConfig, service ser opt(s) } - sub, err := events.NewSubscriber(cfg, s.subOpts...) + sub, err := newSubscriber(ctx, cfg, s.logger, s.subOpts...) if err != nil { return nil, err } @@ -102,49 +103,60 @@ func (s Subscriber) Listen() error { } // listen listens for messages on a channel and calls the registered message handler -func (s Subscriber) listen(messages <-chan *message.Message, wg *sync.WaitGroup) { +func (s Subscriber) listen(messages <-chan *changeEvent, wg *sync.WaitGroup) { defer wg.Done() for msg := range messages { - s.logger.Infow("processing event", "event.id", msg.UUID) + mlogger := s.logger.With( + "nats.subject", msg.Subject, + "event.subject.id", msg.SubjectID, + "event.type", msg.EventType, + ) + + mlogger.Infow("processing event") if err := s.processEvent(msg); err != nil { - s.logger.Warn("Failed to process msg: ", err) + mlogger.Errorw("Failed to process msg: ", "error", err) - msg.Nack() + if err = msg.NakWithDelay(defaultNakDelay); err != nil { + mlogger.Errorw("error naking failed message", "error", err) + } } else { - msg.Ack() + if err = msg.Ack(); err != nil { + mlogger.Warnw("error acking message", "error", err) + } } } } // Close closes the subscriber connection and unsubscribes from all subscriptions -func (s *Subscriber) Close() error { - return s.subscriber.Close() +func (s *Subscriber) Close() { + s.subscriber.Close() } // processEvent event message handler -func (s *Subscriber) processEvent(msg *message.Message) error { - changeMsg, err := events.UnmarshalChangeMessage(msg.Payload) - if err != nil { - s.logger.Errorw("failed to process data in msg", zap.Error(err)) +func (s *Subscriber) processEvent(msg *changeEvent) error { + mlogger := s.logger.With( + "nats.subject", msg.Subject, + "event.subject.id", msg.SubjectID, + "event.type", msg.EventType, + ) - return err - } - - ctx, span := tracer.Start(context.Background(), "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", changeMsg.SubjectID.String()))) + ctx, span := tracer.Start(context.Background(), "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", msg.SubjectID.String()))) defer span.End() - switch events.ChangeType(changeMsg.EventType) { + var err error + + switch events.ChangeType(msg.EventType) { case events.CreateChangeType: - err = s.handleTouchEvent(ctx, msg, changeMsg) + err = s.handleTouchEvent(ctx, msg) case events.UpdateChangeType: - err = s.handleTouchEvent(ctx, msg, changeMsg) + err = s.handleTouchEvent(ctx, msg) case events.DeleteChangeType: - err = s.handleDeleteEvent(ctx, msg, changeMsg) + err = s.handleDeleteEvent(ctx, msg) default: - s.logger.Warnw("ignoring msg, not a create, update or delete event", "event_type", changeMsg.EventType) + mlogger.Warn("ignoring msg, not a create, update or delete event") } if err != nil { @@ -154,9 +166,15 @@ func (s *Subscriber) processEvent(msg *message.Message) error { return nil } -func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error { - if s.svc.IsOrganizationID(changeMsg.SubjectID) { - if err := s.svc.TouchOrganization(ctx, changeMsg.SubjectID); err != nil { +func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *changeEvent) error { + mlogger := s.logger.With( + "nats.subject", msg.Subject, + "event.subject.id", msg.SubjectID, + "event.type", msg.EventType, + ) + + if s.svc.IsOrganizationID(msg.SubjectID) { + if err := s.svc.TouchOrganization(ctx, msg.SubjectID); err != nil { // TODO: only return errors on retryable errors return err } @@ -164,8 +182,8 @@ func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message, return nil } - if s.svc.IsProjectID(changeMsg.SubjectID) { - if err := s.svc.TouchProject(ctx, changeMsg.SubjectID); err != nil { + if s.svc.IsProjectID(msg.SubjectID) { + if err := s.svc.TouchProject(ctx, msg.SubjectID); err != nil { // TODO: only return errors on retryable errors return err } @@ -173,17 +191,17 @@ func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message, return nil } - if s.svc.IsUser(changeMsg.SubjectID) { - userUUID := changeMsg.SubjectID.String()[gidx.PrefixPartLength+1:] + if s.svc.IsUser(msg.SubjectID) { + userUUID := msg.SubjectID.String()[gidx.PrefixPartLength+1:] subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID) if err != nil { - s.logger.Errorw("failed to convert user id to identity id", "user.id", changeMsg.SubjectID.String(), "error", err) + mlogger.Errorw("failed to convert user id to identity id", "error", err) return nil } - if err := s.svc.AssignUser(ctx, subjID, changeMsg.AdditionalSubjectIDs...); err != nil { + if err := s.svc.AssignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil { // TODO: only return errors on retryable errors return err } @@ -191,14 +209,20 @@ func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message, return nil } - s.logger.Warnw("unknown subject id", "subject.id", changeMsg.SubjectID) + mlogger.Warnw("unknown subject id") return nil } -func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error { - if s.svc.IsOrganizationID(changeMsg.SubjectID) { - if err := s.svc.DeleteOrganization(ctx, changeMsg.SubjectID); err != nil { +func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *changeEvent) error { + mlogger := s.logger.With( + "nats.subject", msg.Subject, + "event.subject.id", msg.SubjectID, + "event.type", msg.EventType, + ) + + if s.svc.IsOrganizationID(msg.SubjectID) { + if err := s.svc.DeleteOrganization(ctx, msg.SubjectID); err != nil { // TODO: only return errors on retryable errors return err } @@ -206,8 +230,8 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message return nil } - if s.svc.IsProjectID(changeMsg.SubjectID) { - if err := s.svc.DeleteProject(ctx, changeMsg.SubjectID); err != nil { + if s.svc.IsProjectID(msg.SubjectID) { + if err := s.svc.DeleteProject(ctx, msg.SubjectID); err != nil { // TODO: only return errors on retryable errors return err } @@ -215,17 +239,17 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message return nil } - if s.svc.IsUser(changeMsg.SubjectID) { - userUUID := changeMsg.SubjectID.String()[gidx.PrefixPartLength+1:] + if s.svc.IsUser(msg.SubjectID) { + userUUID := msg.SubjectID.String()[gidx.PrefixPartLength+1:] subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID) if err != nil { - s.logger.Errorw("failed to convert user id to identity id", "user.id", changeMsg.SubjectID.String(), "error", err) + mlogger.Errorw("failed to convert user id to identity id", "error", err) return nil } - if err := s.svc.UnassignUser(ctx, subjID, changeMsg.AdditionalSubjectIDs...); err != nil { + if err := s.svc.UnassignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil { // TODO: only return errors on retryable errors return err } @@ -233,7 +257,7 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message return nil } - s.logger.Warnw("unknown subject id", "subject.id", changeMsg.SubjectID) + mlogger.Warnw("unknown subject id") return nil }