Adding approval process

This commit is contained in:
2023-07-03 17:26:43 -04:00
parent a43264189f
commit b1b010deee
15 changed files with 640 additions and 71 deletions

View File

@@ -2,4 +2,47 @@
#+AUTHOR: Adam Mohammed
This provides a way to do k8s native application deployment in a way that's simple and requires almost no configuration
Service Demon is a centralized configuration provider for Nautilus services.
This provides a way to do k8s native application deployment in a way that's simple
and requires almost no configuration on the client.
Service Demon runs in k8s and expects a service agent to be deployed alongside
your application.
** Workflow
This demon (playing off of "daemon") hosts an application registration process that,
the our agent is aware of. By simply deploying the agent in your namespace, it will kick off
the application registration process. On completion, the agent is able to respond to commands
from the configuration service to update k8s resources that your application can rely on.
The agent on deploy, will use TLS certificates generated for your applications ingress to
announce itself as an application that wishes to be registered.
Once the app announces that it would like to be registered, an authorized human must approve
the application.
Once the approval goes through, the application is registered, and can start to request application
configuration manifests.
The agent will fetch the manifests it needs and store them by talking to the k8s api. It will create
configuration maps, secrets, and other resources as necessary.
From there a client library loaded into your application will know how to read those manifests
and provide some baseline functionality to your service.
** Motivation
Although microservices are autonomous, they rely on common infrastructure to
reduce the operational overhead on the team maintaining them. Right now,
Nautilus has trouble performing authentication and authorization checks,
particularly between services.
By using a central configuration store, we can deploy and manage authorization policies
centrally, and push them down to the active services, so we can control authorization at runtime.

View File

@@ -1,74 +1,78 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"crypto/tls"
"crypto/x509"
"io"
"log"
"net/http"
"os"
"golang.org/x/oauth2"
"go.fixergrid.net/servicedemon/pkg/discord"
"go.fixergrid.net/servicedemon/pkg/pubsub"
"go.fixergrid.net/servicedemon/pkg/registrar"
)
type noopHandler struct {
http.HandlerFunc
}
func wrapHandlefunc(h http.HandlerFunc) noopHandler {
return noopHandler{
HandlerFunc: h,
}
}
func (h noopHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.HandlerFunc(w, req)
}
func main() {
fmt.Println("Starting .... the >HUB<")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
token, isSet := os.LookupEnv("DISCORD_BOT_TOKEN")
if !isSet {
fmt.Println("please set the environment variable 'DISCORD_BOT_TOKEN'")
return
}
logger := log.New(os.Stdout, "main: ", log.LstdFlags|log.Lshortfile)
logger.Println("Starting .... the >HUB<")
tokensrc := oauth2.StaticTokenSource(&oauth2.Token{
AccessToken: token,
TokenType: "Bot",
})
pubsub := pubsub.New()
repo := registrar.NewRepo()
ctx := context.Background()
r := registrar.NewRegistrar(
pubsub,
repo,
registrar.WithLogger(log.New(os.Stdout, "registrar: ", log.LstdFlags|log.Lshortfile)),
)
al := registrar.NewApprovalListener(
pubsub,
nil,
registrar.OptionLog(log.New(os.Stdout, "approvalListener: ", log.LstdFlags|log.Lshortfile)),
)
go al.Run(ctx)
client := discord.
NewClient(ctx, tokensrc).
WithDefaultChannel("1125162127133523978")
mux := http.NewServeMux()
resp, err := client.Get("https://discord.com/api/v10/channels/1125162127133523978")
mux.HandleFunc("/register", r.HandleRegistration)
mux.Handle("/approvals/", http.StripPrefix("/approvals/", wrapHandlefunc(r.HandleApproval)))
certFile, err := os.Open("./certs/ca.pem")
if err != nil {
fmt.Printf("there was an error making the request: %v\n", err)
return
logger.Fatalf("failed to open ca.pem: %v", err)
}
out, err := ioutil.ReadAll(resp.Body)
caCert, err := io.ReadAll(certFile)
if err != nil {
fmt.Printf("failed to read response body: %v\n", err)
return
logger.Fatalf("failed to read in ca: %v", err)
}
fmt.Printf("Got response:\n%s\n", out)
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caCert)
messageContent := map[string]interface{}{
"content": "Hello, world!",
server := &http.Server{
Addr: ":3001",
TLSConfig: &tls.Config{
ClientCAs: pool,
ClientAuth: tls.RequireAndVerifyClientCert,
},
}
b, err := json.Marshal(messageContent)
if err != nil {
fmt.Printf("failed to marshal message: %v\n", err)
return
}
r2, err := client.Post("https://discord.com/api/v10/channels/1125162127133523978/messages", "application/json", bytes.NewReader(b))
if err != nil {
fmt.Printf("failed tdo send message to server: %v\n", err)
return
}
out, err = ioutil.ReadAll(r2.Body)
if err != nil {
fmt.Printf("failed to read response body on send message: %v\n", err)
return
}
fmt.Printf("status: %v - res: %v\n", r2.Status, out)
server.Handler = mux
log.Println(server.ListenAndServeTLS("./certs/combined.pem", "./certs/server-key.pem"))
}

1
go.mod
View File

@@ -11,6 +11,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.11.0 // indirect

2
go.sum
View File

@@ -9,6 +9,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

51
pkg/pubsub/pubsub.go Normal file
View File

@@ -0,0 +1,51 @@
package pubsub
import "sync"
type Pubsub struct {
mu sync.RWMutex
subs map[string][]chan string
closed bool
}
func New() *Pubsub {
ps := Pubsub{}
ps.subs = make(map[string][]chan string)
return &ps
}
func (ps *Pubsub) Publish(topic string, msg string) {
ps.mu.RLock()
defer ps.mu.RUnlock()
if ps.closed {
return
}
for _, ch := range ps.subs[topic] {
ch <- msg
}
}
func (ps *Pubsub) Subscribe(topic string) <-chan string {
ps.mu.Lock()
defer ps.mu.Unlock()
ch := make(chan string, 1)
ps.subs[topic] = append(ps.subs[topic], ch)
return ch
}
func (ps *Pubsub) Close() {
ps.mu.Lock()
defer ps.mu.Unlock()
if !ps.closed {
ps.closed = true
for _, subs := range ps.subs {
for _, ch := range subs {
close(ch)
}
}
}
}

87
pkg/registrar/approval.go Normal file
View File

@@ -0,0 +1,87 @@
package registrar
import (
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
svc "go.fixergrid.net/servicedemon/pkg/registrar/internal/services"
)
const APPROVAL_TOPIC = "net.fixergrid.events.app.approved"
type ApprovalListener struct {
svc.ApprovalRequester
svc.PubSub
svc.AppRepo
log internalLog
}
type option func(r *ApprovalListener)
func OptionLog(l logger) option {
return func(r *ApprovalListener) {
r.log = internalLog{l}
}
}
func NewApprovalListener(ps svc.PubSub, requester svc.ApprovalRequester, options ...option) ApprovalListener {
out := ApprovalListener{
PubSub: ps,
ApprovalRequester: requester,
}
for _, opt := range options {
opt(&out)
}
return out
}
func (a ApprovalListener) Run(ctx context.Context) {
newAppEvents := a.Subscribe(REGISTRATION_TOPIC)
approvalEvents := a.Subscribe(APPROVAL_TOPIC)
for {
select {
case <-ctx.Done():
return
case event := <-newAppEvents:
msg, err := readMessage(event)
if err != nil {
a.log.Printf("failed to read message: got '%s': %v", event, err)
}
a.SendApprovalRequest(msg["name"])
case event := <-approvalEvents:
msg, err := readMessage(event)
if err != nil {
a.log.Printf("approvalevent: failed to read message: got '%s': %v", event, err)
}
id := uuid.MustParse(msg["id"])
a.HandleApprovedRequest(id)
}
}
}
func (a ApprovalListener) SendApprovalRequest(name string) {
a.log.Printf("sent approval message for [%s]", name)
}
func (a ApprovalListener) HandleApprovedRequest(id uuid.UUID) {
res, err := a.ApproveApp(id)
if err != nil {
a.log.Printf("ERROR: couldn't approve request: %v", err)
return
}
a.log.Printf("approved application [%s]", res.ID)
}
func readMessage(event string) (map[string]string, error) {
out := map[string]string{}
err := json.Unmarshal([]byte(event), &out)
if err != nil {
return nil, fmt.Errorf("readMessage: %w", err)
}
return out, nil
}

View File

@@ -1,12 +1,109 @@
package registrar
import (
"encoding/json"
"fmt"
"net/http"
"github.com/google/uuid"
svc "go.fixergrid.net/servicedemon/pkg/registrar/internal/services"
)
func HandleRegister(resp http.ResponseWriter, req *http.Request) {
headers := resp.Header()
headers.Set("content-type", "application/json")
const (
REGISTRATION_TOPIC = "net.fixergrid.events.app.admission"
)
var validApprover string = "Nautilus Admins"
type Registrar struct {
svc.Publisher
repo svc.AppRepo
log internalLog
}
func WithLogger(l logger) RegistrarOpt {
return func(r *Registrar) {
r.log = internalLog{l}
}
}
type RegistrarOpt func(*Registrar)
func NewRegistrar(publisher svc.Publisher, repo svc.AppRepo, options ...RegistrarOpt) Registrar {
r := Registrar{
Publisher: publisher,
repo: repo,
}
for _, opt := range options {
opt(&r)
}
return r
}
func (r Registrar) HandleRegistration(resp http.ResponseWriter, req *http.Request) {
cert := req.TLS.PeerCertificates[0]
name := cert.DNSNames[0]
if r.repo.IsRegistered(name) {
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(`{"status": "registered"}`))
} else {
id := r.repo.StartAppRegistration(name)
event := map[string]string{
"id": id.String(),
"name": name,
"type": "ApplicationRegistrationSubmitted",
}
r.sendEvent(REGISTRATION_TOPIC, event)
resp.WriteHeader(http.StatusCreated)
fmt.Fprintf(resp, `{"status": "pending_approval"}`)
}
}
func (r Registrar) HandleApproval(resp http.ResponseWriter, req *http.Request) {
headers := resp.Header()
headers.Set("content-type", "application/json")
in := req.URL.Path
cert := req.TLS.PeerCertificates[0]
OUs := cert.Subject.OrganizationalUnit
var ou string
if len(OUs) == 1 {
ou = OUs[0]
} else {
fmt.Fprintf(resp, `{"errors": ["missing OU"]}`)
resp.WriteHeader(http.StatusBadRequest)
return
}
if ou != validApprover {
fmt.Fprintf(resp, `{"errors": ["approval forbidden"]}`)
resp.WriteHeader(http.StatusForbidden)
return
}
appID := uuid.MustParse(in)
event := map[string]string{
"id": appID.String(),
"approver": ou,
"type": "ApplicationRegistrationApproved",
}
r.sendEvent(APPROVAL_TOPIC, event)
resp.WriteHeader(http.StatusOK)
fmt.Fprintf(resp, `{"status": "registered"}`)
}
func (r Registrar) sendEvent(topic string, event map[string]string) error {
b, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("registrar: sendEvent: failed to marshal: %w", err)
}
r.log.Printf("sending event: %s", event)
r.Publish(topic, string(b))
return nil
}

View File

@@ -1,20 +1,155 @@
package registrar
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.fixergrid.net/servicedemon/pkg/pubsub"
)
func TestHandleRegister(t *testing.T) {
type TestRequest http.Request
func NewTestRequest(t *testing.T, method string, url string, body io.Reader) TestRequest {
req, err := http.NewRequest(method, url, body)
require.NoError(t, err)
return TestRequest(*req)
}
func (tr TestRequest) WithFakeTLSState(dnsName string) TestRequest {
cert := x509.Certificate{
DNSNames: []string{
dnsName,
},
}
tr.TLS = &tls.ConnectionState{
PeerCertificates: []*x509.Certificate{&cert},
}
return tr
}
func (tr TestRequest) Create() *http.Request {
req := http.Request(tr)
return &req
}
type AppState struct {
Name string
State string
}
type TestAppRepo []AppState
func (repo TestAppRepo) IsRegistered(name string) bool {
for _, app := range repo {
if name == app.Name && app.State == "registered" {
return true
}
}
return false
}
func (repo TestAppRepo) PendingApprovalCount() int {
count := 0
for _, app := range repo {
if app.State == "pending_approval" {
count += 1
}
}
return count
}
func (repo *TestAppRepo) StartAppRegistration(name string) uuid.UUID {
*repo = append(*repo, AppState{
Name: name,
State: "pending_approval",
})
return uuid.New()
}
func TestHandleRegisterPendingApproval(t *testing.T) {
resp := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, "http://example.com/v1/register", nil)
req := NewTestRequest(t, http.MethodPost, "http://example.com/v1/register", nil).
WithFakeTLSState("dev-app-1.delivery.engineering").
Create()
repo := &TestAppRepo{}
pubsub := pubsub.New()
subscriber := pubsub.Subscribe(REGISTRATION_TOPIC)
NewRegistrar(pubsub, repo).HandleRegistration(resp, req)
assert.Equal(t, http.StatusCreated, resp.Code)
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
HandleRegister(resp, req)
assert.Equal(t, []byte(`{"status": "pending_approval"}`), body)
assert.Equal(t, http.StatusBadRequest, resp.Code)
assert.Equal(t, 1, repo.PendingApprovalCount())
expectedEvent := map[string]string{
"name": "dev-app-1.delivery.engineering",
"type": "ApplicationRegistrationSubmitted",
}
assertMessage(t, expectedEvent, subscriber)
}
func TestHandleRegisterAlreadyRegistered(t *testing.T) {
resp := httptest.NewRecorder()
req := NewTestRequest(t, http.MethodPost, "http://example.com/v1/register", nil).
WithFakeTLSState("dev-app-1.delivery.engineering").
Create()
pubsub := pubsub.New()
repo := &TestAppRepo{
AppState{
Name: "dev-app-1.delivery.engineering",
State: "registered",
},
}
NewRegistrar(pubsub, repo).HandleRegistration(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, []byte(`{"status": "registered"}`), body)
}
func assertMessage(t *testing.T, expected map[string]string, sender <-chan string) {
timer := time.After(2 * time.Second)
for {
select {
case msg := <-sender:
compareMessage(t, expected, msg)
return
case <-timer:
t.Fatal("failed waiting for message")
}
}
}
func compareMessage(t *testing.T, expected map[string]string, observed string) {
observedEvent := map[string]string{}
require.NoError(t, json.Unmarshal([]byte(observed), &observedEvent))
assert.NotEmpty(t, observedEvent["id"])
for k, v := range expected {
assert.Equal(t, v, observedEvent[k])
}
}

View File

@@ -0,0 +1,59 @@
package persistence
import (
"github.com/google/uuid"
svc "go.fixergrid.net/servicedemon/pkg/registrar/internal/services"
)
type FakeRepo struct {
apps []*svc.Application
}
func NewFakeRepo() *FakeRepo {
return &FakeRepo{
apps: make([]*svc.Application, 0),
}
}
func (repo FakeRepo) IsRegistered(name string) bool {
for _, app := range repo.apps {
if name == app.Name && app.State == "registered" {
return true
}
}
return false
}
func (repo FakeRepo) PendingApprovalCount() int {
count := 0
for _, app := range repo.apps {
if app.State == "pending_approval" {
count += 1
}
}
return count
}
func (repo *FakeRepo) StartAppRegistration(name string) uuid.UUID {
repo.apps = append(repo.apps, &svc.Application{
Name: name,
State: "pending_approval",
})
return uuid.New()
}
func (repo *FakeRepo) ApproveApp(id uuid.UUID) (svc.Application, error) {
var app *svc.Application
for _, application := range repo.apps {
if application.ID == id {
application.State = "registered"
}
}
if app == nil {
return svc.Application{}, svc.ErrApplicationNotFound
}
return *app, nil
}

View File

@@ -0,0 +1,2 @@
package services

View File

@@ -0,0 +1,5 @@
package services
type ApprovalRequester interface {
SendApprovalRequest(appName string) error
}

View File

@@ -0,0 +1,14 @@
package services
type Publisher interface {
Publish(topic string, message string)
}
type PubSub interface {
Publisher
Subscriber
}
type Subscriber interface {
Subscribe(topic string) <-chan string
}

View File

@@ -0,0 +1,26 @@
package services
import (
"errors"
"fmt"
"github.com/google/uuid"
)
var (
ErrAppRepo = errors.New("AppRepo")
ErrApplicationNotFound = fmt.Errorf("%w: application not found", ErrAppRepo)
)
type Application struct {
ID uuid.UUID
Name string
State string
}
type AppRepo interface {
IsRegistered(name string) bool
PendingApprovalCount() int
StartAppRegistration(name string) uuid.UUID
ApproveApp(id uuid.UUID) (Application, error)
}

33
pkg/registrar/logging.go Normal file
View File

@@ -0,0 +1,33 @@
package registrar
import "fmt"
type logger interface {
Output(int, string) error
}
type ilogger interface {
logger
Print(...interface{})
Printf(string, ...interface{})
Println(...interface{})
}
type internalLog struct {
logger
}
// Println replicates the behaviour of the standard logger.
func (t internalLog) Println(v ...interface{}) {
t.Output(2, fmt.Sprintln(v...))
}
// Printf replicates the behaviour of the standard logger.
func (t internalLog) Printf(format string, v ...interface{}) {
t.Output(2, fmt.Sprintf(format, v...))
}
// Print replicates the behaviour of the standard logger.
func (t internalLog) Print(v ...interface{}) {
t.Output(2, fmt.Sprint(v...))
}

10
pkg/registrar/repo.go Normal file
View File

@@ -0,0 +1,10 @@
package registrar
import (
persist "go.fixergrid.net/servicedemon/pkg/registrar/internal/persistence"
svc "go.fixergrid.net/servicedemon/pkg/registrar/internal/services"
)
func NewRepo() svc.AppRepo {
return persist.NewFakeRepo()
}