Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Liu <jliu@gitlab.com>2023-12-18 09:49:48 +0300
committerJames Liu <jliu@gitlab.com>2024-01-17 01:13:13 +0300
commite1f6da778cae5991314762474607b5bf3b228a9a (patch)
tree59f643dc13e9a38677f47a906cc505bbf418799f
parenta9b8db1b0894b1d613c77b2736409563199d5e62 (diff)
praefect: Intercept WalkRepos RPC
Adds a handler to Praefect to intercept calls to the WalkRepos RPC. The handler provides an alternate implementation of listing repositories in a storage, which queries the Praefect DB rather than walking the filesystem on disk. This is required so when the RPC is invoked via Praefect, the DB is used as the source of truth rather than a random Gitaly node. The only user-facing difference between this and the original implementation is that the `modification_time` attribute of the response message is left empty, as this cannot be determined via the DB.
-rw-r--r--internal/praefect/datastore/repository_store.go27
-rw-r--r--internal/praefect/server.go3
-rw-r--r--internal/praefect/walkrepos.go47
-rw-r--r--internal/praefect/walkrepos_test.go88
4 files changed, 165 insertions, 0 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 77dce6ba0..4925fcb50 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -111,6 +111,8 @@ type RepositoryStore interface {
MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
// MarkStorageUnverified marsk all replicas on the storage as unverified.
MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
+ // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
+ ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error)
}
// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
@@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor
return replicaPath, nil
}
+
+// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
+func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) {
+ rows, err := rs.db.QueryContext(ctx, `
+SELECT relative_path
+FROM repositories
+WHERE virtual_storage = $1
+`, virtualStorage)
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+
+ var relativePaths []string
+ for rows.Next() {
+ var relativePath string
+ if err := rows.Scan(&relativePath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ relativePaths = append(relativePaths, relativePath)
+ }
+
+ return relativePaths, rows.Err()
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 54cbee079..c52b160f5 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -195,6 +195,9 @@ func NewGRPCServer(
"DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns),
"GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router),
})
+ proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{
+ "WalkRepos": WalkReposHandler(deps.RepositoryStore),
+ })
}
return srv
diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go
new file mode 100644
index 000000000..1f321a4c7
--- /dev/null
+++ b/internal/praefect/walkrepos.go
@@ -0,0 +1,47 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling
+// through Praefect. Instead of walking the storage directory in the filesystem, this Praefect
+// implementation queries the database for all known repositories in the given virtual storage.
+// As a consequence, the modification_time parameter can't be populated in the response.
+func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler {
+ return func(srv interface{}, stream grpc.ServerStream) error {
+ sendRepo := func(relPath string) error {
+ return stream.SendMsg(&gitalypb.WalkReposResponse{
+ RelativePath: relPath,
+ })
+ }
+
+ var req gitalypb.WalkReposRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ if req.StorageName == "" {
+ return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet)
+ }
+
+ repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName)
+ if err != nil {
+ return structerr.NewInternal("list repository paths: %w", err)
+ }
+
+ for _, repo := range repos {
+ if err := sendRepo(repo); err != nil {
+ return structerr.NewInternal("send repository path: %w", err)
+ }
+ }
+
+ return nil
+ }
+}
diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go
new file mode 100644
index 000000000..63301caf5
--- /dev/null
+++ b/internal/praefect/walkrepos_test.go
@@ -0,0 +1,88 @@
+package praefect
+
+import (
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestWalkReposHandler(t *testing.T) {
+ t.Parallel()
+
+ db := testdb.New(t)
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.WalkReposRequest
+ responses []*gitalypb.WalkReposResponse
+ expectedErr error
+ }{
+ {
+ desc: "missing storage name",
+ request: &gitalypb.WalkReposRequest{},
+ expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet),
+ },
+ {
+ desc: "repositories found",
+ request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"},
+ responses: []*gitalypb.WalkReposResponse{
+ {RelativePath: "relative-path"},
+ {RelativePath: "relative-path-2"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ db.TruncateAll(t)
+ rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}})
+ ctx := testhelper.Context(t)
+
+ require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false))
+
+ tmp := testhelper.TempDir(t)
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ srv := NewGRPCServer(&Dependencies{
+ Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
+ Logger: testhelper.SharedLogger(t),
+ RepositoryStore: rs,
+ Registry: protoregistry.GitalyProtoPreregistered,
+ }, nil)
+ defer srv.Stop()
+
+ go testhelper.MustServe(t, srv, ln)
+
+ clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, clientConn)
+
+ client := gitalypb.NewInternalGitalyClient(clientConn)
+
+ stream, err := client.WalkRepos(ctx, tc.request)
+ if tc.expectedErr != nil {
+ // Consume the first message and test for errors only if we're expecting an error.
+ _, err = stream.Recv()
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ return
+ }
+ require.NoError(t, err)
+
+ actualRepos, err := testhelper.Receive(stream.Recv)
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, tc.responses, actualRepos)
+ })
+ }
+}