From 2506b8b2d2e7d271565dbcb152d1b09a75492833 Mon Sep 17 00:00:00 2001 From: Adnan Maolood Date: Wed, 12 Jan 2022 09:22:41 -0500 Subject: [PATCH] api: Rig up user webhook delivery --- api/go.mod | 1 + api/graph/schema.resolvers.go | 3 + api/server.go | 4 +- api/webhooks/legacy.go | 149 ++++++++++++++++++++++++++++++++++ api/webhooks/middleware.go | 25 ++++++ api/webhooks/webhooks.go | 148 +++++---------------------------- 6 files changed, 201 insertions(+), 129 deletions(-) create mode 100644 api/webhooks/legacy.go create mode 100644 api/webhooks/middleware.go diff --git a/api/go.mod b/api/go.mod index 23d38a2..0b88aa1 100644 --- a/api/go.mod +++ b/api/go.mod @@ -7,6 +7,7 @@ require ( github.com/99designs/gqlgen v0.14.0 github.com/Masterminds/squirrel v1.4.0 github.com/go-git/go-git/v5 v5.0.0 + github.com/google/uuid v1.1.1 github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/lib/pq v1.8.0 github.com/minio/minio-go/v7 v7.0.5 diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go index fc59e5c..b6f8602 100644 --- a/api/graph/schema.resolvers.go +++ b/api/graph/schema.resolvers.go @@ -174,6 +174,7 @@ func (r *mutationResolver) CreateRepository(ctx context.Context, name string, vi } } + webhooks.DeliverRepoEvent(ctx, model.WebhookEventRepoCreated, &repo) webhooks.DeliverLegacyRepoCreate(ctx, &repo) return nil }); err != nil { @@ -327,6 +328,7 @@ func (r *mutationResolver) UpdateRepository(ctx context.Context, id int, input m } } + webhooks.DeliverRepoEvent(ctx, model.WebhookEventRepoUpdate, &repo) webhooks.DeliverLegacyRepoUpdate(ctx, &repo) return nil }); err != nil { @@ -368,6 +370,7 @@ func (r *mutationResolver) DeleteRepository(ctx context.Context, id int) (*model return err } + webhooks.DeliverRepoEvent(ctx, model.WebhookEventRepoDeleted, &repo) webhooks.DeliverLegacyRepoDeleted(ctx, &repo) return nil }); err != nil { diff --git a/api/server.go b/api/server.go index d3b8e41..64c42c3 100644 --- a/api/server.go +++ b/api/server.go @@ -30,15 +30,17 @@ func main() { scopes[i] = s.String() } + webhookQueue := webhooks.NewQueue(schema) legacyWebhooks := webhooks.NewLegacyQueue() server.NewServer("git.sr.ht", appConfig). WithDefaultMiddleware(). WithMiddleware( loaders.Middleware, + webhooks.Middleware(webhookQueue), webhooks.LegacyMiddleware(legacyWebhooks), ). WithSchema(schema, scopes). - WithQueues(legacyWebhooks.Queue). + WithQueues(webhookQueue.Queue, legacyWebhooks.Queue). Run() } diff --git a/api/webhooks/legacy.go b/api/webhooks/legacy.go new file mode 100644 index 0000000..c2f7e55 --- /dev/null +++ b/api/webhooks/legacy.go @@ -0,0 +1,149 @@ +package webhooks + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "time" + + "git.sr.ht/~sircmpwn/core-go/auth" + "git.sr.ht/~sircmpwn/core-go/webhooks" + sq "github.com/Masterminds/squirrel" + + "git.sr.ht/~sircmpwn/git.sr.ht/api/graph/model" +) + +func NewLegacyQueue() *webhooks.LegacyQueue { + return webhooks.NewLegacyQueue() +} + +var legacyUserCtxKey = &contextKey{"legacyUser"} + +type contextKey struct { + name string +} + +func LegacyMiddleware( + queue *webhooks.LegacyQueue, +) func(next http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(r.Context(), legacyUserCtxKey, queue) + r = r.WithContext(ctx) + next.ServeHTTP(w, r) + }) + } +} + +type RepoWebhookPayload struct { + ID int `json:"id"` + Created time.Time `json:"created"` + Updated time.Time `json:"updated"` + Name string `json:"name"` + Description *string `json:"description"` + Visibility string `json:"visibility"` + + Owner struct { + CanonicalName string `json:"canonical_name"` + Name string `json:"name"` + } `json:"owner"` +} + +func DeliverLegacyRepoCreate(ctx context.Context, repo *model.Repository) { + q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue) + if !ok { + panic(errors.New("No legacy user webhooks worker for this context")) + } + + payload := RepoWebhookPayload{ + ID: repo.ID, + Created: repo.Created, + Updated: repo.Created, + Name: repo.Name, + Description: repo.Description, + Visibility: repo.RawVisibility, + } + + // TODO: User groups + user := auth.ForContext(ctx) + if user.UserID != repo.OwnerID { + // At the time of writing, the only consumers of this function are in a + // context where the authenticated user is the owner of this repo. We + // can skip the database round-trip if we just grab their auth context. + panic(errors.New("TODO: look up user details for this repo")) + } + payload.Owner.CanonicalName = "~" + user.Username + payload.Owner.Name = user.Username + + encoded, err := json.Marshal(&payload) + if err != nil { + panic(err) // Programmer error + } + + query := sq. + Select(). + From("user_webhook_subscription sub"). + Where("sub.user_id = ?", repo.OwnerID) + q.Schedule(ctx, query, "user", "repo:create", encoded) +} + +func DeliverLegacyRepoUpdate(ctx context.Context, repo *model.Repository) { + q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue) + if !ok { + panic(errors.New("No legacy user webhooks worker for this context")) + } + + payload := RepoWebhookPayload{ + ID: repo.ID, + Created: repo.Created, + Updated: repo.Created, + Name: repo.Name, + Description: repo.Description, + Visibility: repo.RawVisibility, + } + + // TODO: User groups + user := auth.ForContext(ctx) + if user.UserID != repo.OwnerID { + // At the time of writing, the only consumers of this function are in a + // context where the authenticated user is the owner of this repo. We + // can skip the database round-trip if we just grab their auth context. + panic(errors.New("TODO: look up user details for this repo")) + } + payload.Owner.CanonicalName = "~" + user.Username + payload.Owner.Name = user.Username + + encoded, err := json.Marshal(&payload) + if err != nil { + panic(err) // Programmer error + } + + query := sq. + Select(). + From("user_webhook_subscription sub"). + Where("sub.user_id = ?", repo.OwnerID) + q.Schedule(ctx, query, "user", "repo:update", encoded) +} + +func DeliverLegacyRepoDeleted(ctx context.Context, repo *model.Repository) { + q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue) + if !ok { + panic(errors.New("No legacy user webhooks worker for this context")) + } + + payload := struct { + ID int `json:"id"` + }{repo.ID} + + encoded, err := json.Marshal(&payload) + if err != nil { + panic(err) // Programmer error + } + + query := sq. + Select(). + From("user_webhook_subscription sub"). + Where("sub.user_id = ?", repo.OwnerID) + q.Schedule(ctx, query, "user", "repo:delete", encoded) +} diff --git a/api/webhooks/middleware.go b/api/webhooks/middleware.go new file mode 100644 index 0000000..4a6746c --- /dev/null +++ b/api/webhooks/middleware.go @@ -0,0 +1,25 @@ +package webhooks + +import ( + "context" + "net/http" + + "git.sr.ht/~sircmpwn/core-go/webhooks" + "github.com/99designs/gqlgen/graphql" +) + +func NewQueue(schema graphql.ExecutableSchema) *webhooks.WebhookQueue { + return webhooks.NewQueue(schema) +} + +var webhooksCtxKey = &contextKey{"userWebhooks"} + +func Middleware(queue *webhooks.WebhookQueue) func(next http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ctx := context.WithValue(r.Context(), webhooksCtxKey, queue) + r = r.WithContext(ctx) + next.ServeHTTP(w, r) + }) + } +} diff --git a/api/webhooks/webhooks.go b/api/webhooks/webhooks.go index c2f7e55..bf2255e 100644 --- a/api/webhooks/webhooks.go +++ b/api/webhooks/webhooks.go @@ -2,148 +2,40 @@ package webhooks import ( "context" - "encoding/json" - "errors" - "net/http" + "log" "time" "git.sr.ht/~sircmpwn/core-go/auth" "git.sr.ht/~sircmpwn/core-go/webhooks" sq "github.com/Masterminds/squirrel" + "github.com/google/uuid" "git.sr.ht/~sircmpwn/git.sr.ht/api/graph/model" ) -func NewLegacyQueue() *webhooks.LegacyQueue { - return webhooks.NewLegacyQueue() -} - -var legacyUserCtxKey = &contextKey{"legacyUser"} - -type contextKey struct { - name string -} - -func LegacyMiddleware( - queue *webhooks.LegacyQueue, -) func(next http.Handler) http.Handler { - return func(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - ctx := context.WithValue(r.Context(), legacyUserCtxKey, queue) - r = r.WithContext(ctx) - next.ServeHTTP(w, r) - }) - } -} - -type RepoWebhookPayload struct { - ID int `json:"id"` - Created time.Time `json:"created"` - Updated time.Time `json:"updated"` - Name string `json:"name"` - Description *string `json:"description"` - Visibility string `json:"visibility"` - - Owner struct { - CanonicalName string `json:"canonical_name"` - Name string `json:"name"` - } `json:"owner"` -} - -func DeliverLegacyRepoCreate(ctx context.Context, repo *model.Repository) { - q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue) - if !ok { - panic(errors.New("No legacy user webhooks worker for this context")) - } - - payload := RepoWebhookPayload{ - ID: repo.ID, - Created: repo.Created, - Updated: repo.Created, - Name: repo.Name, - Description: repo.Description, - Visibility: repo.RawVisibility, - } - - // TODO: User groups - user := auth.ForContext(ctx) - if user.UserID != repo.OwnerID { - // At the time of writing, the only consumers of this function are in a - // context where the authenticated user is the owner of this repo. We - // can skip the database round-trip if we just grab their auth context. - panic(errors.New("TODO: look up user details for this repo")) - } - payload.Owner.CanonicalName = "~" + user.Username - payload.Owner.Name = user.Username - - encoded, err := json.Marshal(&payload) - if err != nil { - panic(err) // Programmer error - } - - query := sq. - Select(). - From("user_webhook_subscription sub"). - Where("sub.user_id = ?", repo.OwnerID) - q.Schedule(ctx, query, "user", "repo:create", encoded) -} - -func DeliverLegacyRepoUpdate(ctx context.Context, repo *model.Repository) { - q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue) +func deliverUserWebhook(ctx context.Context, event model.WebhookEvent, + payload model.WebhookPayload, payloadUUID uuid.UUID) { + q, ok := ctx.Value(webhooksCtxKey).(*webhooks.WebhookQueue) if !ok { - panic(errors.New("No legacy user webhooks worker for this context")) + log.Fatalf("No webhooks worker for this context") } - - payload := RepoWebhookPayload{ - ID: repo.ID, - Created: repo.Created, - Updated: repo.Created, - Name: repo.Name, - Description: repo.Description, - Visibility: repo.RawVisibility, - } - - // TODO: User groups - user := auth.ForContext(ctx) - if user.UserID != repo.OwnerID { - // At the time of writing, the only consumers of this function are in a - // context where the authenticated user is the owner of this repo. We - // can skip the database round-trip if we just grab their auth context. - panic(errors.New("TODO: look up user details for this repo")) - } - payload.Owner.CanonicalName = "~" + user.Username - payload.Owner.Name = user.Username - - encoded, err := json.Marshal(&payload) - if err != nil { - panic(err) // Programmer error - } - + userID := auth.ForContext(ctx).UserID query := sq. Select(). - From("user_webhook_subscription sub"). - Where("sub.user_id = ?", repo.OwnerID) - q.Schedule(ctx, query, "user", "repo:update", encoded) + From("gql_user_wh_sub sub"). + Where("sub.user_id = ?", userID) + q.Schedule(ctx, query, "user", event.String(), + payloadUUID, payload) } -func DeliverLegacyRepoDeleted(ctx context.Context, repo *model.Repository) { - q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue) - if !ok { - panic(errors.New("No legacy user webhooks worker for this context")) +func DeliverRepoEvent(ctx context.Context, + event model.WebhookEvent, repository *model.Repository) { + payloadUUID := uuid.New() + payload := model.RepositoryEvent{ + UUID: payloadUUID.String(), + Event: event, + Date: time.Now().UTC(), + Repository: repository, } - - payload := struct { - ID int `json:"id"` - }{repo.ID} - - encoded, err := json.Marshal(&payload) - if err != nil { - panic(err) // Programmer error - } - - query := sq. - Select(). - From("user_webhook_subscription sub"). - Where("sub.user_id = ?", repo.OwnerID) - q.Schedule(ctx, query, "user", "repo:delete", encoded) + deliverUserWebhook(ctx, event, &payload, payloadUUID) } -- 2.38.4