M api/go.mod => api/go.mod +1 -0
@@ 7,6 7,7 @@ require (
github.com/99designs/gqlgen v0.14.0
github.com/Masterminds/squirrel v1.4.0
github.com/go-git/go-git/v5 v5.0.0
+ github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/lib/pq v1.8.0
github.com/minio/minio-go/v7 v7.0.5
M api/graph/schema.resolvers.go => api/graph/schema.resolvers.go +3 -0
@@ 174,6 174,7 @@ func (r *mutationResolver) CreateRepository(ctx context.Context, name string, vi
}
}
+ webhooks.DeliverRepoEvent(ctx, model.WebhookEventRepoCreated, &repo)
webhooks.DeliverLegacyRepoCreate(ctx, &repo)
return nil
}); err != nil {
@@ 327,6 328,7 @@ func (r *mutationResolver) UpdateRepository(ctx context.Context, id int, input m
}
}
+ webhooks.DeliverRepoEvent(ctx, model.WebhookEventRepoUpdate, &repo)
webhooks.DeliverLegacyRepoUpdate(ctx, &repo)
return nil
}); err != nil {
@@ 368,6 370,7 @@ func (r *mutationResolver) DeleteRepository(ctx context.Context, id int) (*model
return err
}
+ webhooks.DeliverRepoEvent(ctx, model.WebhookEventRepoDeleted, &repo)
webhooks.DeliverLegacyRepoDeleted(ctx, &repo)
return nil
}); err != nil {
M api/server.go => api/server.go +3 -1
@@ 30,15 30,17 @@ func main() {
scopes[i] = s.String()
}
+ webhookQueue := webhooks.NewQueue(schema)
legacyWebhooks := webhooks.NewLegacyQueue()
server.NewServer("git.sr.ht", appConfig).
WithDefaultMiddleware().
WithMiddleware(
loaders.Middleware,
+ webhooks.Middleware(webhookQueue),
webhooks.LegacyMiddleware(legacyWebhooks),
).
WithSchema(schema, scopes).
- WithQueues(legacyWebhooks.Queue).
+ WithQueues(webhookQueue.Queue, legacyWebhooks.Queue).
Run()
}
A api/webhooks/legacy.go => api/webhooks/legacy.go +149 -0
@@ 0,0 1,149 @@
+package webhooks
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "net/http"
+ "time"
+
+ "git.sr.ht/~sircmpwn/core-go/auth"
+ "git.sr.ht/~sircmpwn/core-go/webhooks"
+ sq "github.com/Masterminds/squirrel"
+
+ "git.sr.ht/~sircmpwn/git.sr.ht/api/graph/model"
+)
+
+func NewLegacyQueue() *webhooks.LegacyQueue {
+ return webhooks.NewLegacyQueue()
+}
+
+var legacyUserCtxKey = &contextKey{"legacyUser"}
+
+type contextKey struct {
+ name string
+}
+
+func LegacyMiddleware(
+ queue *webhooks.LegacyQueue,
+) func(next http.Handler) http.Handler {
+ return func(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ ctx := context.WithValue(r.Context(), legacyUserCtxKey, queue)
+ r = r.WithContext(ctx)
+ next.ServeHTTP(w, r)
+ })
+ }
+}
+
+type RepoWebhookPayload struct {
+ ID int `json:"id"`
+ Created time.Time `json:"created"`
+ Updated time.Time `json:"updated"`
+ Name string `json:"name"`
+ Description *string `json:"description"`
+ Visibility string `json:"visibility"`
+
+ Owner struct {
+ CanonicalName string `json:"canonical_name"`
+ Name string `json:"name"`
+ } `json:"owner"`
+}
+
+func DeliverLegacyRepoCreate(ctx context.Context, repo *model.Repository) {
+ q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue)
+ if !ok {
+ panic(errors.New("No legacy user webhooks worker for this context"))
+ }
+
+ payload := RepoWebhookPayload{
+ ID: repo.ID,
+ Created: repo.Created,
+ Updated: repo.Created,
+ Name: repo.Name,
+ Description: repo.Description,
+ Visibility: repo.RawVisibility,
+ }
+
+ // TODO: User groups
+ user := auth.ForContext(ctx)
+ if user.UserID != repo.OwnerID {
+ // At the time of writing, the only consumers of this function are in a
+ // context where the authenticated user is the owner of this repo. We
+ // can skip the database round-trip if we just grab their auth context.
+ panic(errors.New("TODO: look up user details for this repo"))
+ }
+ payload.Owner.CanonicalName = "~" + user.Username
+ payload.Owner.Name = user.Username
+
+ encoded, err := json.Marshal(&payload)
+ if err != nil {
+ panic(err) // Programmer error
+ }
+
+ query := sq.
+ Select().
+ From("user_webhook_subscription sub").
+ Where("sub.user_id = ?", repo.OwnerID)
+ q.Schedule(ctx, query, "user", "repo:create", encoded)
+}
+
+func DeliverLegacyRepoUpdate(ctx context.Context, repo *model.Repository) {
+ q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue)
+ if !ok {
+ panic(errors.New("No legacy user webhooks worker for this context"))
+ }
+
+ payload := RepoWebhookPayload{
+ ID: repo.ID,
+ Created: repo.Created,
+ Updated: repo.Created,
+ Name: repo.Name,
+ Description: repo.Description,
+ Visibility: repo.RawVisibility,
+ }
+
+ // TODO: User groups
+ user := auth.ForContext(ctx)
+ if user.UserID != repo.OwnerID {
+ // At the time of writing, the only consumers of this function are in a
+ // context where the authenticated user is the owner of this repo. We
+ // can skip the database round-trip if we just grab their auth context.
+ panic(errors.New("TODO: look up user details for this repo"))
+ }
+ payload.Owner.CanonicalName = "~" + user.Username
+ payload.Owner.Name = user.Username
+
+ encoded, err := json.Marshal(&payload)
+ if err != nil {
+ panic(err) // Programmer error
+ }
+
+ query := sq.
+ Select().
+ From("user_webhook_subscription sub").
+ Where("sub.user_id = ?", repo.OwnerID)
+ q.Schedule(ctx, query, "user", "repo:update", encoded)
+}
+
+func DeliverLegacyRepoDeleted(ctx context.Context, repo *model.Repository) {
+ q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue)
+ if !ok {
+ panic(errors.New("No legacy user webhooks worker for this context"))
+ }
+
+ payload := struct {
+ ID int `json:"id"`
+ }{repo.ID}
+
+ encoded, err := json.Marshal(&payload)
+ if err != nil {
+ panic(err) // Programmer error
+ }
+
+ query := sq.
+ Select().
+ From("user_webhook_subscription sub").
+ Where("sub.user_id = ?", repo.OwnerID)
+ q.Schedule(ctx, query, "user", "repo:delete", encoded)
+}
A api/webhooks/middleware.go => api/webhooks/middleware.go +25 -0
@@ 0,0 1,25 @@
+package webhooks
+
+import (
+ "context"
+ "net/http"
+
+ "git.sr.ht/~sircmpwn/core-go/webhooks"
+ "github.com/99designs/gqlgen/graphql"
+)
+
+func NewQueue(schema graphql.ExecutableSchema) *webhooks.WebhookQueue {
+ return webhooks.NewQueue(schema)
+}
+
+var webhooksCtxKey = &contextKey{"userWebhooks"}
+
+func Middleware(queue *webhooks.WebhookQueue) func(next http.Handler) http.Handler {
+ return func(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ ctx := context.WithValue(r.Context(), webhooksCtxKey, queue)
+ r = r.WithContext(ctx)
+ next.ServeHTTP(w, r)
+ })
+ }
+}
M api/webhooks/webhooks.go => api/webhooks/webhooks.go +20 -128
@@ 2,148 2,40 @@ package webhooks
import (
"context"
- "encoding/json"
- "errors"
- "net/http"
+ "log"
"time"
"git.sr.ht/~sircmpwn/core-go/auth"
"git.sr.ht/~sircmpwn/core-go/webhooks"
sq "github.com/Masterminds/squirrel"
+ "github.com/google/uuid"
"git.sr.ht/~sircmpwn/git.sr.ht/api/graph/model"
)
-func NewLegacyQueue() *webhooks.LegacyQueue {
- return webhooks.NewLegacyQueue()
-}
-
-var legacyUserCtxKey = &contextKey{"legacyUser"}
-
-type contextKey struct {
- name string
-}
-
-func LegacyMiddleware(
- queue *webhooks.LegacyQueue,
-) func(next http.Handler) http.Handler {
- return func(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- ctx := context.WithValue(r.Context(), legacyUserCtxKey, queue)
- r = r.WithContext(ctx)
- next.ServeHTTP(w, r)
- })
- }
-}
-
-type RepoWebhookPayload struct {
- ID int `json:"id"`
- Created time.Time `json:"created"`
- Updated time.Time `json:"updated"`
- Name string `json:"name"`
- Description *string `json:"description"`
- Visibility string `json:"visibility"`
-
- Owner struct {
- CanonicalName string `json:"canonical_name"`
- Name string `json:"name"`
- } `json:"owner"`
-}
-
-func DeliverLegacyRepoCreate(ctx context.Context, repo *model.Repository) {
- q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue)
- if !ok {
- panic(errors.New("No legacy user webhooks worker for this context"))
- }
-
- payload := RepoWebhookPayload{
- ID: repo.ID,
- Created: repo.Created,
- Updated: repo.Created,
- Name: repo.Name,
- Description: repo.Description,
- Visibility: repo.RawVisibility,
- }
-
- // TODO: User groups
- user := auth.ForContext(ctx)
- if user.UserID != repo.OwnerID {
- // At the time of writing, the only consumers of this function are in a
- // context where the authenticated user is the owner of this repo. We
- // can skip the database round-trip if we just grab their auth context.
- panic(errors.New("TODO: look up user details for this repo"))
- }
- payload.Owner.CanonicalName = "~" + user.Username
- payload.Owner.Name = user.Username
-
- encoded, err := json.Marshal(&payload)
- if err != nil {
- panic(err) // Programmer error
- }
-
- query := sq.
- Select().
- From("user_webhook_subscription sub").
- Where("sub.user_id = ?", repo.OwnerID)
- q.Schedule(ctx, query, "user", "repo:create", encoded)
-}
-
-func DeliverLegacyRepoUpdate(ctx context.Context, repo *model.Repository) {
- q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue)
+func deliverUserWebhook(ctx context.Context, event model.WebhookEvent,
+ payload model.WebhookPayload, payloadUUID uuid.UUID) {
+ q, ok := ctx.Value(webhooksCtxKey).(*webhooks.WebhookQueue)
if !ok {
- panic(errors.New("No legacy user webhooks worker for this context"))
+ log.Fatalf("No webhooks worker for this context")
}
-
- payload := RepoWebhookPayload{
- ID: repo.ID,
- Created: repo.Created,
- Updated: repo.Created,
- Name: repo.Name,
- Description: repo.Description,
- Visibility: repo.RawVisibility,
- }
-
- // TODO: User groups
- user := auth.ForContext(ctx)
- if user.UserID != repo.OwnerID {
- // At the time of writing, the only consumers of this function are in a
- // context where the authenticated user is the owner of this repo. We
- // can skip the database round-trip if we just grab their auth context.
- panic(errors.New("TODO: look up user details for this repo"))
- }
- payload.Owner.CanonicalName = "~" + user.Username
- payload.Owner.Name = user.Username
-
- encoded, err := json.Marshal(&payload)
- if err != nil {
- panic(err) // Programmer error
- }
-
+ userID := auth.ForContext(ctx).UserID
query := sq.
Select().
- From("user_webhook_subscription sub").
- Where("sub.user_id = ?", repo.OwnerID)
- q.Schedule(ctx, query, "user", "repo:update", encoded)
+ From("gql_user_wh_sub sub").
+ Where("sub.user_id = ?", userID)
+ q.Schedule(ctx, query, "user", event.String(),
+ payloadUUID, payload)
}
-func DeliverLegacyRepoDeleted(ctx context.Context, repo *model.Repository) {
- q, ok := ctx.Value(legacyUserCtxKey).(*webhooks.LegacyQueue)
- if !ok {
- panic(errors.New("No legacy user webhooks worker for this context"))
+func DeliverRepoEvent(ctx context.Context,
+ event model.WebhookEvent, repository *model.Repository) {
+ payloadUUID := uuid.New()
+ payload := model.RepositoryEvent{
+ UUID: payloadUUID.String(),
+ Event: event,
+ Date: time.Now().UTC(),
+ Repository: repository,
}
-
- payload := struct {
- ID int `json:"id"`
- }{repo.ID}
-
- encoded, err := json.Marshal(&payload)
- if err != nil {
- panic(err) // Programmer error
- }
-
- query := sq.
- Select().
- From("user_webhook_subscription sub").
- Where("sub.user_id = ?", repo.OwnerID)
- q.Schedule(ctx, query, "user", "repo:delete", encoded)
+ deliverUserWebhook(ctx, event, &payload, payloadUUID)
}