Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/praefect/datastore/repository_store.go27
-rw-r--r--internal/praefect/server.go3
-rw-r--r--internal/praefect/walkrepos.go47
-rw-r--r--internal/praefect/walkrepos_test.go88
4 files changed, 165 insertions, 0 deletions
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 77dce6ba0..4925fcb50 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -111,6 +111,8 @@ type RepositoryStore interface {
MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error)
// MarkStorageUnverified marsk all replicas on the storage as unverified.
MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error)
+ // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
+ ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error)
}
// PostgresRepositoryStore is a Postgres implementation of RepositoryStore.
@@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor
return replicaPath, nil
}
+
+// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage.
+func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) {
+ rows, err := rs.db.QueryContext(ctx, `
+SELECT relative_path
+FROM repositories
+WHERE virtual_storage = $1
+`, virtualStorage)
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+
+ var relativePaths []string
+ for rows.Next() {
+ var relativePath string
+ if err := rows.Scan(&relativePath); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ relativePaths = append(relativePaths, relativePath)
+ }
+
+ return relativePaths, rows.Err()
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 54cbee079..c52b160f5 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -195,6 +195,9 @@ func NewGRPCServer(
"DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns),
"GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router),
})
+ proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{
+ "WalkRepos": WalkReposHandler(deps.RepositoryStore),
+ })
}
return srv
diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go
new file mode 100644
index 000000000..1f321a4c7
--- /dev/null
+++ b/internal/praefect/walkrepos.go
@@ -0,0 +1,47 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling
+// through Praefect. Instead of walking the storage directory in the filesystem, this Praefect
+// implementation queries the database for all known repositories in the given virtual storage.
+// As a consequence, the modification_time parameter can't be populated in the response.
+func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler {
+ return func(srv interface{}, stream grpc.ServerStream) error {
+ sendRepo := func(relPath string) error {
+ return stream.SendMsg(&gitalypb.WalkReposResponse{
+ RelativePath: relPath,
+ })
+ }
+
+ var req gitalypb.WalkReposRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ if req.StorageName == "" {
+ return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet)
+ }
+
+ repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName)
+ if err != nil {
+ return structerr.NewInternal("list repository paths: %w", err)
+ }
+
+ for _, repo := range repos {
+ if err := sendRepo(repo); err != nil {
+ return structerr.NewInternal("send repository path: %w", err)
+ }
+ }
+
+ return nil
+ }
+}
diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go
new file mode 100644
index 000000000..63301caf5
--- /dev/null
+++ b/internal/praefect/walkrepos_test.go
@@ -0,0 +1,88 @@
+package praefect
+
+import (
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestWalkReposHandler(t *testing.T) {
+ t.Parallel()
+
+ db := testdb.New(t)
+ for _, tc := range []struct {
+ desc string
+ request *gitalypb.WalkReposRequest
+ responses []*gitalypb.WalkReposResponse
+ expectedErr error
+ }{
+ {
+ desc: "missing storage name",
+ request: &gitalypb.WalkReposRequest{},
+ expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet),
+ },
+ {
+ desc: "repositories found",
+ request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"},
+ responses: []*gitalypb.WalkReposResponse{
+ {RelativePath: "relative-path"},
+ {RelativePath: "relative-path-2"},
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ db.TruncateAll(t)
+ rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}})
+ ctx := testhelper.Context(t)
+
+ require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false))
+ require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false))
+
+ tmp := testhelper.TempDir(t)
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ srv := NewGRPCServer(&Dependencies{
+ Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
+ Logger: testhelper.SharedLogger(t),
+ RepositoryStore: rs,
+ Registry: protoregistry.GitalyProtoPreregistered,
+ }, nil)
+ defer srv.Stop()
+
+ go testhelper.MustServe(t, srv, ln)
+
+ clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, clientConn)
+
+ client := gitalypb.NewInternalGitalyClient(clientConn)
+
+ stream, err := client.WalkRepos(ctx, tc.request)
+ if tc.expectedErr != nil {
+ // Consume the first message and test for errors only if we're expecting an error.
+ _, err = stream.Recv()
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ return
+ }
+ require.NoError(t, err)
+
+ actualRepos, err := testhelper.Receive(stream.Recv)
+ require.NoError(t, err)
+ testhelper.ProtoEqual(t, tc.responses, actualRepos)
+ })
+ }
+}