266 lines
6.3 KiB
Go
266 lines
6.3 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
nc "github.com/nats-io/nats.go"
|
|
"go.infratographer.com/x/events"
|
|
"go.infratographer.com/x/gidx"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
|
|
"go.equinixmetal.net/infra9-metal-bridge/internal/metal/models"
|
|
"go.equinixmetal.net/infra9-metal-bridge/internal/service"
|
|
)
|
|
|
|
const defaultNakDelay = 10 * time.Second
|
|
|
|
var tracer = otel.Tracer("go.equinixmetal.net/infra9-metal-bridge")
|
|
|
|
// Subscriber is the subscriber client
|
|
type Subscriber struct {
|
|
ctx context.Context
|
|
changeChannels []<-chan *changeEvent
|
|
logger *zap.SugaredLogger
|
|
subscriber *subscriber
|
|
subOpts []nc.SubOpt
|
|
svc service.Service
|
|
}
|
|
|
|
// SubscriberOption is a functional option for the Subscriber
|
|
type SubscriberOption func(s *Subscriber)
|
|
|
|
// WithLogger sets the logger for the Subscriber
|
|
func WithLogger(l *zap.SugaredLogger) SubscriberOption {
|
|
return func(s *Subscriber) {
|
|
s.logger = l
|
|
}
|
|
}
|
|
|
|
// WithNatsSubOpts sets the logger for the Subscriber
|
|
func WithNatsSubOpts(options ...nc.SubOpt) SubscriberOption {
|
|
return func(s *Subscriber) {
|
|
s.subOpts = append(s.subOpts, options...)
|
|
}
|
|
}
|
|
|
|
// NewSubscriber creates a new Subscriber
|
|
func NewSubscriber(ctx context.Context, cfg events.SubscriberConfig, service service.Service, opts ...SubscriberOption) (*Subscriber, error) {
|
|
s := &Subscriber{
|
|
ctx: ctx,
|
|
logger: zap.NewNop().Sugar(),
|
|
svc: service,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(s)
|
|
}
|
|
|
|
sub, err := newSubscriber(ctx, cfg, s.logger, s.subOpts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.subscriber = sub
|
|
|
|
s.logger.Debugw("subscriber configuration", "config", cfg)
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Subscribe subscribes to a nats subject
|
|
func (s *Subscriber) Subscribe(topic string) error {
|
|
msgChan, err := s.subscriber.SubscribeChanges(s.ctx, topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.changeChannels = append(s.changeChannels, msgChan)
|
|
|
|
s.logger.Infof("Subscribing to topic %s", topic)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Listen start listening for messages on registered subjects and calls the registered message handler
|
|
func (s Subscriber) Listen() error {
|
|
wg := &sync.WaitGroup{}
|
|
|
|
// goroutine for each change channel
|
|
for _, ch := range s.changeChannels {
|
|
wg.Add(1)
|
|
|
|
go s.listen(ch, wg)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// listen listens for messages on a channel and calls the registered message handler
|
|
func (s Subscriber) listen(messages <-chan *changeEvent, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
|
|
for msg := range messages {
|
|
mlogger := s.logger.With(
|
|
"nats.subject", msg.Subject,
|
|
"event.subject.id", msg.SubjectID,
|
|
"event.type", msg.EventType,
|
|
)
|
|
|
|
mlogger.Infow("processing event")
|
|
|
|
if err := s.processEvent(msg); err != nil {
|
|
mlogger.Errorw("Failed to process msg: ", "error", err)
|
|
|
|
if err = msg.NakWithDelay(defaultNakDelay); err != nil {
|
|
mlogger.Errorw("error naking failed message", "error", err)
|
|
}
|
|
} else {
|
|
if err = msg.Ack(); err != nil {
|
|
mlogger.Warnw("error acking message", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes the subscriber connection and unsubscribes from all subscriptions
|
|
func (s *Subscriber) Close() {
|
|
s.subscriber.Close()
|
|
}
|
|
|
|
// processEvent event message handler
|
|
func (s *Subscriber) processEvent(msg *changeEvent) error {
|
|
mlogger := s.logger.With(
|
|
"nats.subject", msg.Subject,
|
|
"event.subject.id", msg.SubjectID,
|
|
"event.type", msg.EventType,
|
|
)
|
|
|
|
ctx := events.TraceContextFromChangeMessage(context.Background(), msg.ChangeMessage)
|
|
|
|
ctx, span := tracer.Start(ctx, "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", msg.SubjectID.String())))
|
|
|
|
defer span.End()
|
|
|
|
var err error
|
|
|
|
switch events.ChangeType(msg.EventType) {
|
|
case events.CreateChangeType:
|
|
err = s.handleTouchEvent(ctx, msg)
|
|
case events.UpdateChangeType:
|
|
err = s.handleTouchEvent(ctx, msg)
|
|
case events.DeleteChangeType:
|
|
err = s.handleDeleteEvent(ctx, msg)
|
|
default:
|
|
mlogger.Warn("ignoring msg, not a create, update or delete event")
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Subscriber) handleTouchEvent(ctx context.Context, msg *changeEvent) error {
|
|
mlogger := s.logger.With(
|
|
"nats.subject", msg.Subject,
|
|
"event.subject.id", msg.SubjectID,
|
|
"event.type", msg.EventType,
|
|
)
|
|
|
|
if s.svc.IsOrganizationID(msg.SubjectID) {
|
|
if err := s.svc.TouchOrganization(ctx, msg.SubjectID); err != nil {
|
|
// TODO: only return errors on retryable errors
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if s.svc.IsProjectID(msg.SubjectID) {
|
|
if err := s.svc.TouchProject(ctx, msg.SubjectID); err != nil {
|
|
// TODO: only return errors on retryable errors
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if s.svc.IsUser(msg.SubjectID) {
|
|
userUUID := msg.SubjectID.String()[gidx.PrefixPartLength+1:]
|
|
|
|
subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID)
|
|
if err != nil {
|
|
mlogger.Errorw("failed to convert user id to identity id", "error", err)
|
|
|
|
return nil
|
|
}
|
|
|
|
if err := s.svc.AssignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil {
|
|
// TODO: only return errors on retryable errors
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
mlogger.Warnw("unknown subject id")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *changeEvent) error {
|
|
mlogger := s.logger.With(
|
|
"nats.subject", msg.Subject,
|
|
"event.subject.id", msg.SubjectID,
|
|
"event.type", msg.EventType,
|
|
)
|
|
|
|
if s.svc.IsOrganizationID(msg.SubjectID) {
|
|
if err := s.svc.DeleteOrganization(ctx, msg.SubjectID); err != nil {
|
|
// TODO: only return errors on retryable errors
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if s.svc.IsProjectID(msg.SubjectID) {
|
|
if err := s.svc.DeleteProject(ctx, msg.SubjectID); err != nil {
|
|
// TODO: only return errors on retryable errors
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if s.svc.IsUser(msg.SubjectID) {
|
|
userUUID := msg.SubjectID.String()[gidx.PrefixPartLength+1:]
|
|
|
|
subjID, err := models.GenerateSubjectID(models.IdentityPrefixUser, models.MetalUserIssuer, models.MetalUserIssuerIDPrefix+userUUID)
|
|
if err != nil {
|
|
mlogger.Errorw("failed to convert user id to identity id", "error", err)
|
|
|
|
return nil
|
|
}
|
|
|
|
if err := s.svc.UnassignUser(ctx, subjID, msg.AdditionalSubjectIDs...); err != nil {
|
|
// TODO: only return errors on retryable errors
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
mlogger.Warnw("unknown subject id")
|
|
|
|
return nil
|
|
}
|