From 32db8e4840a9bd4172cfce53f1f2b2fb2bc76699 Mon Sep 17 00:00:00 2001 From: Drew DeVault Date: Mon, 13 Apr 2020 14:51:45 -0400 Subject: [PATCH] api: implement repositoryByOwner, repositoryByName --- api/graph/schema.resolvers.go | 11 +- api/loaders/middleware.go | 114 ++++++++- api/loaders/repositoriesbynameloader_gen.go | 224 ++++++++++++++++++ .../repositoriesbyownerreponameloader_gen.go | 224 ++++++++++++++++++ 4 files changed, 567 insertions(+), 6 deletions(-) create mode 100644 api/loaders/repositoriesbynameloader_gen.go create mode 100644 api/loaders/repositoriesbyownerreponameloader_gen.go diff --git a/api/graph/schema.resolvers.go b/api/graph/schema.resolvers.go index 87f4f2e..2048db2 100644 --- a/api/graph/schema.resolvers.go +++ b/api/graph/schema.resolvers.go @@ -8,6 +8,7 @@ import ( "database/sql" "fmt" "sort" + "strings" "git.sr.ht/~sircmpwn/git.sr.ht/api/auth" "git.sr.ht/~sircmpwn/git.sr.ht/api/graph/generated" @@ -81,11 +82,17 @@ func (r *queryResolver) Repository(ctx context.Context, id int) (*model.Reposito } func (r *queryResolver) RepositoryByName(ctx context.Context, name string) (*model.Repository, error) { - panic(fmt.Errorf("not implemented")) + return loaders.ForContext(ctx).RepositoriesByName.Load(name) } func (r *queryResolver) RepositoryByOwner(ctx context.Context, owner string, repo string) (*model.Repository, error) { - panic(fmt.Errorf("not implemented")) + if strings.HasPrefix(owner, "~") { + owner = owner[1:] + } else { + return nil, fmt.Errorf("Expected owner to be a canonical name") + } + return loaders.ForContext(ctx). + RepositoriesByOwnerRepoName.Load([2]string{owner, repo}) } func (r *repositoryResolver) Owner(ctx context.Context, obj *model.Repository) (model.Entity, error) { diff --git a/api/loaders/middleware.go b/api/loaders/middleware.go index 75aa823..2fa434c 100644 --- a/api/loaders/middleware.go +++ b/api/loaders/middleware.go @@ -1,8 +1,10 @@ package loaders //go:generate ./gen RepositoriesByIDLoader int api/graph/model.Repository -//go:generate ./gen UsersByNameLoader string api/graph/model.User +//go:generate ./gen RepositoriesByNameLoader string api/graph/model.Repository +//go:generate ./gen RepositoriesByOwnerRepoNameLoader [2]string api/graph/model.Repository //go:generate ./gen UsersByIDLoader int api/graph/model.User +//go:generate ./gen UsersByNameLoader string api/graph/model.User import ( "context" @@ -23,9 +25,11 @@ type contextKey struct { } type Loaders struct { - UsersByID UsersByIDLoader - UsersByName UsersByNameLoader - RepositoriesByID RepositoriesByIDLoader + UsersByID UsersByIDLoader + UsersByName UsersByNameLoader + RepositoriesByID RepositoriesByIDLoader + RepositoriesByName RepositoriesByNameLoader + RepositoriesByOwnerRepoName RepositoriesByOwnerRepoNameLoader } func fetchUsersByID(ctx context.Context, @@ -143,6 +147,98 @@ func fetchRepositoriesByID(ctx context.Context, } } +func fetchRepositoriesByName(ctx context.Context, + db *sql.DB) func (names []string) ([]*model.Repository, []error) { + return func (names []string) ([]*model.Repository, []error) { + var ( + err error + rows *sql.Rows + ) + if rows, err = db.QueryContext(ctx, ` + SELECT DISTINCT `+(&model.Repository{}).Columns(ctx, "repo")+` + FROM repository repo + WHERE repo.name = ANY($2) AND repo.owner_id = $1 + `, auth.ForContext(ctx).ID, pq.Array(names)); err != nil { + panic(err) + } + defer rows.Close() + + reposByName := map[string]*model.Repository{} + for rows.Next() { + repo := model.Repository{} + if err := rows.Scan(repo.Fields(ctx)...); err != nil { + panic(err) + } + reposByName[repo.Name] = &repo + } + if err = rows.Err(); err != nil { + panic(err) + } + + repos := make([]*model.Repository, len(names)) + for i, name := range names { + repos[i] = reposByName[name] + } + + return repos, nil + } +} + +func fetchRepositoriesByOwnerRepoName(ctx context.Context, + db *sql.DB) func (names [][2]string) ([]*model.Repository, []error) { + return func (names [][2]string) ([]*model.Repository, []error) { + var ( + err error + rows *sql.Rows + _names []string = make([]string, len(names)) + ) + for i, name := range names { + // This is a hack, but it works around limitations with PostgreSQL + // and is guaranteed to work because / is invalid in both usernames + // and repo names + _names[i] = name[0] + "/" + name[1] + } + if rows, err = db.QueryContext(ctx, ` + SELECT DISTINCT `+(&model.Repository{}).Columns(ctx, "repo")+`, + u.username + FROM repository repo + JOIN + "user" u ON repo.owner_id = u.id + FULL OUTER JOIN + access ON repo.id = access.repo_id + WHERE + u.username || '/' || repo.name = ANY($2) + AND (access.user_id = $1 + OR repo.owner_id = $1 + OR repo.visibility != 'private') + `, auth.ForContext(ctx).ID, pq.Array(_names)); err != nil { + panic(err) + } + defer rows.Close() + + reposByOwnerRepoName := map[[2]string]*model.Repository{} + for rows.Next() { + var ownerName string + repo := model.Repository{} + if err := rows.Scan(append( + repo.Fields(ctx), &ownerName)...); err != nil { + panic(err) + } + reposByOwnerRepoName[[2]string{ownerName, repo.Name}] = &repo + } + if err = rows.Err(); err != nil { + panic(err) + } + + repos := make([]*model.Repository, len(names)) + for i, name := range names { + repos[i] = reposByOwnerRepoName[name] + } + + return repos, nil + } +} + func Middleware(db *sql.DB) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -162,6 +258,16 @@ func Middleware(db *sql.DB) func(http.Handler) http.Handler { wait: 1 * time.Millisecond, fetch: fetchRepositoriesByID(r.Context(), db), }, + RepositoriesByName: RepositoriesByNameLoader{ + maxBatch: 100, + wait: 1 * time.Millisecond, + fetch: fetchRepositoriesByName(r.Context(), db), + }, + RepositoriesByOwnerRepoName: RepositoriesByOwnerRepoNameLoader{ + maxBatch: 100, + wait: 1 * time.Millisecond, + fetch: fetchRepositoriesByOwnerRepoName(r.Context(), db), + }, }) r = r.WithContext(ctx) next.ServeHTTP(w, r) diff --git a/api/loaders/repositoriesbynameloader_gen.go b/api/loaders/repositoriesbynameloader_gen.go new file mode 100644 index 0000000..78211c3 --- /dev/null +++ b/api/loaders/repositoriesbynameloader_gen.go @@ -0,0 +1,224 @@ +// Code generated by github.com/vektah/dataloaden, DO NOT EDIT. + +package loaders + +import ( + "sync" + "time" + + "git.sr.ht/~sircmpwn/git.sr.ht/api/graph/model" +) + +// RepositoriesByNameLoaderConfig captures the config to create a new RepositoriesByNameLoader +type RepositoriesByNameLoaderConfig struct { + // Fetch is a method that provides the data for the loader + Fetch func(keys []string) ([]*model.Repository, []error) + + // Wait is how long wait before sending a batch + Wait time.Duration + + // MaxBatch will limit the maximum number of keys to send in one batch, 0 = not limit + MaxBatch int +} + +// NewRepositoriesByNameLoader creates a new RepositoriesByNameLoader given a fetch, wait, and maxBatch +func NewRepositoriesByNameLoader(config RepositoriesByNameLoaderConfig) *RepositoriesByNameLoader { + return &RepositoriesByNameLoader{ + fetch: config.Fetch, + wait: config.Wait, + maxBatch: config.MaxBatch, + } +} + +// RepositoriesByNameLoader batches and caches requests +type RepositoriesByNameLoader struct { + // this method provides the data for the loader + fetch func(keys []string) ([]*model.Repository, []error) + + // how long to done before sending a batch + wait time.Duration + + // this will limit the maximum number of keys to send in one batch, 0 = no limit + maxBatch int + + // INTERNAL + + // lazily created cache + cache map[string]*model.Repository + + // the current batch. keys will continue to be collected until timeout is hit, + // then everything will be sent to the fetch method and out to the listeners + batch *repositoriesByNameLoaderBatch + + // mutex to prevent races + mu sync.Mutex +} + +type repositoriesByNameLoaderBatch struct { + keys []string + data []*model.Repository + error []error + closing bool + done chan struct{} +} + +// Load a Repository by key, batching and caching will be applied automatically +func (l *RepositoriesByNameLoader) Load(key string) (*model.Repository, error) { + return l.LoadThunk(key)() +} + +// LoadThunk returns a function that when called will block waiting for a Repository. +// This method should be used if you want one goroutine to make requests to many +// different data loaders without blocking until the thunk is called. +func (l *RepositoriesByNameLoader) LoadThunk(key string) func() (*model.Repository, error) { + l.mu.Lock() + if it, ok := l.cache[key]; ok { + l.mu.Unlock() + return func() (*model.Repository, error) { + return it, nil + } + } + if l.batch == nil { + l.batch = &repositoriesByNameLoaderBatch{done: make(chan struct{})} + } + batch := l.batch + pos := batch.keyIndex(l, key) + l.mu.Unlock() + + return func() (*model.Repository, error) { + <-batch.done + + var data *model.Repository + if pos < len(batch.data) { + data = batch.data[pos] + } + + var err error + // its convenient to be able to return a single error for everything + if len(batch.error) == 1 { + err = batch.error[0] + } else if batch.error != nil { + err = batch.error[pos] + } + + if err == nil { + l.mu.Lock() + l.unsafeSet(key, data) + l.mu.Unlock() + } + + return data, err + } +} + +// LoadAll fetches many keys at once. It will be broken into appropriate sized +// sub batches depending on how the loader is configured +func (l *RepositoriesByNameLoader) LoadAll(keys []string) ([]*model.Repository, []error) { + results := make([]func() (*model.Repository, error), len(keys)) + + for i, key := range keys { + results[i] = l.LoadThunk(key) + } + + repositorys := make([]*model.Repository, len(keys)) + errors := make([]error, len(keys)) + for i, thunk := range results { + repositorys[i], errors[i] = thunk() + } + return repositorys, errors +} + +// LoadAllThunk returns a function that when called will block waiting for a Repositorys. +// This method should be used if you want one goroutine to make requests to many +// different data loaders without blocking until the thunk is called. +func (l *RepositoriesByNameLoader) LoadAllThunk(keys []string) func() ([]*model.Repository, []error) { + results := make([]func() (*model.Repository, error), len(keys)) + for i, key := range keys { + results[i] = l.LoadThunk(key) + } + return func() ([]*model.Repository, []error) { + repositorys := make([]*model.Repository, len(keys)) + errors := make([]error, len(keys)) + for i, thunk := range results { + repositorys[i], errors[i] = thunk() + } + return repositorys, errors + } +} + +// Prime the cache with the provided key and value. If the key already exists, no change is made +// and false is returned. +// (To forcefully prime the cache, clear the key first with loader.clear(key).prime(key, value).) +func (l *RepositoriesByNameLoader) Prime(key string, value *model.Repository) bool { + l.mu.Lock() + var found bool + if _, found = l.cache[key]; !found { + // make a copy when writing to the cache, its easy to pass a pointer in from a loop var + // and end up with the whole cache pointing to the same value. + cpy := *value + l.unsafeSet(key, &cpy) + } + l.mu.Unlock() + return !found +} + +// Clear the value at key from the cache, if it exists +func (l *RepositoriesByNameLoader) Clear(key string) { + l.mu.Lock() + delete(l.cache, key) + l.mu.Unlock() +} + +func (l *RepositoriesByNameLoader) unsafeSet(key string, value *model.Repository) { + if l.cache == nil { + l.cache = map[string]*model.Repository{} + } + l.cache[key] = value +} + +// keyIndex will return the location of the key in the batch, if its not found +// it will add the key to the batch +func (b *repositoriesByNameLoaderBatch) keyIndex(l *RepositoriesByNameLoader, key string) int { + for i, existingKey := range b.keys { + if key == existingKey { + return i + } + } + + pos := len(b.keys) + b.keys = append(b.keys, key) + if pos == 0 { + go b.startTimer(l) + } + + if l.maxBatch != 0 && pos >= l.maxBatch-1 { + if !b.closing { + b.closing = true + l.batch = nil + go b.end(l) + } + } + + return pos +} + +func (b *repositoriesByNameLoaderBatch) startTimer(l *RepositoriesByNameLoader) { + time.Sleep(l.wait) + l.mu.Lock() + + // we must have hit a batch limit and are already finalizing this batch + if b.closing { + l.mu.Unlock() + return + } + + l.batch = nil + l.mu.Unlock() + + b.end(l) +} + +func (b *repositoriesByNameLoaderBatch) end(l *RepositoriesByNameLoader) { + b.data, b.error = l.fetch(b.keys) + close(b.done) +} diff --git a/api/loaders/repositoriesbyownerreponameloader_gen.go b/api/loaders/repositoriesbyownerreponameloader_gen.go new file mode 100644 index 0000000..42cac34 --- /dev/null +++ b/api/loaders/repositoriesbyownerreponameloader_gen.go @@ -0,0 +1,224 @@ +// Code generated by github.com/vektah/dataloaden, DO NOT EDIT. + +package loaders + +import ( + "sync" + "time" + + "git.sr.ht/~sircmpwn/git.sr.ht/api/graph/model" +) + +// RepositoriesByOwnerRepoNameLoaderConfig captures the config to create a new RepositoriesByOwnerRepoNameLoader +type RepositoriesByOwnerRepoNameLoaderConfig struct { + // Fetch is a method that provides the data for the loader + Fetch func(keys [][2]string) ([]*model.Repository, []error) + + // Wait is how long wait before sending a batch + Wait time.Duration + + // MaxBatch will limit the maximum number of keys to send in one batch, 0 = not limit + MaxBatch int +} + +// NewRepositoriesByOwnerRepoNameLoader creates a new RepositoriesByOwnerRepoNameLoader given a fetch, wait, and maxBatch +func NewRepositoriesByOwnerRepoNameLoader(config RepositoriesByOwnerRepoNameLoaderConfig) *RepositoriesByOwnerRepoNameLoader { + return &RepositoriesByOwnerRepoNameLoader{ + fetch: config.Fetch, + wait: config.Wait, + maxBatch: config.MaxBatch, + } +} + +// RepositoriesByOwnerRepoNameLoader batches and caches requests +type RepositoriesByOwnerRepoNameLoader struct { + // this method provides the data for the loader + fetch func(keys [][2]string) ([]*model.Repository, []error) + + // how long to done before sending a batch + wait time.Duration + + // this will limit the maximum number of keys to send in one batch, 0 = no limit + maxBatch int + + // INTERNAL + + // lazily created cache + cache map[[2]string]*model.Repository + + // the current batch. keys will continue to be collected until timeout is hit, + // then everything will be sent to the fetch method and out to the listeners + batch *repositoriesByOwnerRepoNameLoaderBatch + + // mutex to prevent races + mu sync.Mutex +} + +type repositoriesByOwnerRepoNameLoaderBatch struct { + keys [][2]string + data []*model.Repository + error []error + closing bool + done chan struct{} +} + +// Load a Repository by key, batching and caching will be applied automatically +func (l *RepositoriesByOwnerRepoNameLoader) Load(key [2]string) (*model.Repository, error) { + return l.LoadThunk(key)() +} + +// LoadThunk returns a function that when called will block waiting for a Repository. +// This method should be used if you want one goroutine to make requests to many +// different data loaders without blocking until the thunk is called. +func (l *RepositoriesByOwnerRepoNameLoader) LoadThunk(key [2]string) func() (*model.Repository, error) { + l.mu.Lock() + if it, ok := l.cache[key]; ok { + l.mu.Unlock() + return func() (*model.Repository, error) { + return it, nil + } + } + if l.batch == nil { + l.batch = &repositoriesByOwnerRepoNameLoaderBatch{done: make(chan struct{})} + } + batch := l.batch + pos := batch.keyIndex(l, key) + l.mu.Unlock() + + return func() (*model.Repository, error) { + <-batch.done + + var data *model.Repository + if pos < len(batch.data) { + data = batch.data[pos] + } + + var err error + // its convenient to be able to return a single error for everything + if len(batch.error) == 1 { + err = batch.error[0] + } else if batch.error != nil { + err = batch.error[pos] + } + + if err == nil { + l.mu.Lock() + l.unsafeSet(key, data) + l.mu.Unlock() + } + + return data, err + } +} + +// LoadAll fetches many keys at once. It will be broken into appropriate sized +// sub batches depending on how the loader is configured +func (l *RepositoriesByOwnerRepoNameLoader) LoadAll(keys [][2]string) ([]*model.Repository, []error) { + results := make([]func() (*model.Repository, error), len(keys)) + + for i, key := range keys { + results[i] = l.LoadThunk(key) + } + + repositorys := make([]*model.Repository, len(keys)) + errors := make([]error, len(keys)) + for i, thunk := range results { + repositorys[i], errors[i] = thunk() + } + return repositorys, errors +} + +// LoadAllThunk returns a function that when called will block waiting for a Repositorys. +// This method should be used if you want one goroutine to make requests to many +// different data loaders without blocking until the thunk is called. +func (l *RepositoriesByOwnerRepoNameLoader) LoadAllThunk(keys [][2]string) func() ([]*model.Repository, []error) { + results := make([]func() (*model.Repository, error), len(keys)) + for i, key := range keys { + results[i] = l.LoadThunk(key) + } + return func() ([]*model.Repository, []error) { + repositorys := make([]*model.Repository, len(keys)) + errors := make([]error, len(keys)) + for i, thunk := range results { + repositorys[i], errors[i] = thunk() + } + return repositorys, errors + } +} + +// Prime the cache with the provided key and value. If the key already exists, no change is made +// and false is returned. +// (To forcefully prime the cache, clear the key first with loader.clear(key).prime(key, value).) +func (l *RepositoriesByOwnerRepoNameLoader) Prime(key [2]string, value *model.Repository) bool { + l.mu.Lock() + var found bool + if _, found = l.cache[key]; !found { + // make a copy when writing to the cache, its easy to pass a pointer in from a loop var + // and end up with the whole cache pointing to the same value. + cpy := *value + l.unsafeSet(key, &cpy) + } + l.mu.Unlock() + return !found +} + +// Clear the value at key from the cache, if it exists +func (l *RepositoriesByOwnerRepoNameLoader) Clear(key [2]string) { + l.mu.Lock() + delete(l.cache, key) + l.mu.Unlock() +} + +func (l *RepositoriesByOwnerRepoNameLoader) unsafeSet(key [2]string, value *model.Repository) { + if l.cache == nil { + l.cache = map[[2]string]*model.Repository{} + } + l.cache[key] = value +} + +// keyIndex will return the location of the key in the batch, if its not found +// it will add the key to the batch +func (b *repositoriesByOwnerRepoNameLoaderBatch) keyIndex(l *RepositoriesByOwnerRepoNameLoader, key [2]string) int { + for i, existingKey := range b.keys { + if key == existingKey { + return i + } + } + + pos := len(b.keys) + b.keys = append(b.keys, key) + if pos == 0 { + go b.startTimer(l) + } + + if l.maxBatch != 0 && pos >= l.maxBatch-1 { + if !b.closing { + b.closing = true + l.batch = nil + go b.end(l) + } + } + + return pos +} + +func (b *repositoriesByOwnerRepoNameLoaderBatch) startTimer(l *RepositoriesByOwnerRepoNameLoader) { + time.Sleep(l.wait) + l.mu.Lock() + + // we must have hit a batch limit and are already finalizing this batch + if b.closing { + l.mu.Unlock() + return + } + + l.batch = nil + l.mu.Unlock() + + b.end(l) +} + +func (b *repositoriesByOwnerRepoNameLoaderBatch) end(l *RepositoriesByOwnerRepoNameLoader) { + b.data, b.error = l.fetch(b.keys) + close(b.done) +} -- 2.38.4