switch subscriptions to not use events package due to watermill wrapping

This commit is contained in:
Mike Mason
2023-07-19 17:14:25 +00:00
parent 7c22b97a7c
commit 6c5073ba9b
2 changed files with 201 additions and 46 deletions

131
internal/pubsub/nats.go Normal file
View File

@@ -0,0 +1,131 @@
package pubsub
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"strings"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.infratographer.com/x/events"
"go.uber.org/zap"
)
const subscriptionBufferSize = 10
type changeEvent struct {
*nats.Msg
events.ChangeMessage
Error error
}
type subscriber struct {
logger *zap.SugaredLogger
nats *nats.Conn
jetstream nats.JetStreamContext
topicPrefix string
queueGroup string
subscriptionOptions []nats.SubOpt
subscriptions []*nats.Subscription
context context.Context
cancelCtx func()
}
func (s *subscriber) durableName(topic string) string {
hash := md5.Sum([]byte(topic))
return s.queueGroup + hex.EncodeToString(hash[:])
}
// SubscribeChanges subscribes to a topic, returning a channel which change events will be sent to.
func (s *subscriber) SubscribeChanges(ctx context.Context, topic string) (<-chan *changeEvent, error) {
subject := strings.Join([]string{s.topicPrefix, "changes", topic}, ".")
opts := s.subscriptionOptions
opts = append(opts, nats.Durable(s.durableName(subject)))
msgCh := make(chan *changeEvent, subscriptionBufferSize)
sub, err := s.jetstream.QueueSubscribe(subject, s.queueGroup, func(msg *nats.Msg) {
event := &changeEvent{
Msg: msg,
}
if err := json.NewDecoder(bytes.NewBuffer(msg.Data)).Decode(&event.ChangeMessage); err != nil {
event.Error = err
}
msgCh <- event
}, opts...)
if err != nil {
return nil, err
}
s.subscriptions = append(s.subscriptions, sub)
go func(subscriber *nats.Subscription) {
select {
case <-ctx.Done():
case <-s.context.Done():
}
if err := sub.Unsubscribe(); err != nil {
s.logger.Errorw("unable to unsubscribe", "error", err, "subject", subject)
}
}(sub)
return msgCh, nil
}
// Close closes the underlying nats connection.
func (s *subscriber) Close() {
s.cancelCtx()
s.nats.Close()
}
func newSubscriber(ctx context.Context, config events.SubscriberConfig, logger *zap.SugaredLogger, subOptions ...nats.SubOpt) (*subscriber, error) {
options := []nats.Option{
nats.Timeout(config.Timeout),
}
switch {
case config.NATSConfig.CredsFile != "":
options = append(options, nats.UserCredentials(config.NATSConfig.CredsFile))
case config.NATSConfig.Token != "":
options = append(options, nats.Token(config.NATSConfig.Token))
}
conn, err := nats.Connect(config.URL, options...)
if err != nil {
return nil, errors.Wrap(err, "cannot connect to NATS")
}
js, err := conn.JetStream()
if err != nil {
conn.Close()
return nil, errors.Wrap(err, "cannot initialize JetStream")
}
ctx, cancel := context.WithCancel(ctx)
return &subscriber{
logger: logger,
nats: conn,
jetstream: js,
topicPrefix: config.Prefix,
subscriptionOptions: subOptions,
context: ctx,
cancelCtx: cancel,
}, nil
}

View File

@@ -3,6 +3,7 @@ package pubsub
import (
"context"
"sync"
"time"
nc "github.com/nats-io/nats.go"
"go.infratographer.com/x/events"
@@ -14,18 +15,18 @@ import (
"go.equinixmetal.net/infra9-metal-bridge/internal/metal/models"
"go.equinixmetal.net/infra9-metal-bridge/internal/service"
"github.com/ThreeDotsLabs/watermill/message"
)
var tracer = otel.Tracer("go.infratographer.com/permissions-api/internal/pubsub")
const defaultNakDelay = 10 * time.Second
var tracer = otel.Tracer("go.equinixmetal.net/infra9-metal-bridge")
// Subscriber is the subscriber client
type Subscriber struct {
ctx context.Context
changeChannels []<-chan *message.Message
changeChannels []<-chan *changeEvent
logger *zap.SugaredLogger
subscriber *events.Subscriber
subscriber *subscriber
subOpts []nc.SubOpt
svc service.Service
}
@@ -59,7 +60,7 @@ func NewSubscriber(ctx context.Context, cfg events.SubscriberConfig, service ser
opt(s)
}
sub, err := events.NewSubscriber(cfg, s.subOpts...)
sub, err := newSubscriber(ctx, cfg, s.logger, s.subOpts...)
if err != nil {
return nil, err
}
@@ -102,49 +103,60 @@ func (s Subscriber) Listen() error {
}
// listen listens for messages on a channel and calls the registered message handler
func (s Subscriber) listen(messages <-chan *message.Message, wg *sync.WaitGroup) {
func (s Subscriber) listen(messages <-chan *changeEvent, wg *sync.WaitGroup) {
defer wg.Done()
for msg := range messages {
s.logger.Infow("processing event", "event.id", msg.UUID)
mlogger := s.logger.With(
"nats.subject", msg.Subject,
"event.subject.id", msg.SubjectID,
"event.type", msg.EventType,
)
mlogger.Infow("processing event")
if err := s.processEvent(msg); err != nil {
s.logger.Warn("Failed to process msg: ", err)
mlogger.Errorw("Failed to process msg: ", "error", err)
msg.Nack()
if err = msg.NakWithDelay(defaultNakDelay); err != nil {
mlogger.Errorw("error naking failed message", "error", err)
}
} else {
msg.Ack()
if err = msg.Ack(); err != nil {
mlogger.Warnw("error acking message", "error", err)
}
}
}
}
// Close closes the subscriber connection and unsubscribes from all subscriptions
func (s *Subscriber) Close() error {
return s.subscriber.Close()
func (s *Subscriber) Close() {
s.subscriber.Close()
}
// processEvent event message handler
func (s *Subscriber) processEvent(msg *message.Message) error {
changeMsg, err := events.UnmarshalChangeMessage(msg.Payload)
if err != nil {
s.logger.Errorw("failed to process data in msg", zap.Error(err))
func (s *Subscriber) processEvent(msg *changeEvent) error {
mlogger := s.logger.With(
"nats.subject", msg.Subject,
"event.subject.id", msg.SubjectID,
"event.type", msg.EventType,
)
return err
}
ctx, span := tracer.Start(context.Background(), "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", changeMsg.SubjectID.String())))
ctx, span := tracer.Start(context.Background(), "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", msg.SubjectID.String())))
defer span.End()
switch events.ChangeType(changeMsg.EventType) {
var err error
switch events.ChangeType(msg.EventType) {
case events.CreateChangeType:
err = s.handleTouchEvent(ctx, msg, changeMsg)
err = s.handleTouchEvent(ctx, msg)
case events.UpdateChangeType:
err = s.handleTouchEvent(ctx, msg, changeMsg)
err = s.handleTouchEvent(ctx, msg)
case events.DeleteChangeType:
err = s.handleDeleteEvent(ctx, msg, changeMsg)
err = s.handleDeleteEvent(ctx, msg)
default:
s.logger.Warnw("ignoring msg, not a create, update or delete event", "event_type", changeMsg.EventType)
mlogger.Warn("ignoring msg, not a create, update or delete event")
}
if err != nil {
@@ -154,9 +166,15 @@ func (s *Subscriber) processEvent(msg *message.Message) error {
return nil
}
func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error {
if s.svc.IsOrganizationID(changeMsg.SubjectID) {
if err := s.svc.TouchOrganization(ctx, changeMsg.SubjectID); err != nil {
func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *changeEvent) error {
mlogger := s.logger.With(
"nats.subject", msg.Subject,
"event.subject.id", msg.SubjectID,
"event.type", msg.EventType,
)
if s.svc.IsOrganizationID(msg.SubjectID) {
if err := s.svc.TouchOrganization(ctx, msg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
@@ -164,8 +182,8 @@ func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message,
return nil
}
if s.svc.IsProjectID(changeMsg.SubjectID) {
if err := s.svc.TouchProject(ctx, changeMsg.SubjectID); err != nil {
if s.svc.IsProjectID(msg.SubjectID) {
if err := s.svc.TouchProject(ctx, msg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
@@ -173,17 +191,17 @@ func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message,
return nil
}
if s.svc.IsUser(changeMsg.SubjectID) {
userUUID := changeMsg.SubjectID.String()[gidx.PrefixPartLength+1:]
if s.svc.IsUser(msg.SubjectID) {
userUUID := msg.SubjectID.String()[gidx.PrefixPartLength+1:]
subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID)
if err != nil {
s.logger.Errorw("failed to convert user id to identity id", "user.id", changeMsg.SubjectID.String(), "error", err)
mlogger.Errorw("failed to convert user id to identity id", "error", err)
return nil
}
if err := s.svc.AssignUser(ctx, subjID, changeMsg.AdditionalSubjectIDs...); err != nil {
if err := s.svc.AssignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil {
// TODO: only return errors on retryable errors
return err
}
@@ -191,14 +209,20 @@ func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *message.Message,
return nil
}
s.logger.Warnw("unknown subject id", "subject.id", changeMsg.SubjectID)
mlogger.Warnw("unknown subject id")
return nil
}
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error {
if s.svc.IsOrganizationID(changeMsg.SubjectID) {
if err := s.svc.DeleteOrganization(ctx, changeMsg.SubjectID); err != nil {
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *changeEvent) error {
mlogger := s.logger.With(
"nats.subject", msg.Subject,
"event.subject.id", msg.SubjectID,
"event.type", msg.EventType,
)
if s.svc.IsOrganizationID(msg.SubjectID) {
if err := s.svc.DeleteOrganization(ctx, msg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
@@ -206,8 +230,8 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message
return nil
}
if s.svc.IsProjectID(changeMsg.SubjectID) {
if err := s.svc.DeleteProject(ctx, changeMsg.SubjectID); err != nil {
if s.svc.IsProjectID(msg.SubjectID) {
if err := s.svc.DeleteProject(ctx, msg.SubjectID); err != nil {
// TODO: only return errors on retryable errors
return err
}
@@ -215,17 +239,17 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message
return nil
}
if s.svc.IsUser(changeMsg.SubjectID) {
userUUID := changeMsg.SubjectID.String()[gidx.PrefixPartLength+1:]
if s.svc.IsUser(msg.SubjectID) {
userUUID := msg.SubjectID.String()[gidx.PrefixPartLength+1:]
subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID)
if err != nil {
s.logger.Errorw("failed to convert user id to identity id", "user.id", changeMsg.SubjectID.String(), "error", err)
mlogger.Errorw("failed to convert user id to identity id", "error", err)
return nil
}
if err := s.svc.UnassignUser(ctx, subjID, changeMsg.AdditionalSubjectIDs...); err != nil {
if err := s.svc.UnassignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil {
// TODO: only return errors on retryable errors
return err
}
@@ -233,7 +257,7 @@ func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message
return nil
}
s.logger.Warnw("unknown subject id", "subject.id", changeMsg.SubjectID)
mlogger.Warnw("unknown subject id")
return nil
}