From e1f6da778cae5991314762474607b5bf3b228a9a Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 18 Dec 2023 17:49:48 +1100 Subject: praefect: Intercept WalkRepos RPC Adds a handler to Praefect to intercept calls to the WalkRepos RPC. The handler provides an alternate implementation of listing repositories in a storage, which queries the Praefect DB rather than walking the filesystem on disk. This is required so that when the RPC is invoked via Praefect, the DB is used as the source of truth rather than a random Gitaly node. The only user-facing difference between this and the original implementation is that the `modification_time` attribute of the response message is left empty, as this cannot be determined via the DB. --- internal/praefect/datastore/repository_store.go | 27 ++++++++ internal/praefect/server.go | 3 + internal/praefect/walkrepos.go | 47 +++++++++++++ internal/praefect/walkrepos_test.go | 88 +++++++++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 internal/praefect/walkrepos.go create mode 100644 internal/praefect/walkrepos_test.go diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 77dce6ba0..4925fcb50 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -111,6 +111,8 @@ type RepositoryStore interface { MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) // MarkStorageUnverified marks all replicas on the storage as unverified. MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) + // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. + ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. 
@@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor return replicaPath, nil } + +// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. +func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) { + rows, err := rs.db.QueryContext(ctx, ` +SELECT relative_path +FROM repositories +WHERE virtual_storage = $1 +`, virtualStorage) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + + var relativePaths []string + for rows.Next() { + var relativePath string + if err := rows.Scan(&relativePath); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + relativePaths = append(relativePaths, relativePath) + } + + return relativePaths, rows.Err() +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 54cbee079..c52b160f5 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -195,6 +195,9 @@ func NewGRPCServer( "DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns), "GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router), }) + proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{ + "WalkRepos": WalkReposHandler(deps.RepositoryStore), + }) } return srv diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go new file mode 100644 index 000000000..1f321a4c7 --- /dev/null +++ b/internal/praefect/walkrepos.go @@ -0,0 +1,47 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" +) + +// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling +// 
through Praefect. Instead of walking the storage directory in the filesystem, this Praefect +// implementation queries the database for all known repositories in the given virtual storage. +// As a consequence, the modification_time parameter can't be populated in the response. +func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler { + return func(srv interface{}, stream grpc.ServerStream) error { + sendRepo := func(relPath string) error { + return stream.SendMsg(&gitalypb.WalkReposResponse{ + RelativePath: relPath, + }) + } + + var req gitalypb.WalkReposRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + if req.StorageName == "" { + return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) + } + + repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName) + if err != nil { + return structerr.NewInternal("list repository paths: %w", err) + } + + for _, repo := range repos { + if err := sendRepo(repo); err != nil { + return structerr.NewInternal("send repository path: %w", err) + } + } + + return nil + } +} diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go new file mode 100644 index 000000000..63301caf5 --- /dev/null +++ b/internal/praefect/walkrepos_test.go @@ -0,0 +1,88 @@ +package praefect + +import ( + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" 
+) + +func TestWalkReposHandler(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + for _, tc := range []struct { + desc string + request *gitalypb.WalkReposRequest + responses []*gitalypb.WalkReposResponse + expectedErr error + }{ + { + desc: "missing storage name", + request: &gitalypb.WalkReposRequest{}, + expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet), + }, + { + desc: "repositories found", + request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"}, + responses: []*gitalypb.WalkReposResponse{ + {RelativePath: "relative-path"}, + {RelativePath: "relative-path-2"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + ctx := testhelper.Context(t) + + require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false)) + + tmp := testhelper.TempDir(t) + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + srv := NewGRPCServer(&Dependencies{ + Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, + Logger: testhelper.SharedLogger(t), + RepositoryStore: rs, + Registry: protoregistry.GitalyProtoPreregistered, + }, nil) + defer srv.Stop() + + go testhelper.MustServe(t, srv, ln) + + clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer testhelper.MustClose(t, clientConn) + + client := gitalypb.NewInternalGitalyClient(clientConn) + + stream, err := client.WalkRepos(ctx, tc.request) + if tc.expectedErr != nil { + // Consume the first message and test for errors only if we're expecting an error. 
+ _, err = stream.Recv() + testhelper.RequireGrpcError(t, tc.expectedErr, err) + return + } + require.NoError(t, err) + + actualRepos, err := testhelper.Receive(stream.Recv) + require.NoError(t, err) + testhelper.ProtoEqual(t, tc.responses, actualRepos) + }) + } +} -- cgit v1.2.3