diff options
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 27 | ||||
-rw-r--r-- | internal/praefect/server.go | 3 | ||||
-rw-r--r-- | internal/praefect/walkrepos.go | 47 | ||||
-rw-r--r-- | internal/praefect/walkrepos_test.go | 88 |
4 files changed, 165 insertions, 0 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 77dce6ba0..4925fcb50 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -111,6 +111,8 @@ type RepositoryStore interface { MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) // MarkStorageUnverified marsk all replicas on the storage as unverified. MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) + // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. + ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor return replicaPath, nil } + +// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. +func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) { + rows, err := rs.db.QueryContext(ctx, ` +SELECT relative_path +FROM repositories +WHERE virtual_storage = $1 +`, virtualStorage) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + + var relativePaths []string + for rows.Next() { + var relativePath string + if err := rows.Scan(&relativePath); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + relativePaths = append(relativePaths, relativePath) + } + + return relativePaths, rows.Err() +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 54cbee079..c52b160f5 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -195,6 +195,9 @@ func NewGRPCServer( "DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns), "GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router), }) + proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{ + "WalkRepos": WalkReposHandler(deps.RepositoryStore), + }) } return srv diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go new file mode 100644 index 000000000..1f321a4c7 --- /dev/null +++ b/internal/praefect/walkrepos.go @@ -0,0 +1,47 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" +) + +// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling +// through Praefect. Instead of walking the storage directory in the filesystem, this Praefect +// implementation queries the database for all known repositories in the given virtual storage. +// As a consequence, the modification_time parameter can't be populated in the response. +func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler { + return func(srv interface{}, stream grpc.ServerStream) error { + sendRepo := func(relPath string) error { + return stream.SendMsg(&gitalypb.WalkReposResponse{ + RelativePath: relPath, + }) + } + + var req gitalypb.WalkReposRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + if req.StorageName == "" { + return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) + } + + repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName) + if err != nil { + return structerr.NewInternal("list repository paths: %w", err) + } + + for _, repo := range repos { + if err := sendRepo(repo); err != nil { + return structerr.NewInternal("send repository path: %w", err) + } + } + + return nil + } +} diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go new file mode 100644 index 000000000..63301caf5 --- /dev/null +++ b/internal/praefect/walkrepos_test.go @@ -0,0 +1,88 @@ +package praefect + +import ( + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestWalkReposHandler(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + for _, tc := range []struct { + desc string + request *gitalypb.WalkReposRequest + responses []*gitalypb.WalkReposResponse + expectedErr error + }{ + { + desc: "missing storage name", + request: &gitalypb.WalkReposRequest{}, + expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet), + }, + { + desc: "repositories found", + request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"}, + responses: []*gitalypb.WalkReposResponse{ + {RelativePath: "relative-path"}, + {RelativePath: "relative-path-2"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + ctx := testhelper.Context(t) + + require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false)) + + tmp := testhelper.TempDir(t) + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + srv := NewGRPCServer(&Dependencies{ + Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, + Logger: testhelper.SharedLogger(t), + RepositoryStore: rs, + Registry: protoregistry.GitalyProtoPreregistered, + }, nil) + defer srv.Stop() + + go testhelper.MustServe(t, srv, ln) + + clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer testhelper.MustClose(t, clientConn) + + client := gitalypb.NewInternalGitalyClient(clientConn) + + stream, err := client.WalkRepos(ctx, tc.request) + if tc.expectedErr != nil { + // Consume the first message and test for errors only if we're expecting an error. + _, err = stream.Recv() + testhelper.RequireGrpcError(t, tc.expectedErr, err) + return + } + require.NoError(t, err) + + actualRepos, err := testhelper.Receive(stream.Recv) + require.NoError(t, err) + testhelper.ProtoEqual(t, tc.responses, actualRepos) + }) + } +} |