package pubsub

import (
	"bytes"
	"context"
	"crypto/md5"
	"encoding/hex"
	"encoding/json"
	"strings"

	"github.com/nats-io/nats.go"
	"github.com/pkg/errors"
	"go.infratographer.com/x/events"
	"go.uber.org/zap"
)

// subscriptionBufferSize is the capacity of each channel returned by SubscribeChanges.
const subscriptionBufferSize = 10

// changeEvent pairs a raw NATS message with the change message decoded from
// its payload. Error is set when the payload could not be decoded as JSON; the
// raw message is still available in that case.
type changeEvent struct {
	*nats.Msg
	events.ChangeMessage
	Error error
}

// subscriber creates JetStream queue subscriptions for change events.
type subscriber struct {
	// logger reports failures to unsubscribe.
	logger *zap.SugaredLogger
	// nats is the underlying connection.
	nats *nats.Conn
	// jetstream is the JetStream context used to create subscriptions.
	jetstream nats.JetStreamContext
	// topicPrefix is the first segment of every subscribed subject.
	topicPrefix string
	// queueGroup is the queue group joined by every subscription and the
	// prefix of every durable name.
	queueGroup string
	// subscriptionOptions are applied to every subscription before the
	// options SubscribeChanges adds itself.
	subscriptionOptions []nats.SubOpt
	// subscriptions records every subscription created by SubscribeChanges.
	subscriptions []*nats.Subscription
	// context, when done, causes every subscription to be unsubscribed.
	context context.Context
	// cancelCtx cancels context.
	cancelCtx func()
}

// durableName returns the durable name for topic: the queue group followed by
// the hex-encoded MD5 digest of the topic.
//
// MD5 only derives a stable, fixed-length name here; it is not used for
// security. NOTE(review): presumably the topic is hashed because subjects
// contain dots, which durable names may not — confirm.
func (s *subscriber) durableName(topic string) string {
	hash := md5.Sum([]byte(topic))

	return s.queueGroup + hex.EncodeToString(hash[:])
}

// SubscribeChanges subscribes to a topic, returning a channel which change events will be sent to.
//
// The subject is "<topicPrefix>.changes.<topic>". The subscription joins the
// subscriber's queue group, is durable, and uses explicit manual acks, so the
// receiver is responsible for acknowledging each event's message. Payloads
// that fail to decode are still delivered, with Error set.
//
// The subscription is removed when either ctx or the subscriber's own context
// is done. The returned channel is never closed. An error is returned only if
// the subscription could not be created.
func (s *subscriber) SubscribeChanges(ctx context.Context, topic string) (<-chan *changeEvent, error) {
	subject := strings.Join([]string{s.topicPrefix, "changes", topic}, ".")

	// Build a fresh slice instead of appending to s.subscriptionOptions: if
	// the shared slice has spare capacity, append would write into its backing
	// array and later calls could overwrite this subscription's options.
	const extraOpts = 3
	opts := make([]nats.SubOpt, 0, len(s.subscriptionOptions)+extraOpts)
	opts = append(opts, s.subscriptionOptions...)
	opts = append(opts, nats.Durable(s.durableName(subject)), nats.AckExplicit(), nats.ManualAck())

	msgCh := make(chan *changeEvent, subscriptionBufferSize)

	sub, err := s.jetstream.QueueSubscribe(subject, s.queueGroup, func(msg *nats.Msg) {
		event := &changeEvent{Msg: msg}

		if err := json.NewDecoder(bytes.NewReader(msg.Data)).Decode(&event.ChangeMessage); err != nil {
			event.Error = err
		}

		// Stop waiting for a slow or departed receiver once the subscription
		// is being torn down; otherwise this callback would block forever on
		// a full channel. A dropped event is never acked by this code.
		select {
		case msgCh <- event:
		case <-ctx.Done():
		case <-s.context.Done():
		}
	}, opts...)
	if err != nil {
		return nil, err
	}

	s.subscriptions = append(s.subscriptions, sub)

	// Unsubscribe as soon as either the caller's or the subscriber's context ends.
	go func(sub *nats.Subscription) {
		select {
		case <-ctx.Done():
		case <-s.context.Done():
		}

		if err := sub.Unsubscribe(); err != nil {
			s.logger.Errorw("unable to unsubscribe", "error", err, "subject", subject)
		}
	}(sub)

	return msgCh, nil
}

// Close cancels the subscriber's context and closes the underlying nats connection.
func (s *subscriber) Close() { s.cancelCtx() s.nats.Close() } func newSubscriber(ctx context.Context, config events.SubscriberConfig, logger *zap.SugaredLogger, subOptions ...nats.SubOpt) (*subscriber, error) { options := []nats.Option{ nats.Timeout(config.Timeout), } switch { case config.NATSConfig.CredsFile != "": options = append(options, nats.UserCredentials(config.NATSConfig.CredsFile)) case config.NATSConfig.Token != "": options = append(options, nats.Token(config.NATSConfig.Token)) } conn, err := nats.Connect(config.URL, options...) if err != nil { return nil, errors.Wrap(err, "cannot connect to NATS") } js, err := conn.JetStream() if err != nil { conn.Close() return nil, errors.Wrap(err, "cannot initialize JetStream") } ctx, cancel := context.WithCancel(ctx) return &subscriber{ logger: logger, nats: conn, jetstream: js, queueGroup: config.QueueGroup, topicPrefix: config.Prefix, subscriptionOptions: subOptions, context: ctx, cancelCtx: cancel, }, nil }