Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-06-14 23:52:07 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-06-30 00:04:22 +0300
commit2c8e1902f6c15dbb7d37ad64a02a1b756fbbffc6 (patch)
tree108aaf3992ba7dec2ca2e8b38beb59402e744de0
parent7ea601690c1d278fdf26cd7b3bb911abb7cb3411 (diff)
praefect: Introduce `GetObjectPoolHandler()`
The `GetObjectPool()` RPC should return object pool information that can be consumed by the client. When this Gitaly RPC is executed behind Praefect, the replica path is returned instead of the relative path. The replica path is an implementation detail of Praefect and not useful to the client which really wants the relative path. This change implements a `GetObjectPoolHandler()` to explicitly handle the `GetObjectPool()` RPC in Praefect instead of relying on the `UnknownServiceHandler` to proxy the RPC. This enables the RPC's response replica path to be reversed by Praefect to the appropriate relative path and storage the client expects.
-rw-r--r--internal/gitaly/service/objectpool/get_test.go4
-rw-r--r--internal/praefect/get_object_pool.go109
-rw-r--r--internal/praefect/get_object_pool_test.go332
-rw-r--r--internal/praefect/server.go1
4 files changed, 444 insertions, 2 deletions
diff --git a/internal/gitaly/service/objectpool/get_test.go b/internal/gitaly/service/objectpool/get_test.go
index 96772e523..ae4d788f6 100644
--- a/internal/gitaly/service/objectpool/get_test.go
+++ b/internal/gitaly/service/objectpool/get_test.go
@@ -21,8 +21,8 @@ func TestGetObjectPoolSuccess(t *testing.T) {
cfg, repoProto, _, _, client := setup(t, ctx)
repo := localrepo.NewTestRepo(t, cfg, repoProto)
- _, pool, _ := createObjectPool(t, ctx, cfg, repoProto)
- relativePoolPath := pool.GetRelativePath()
+ poolProto, pool, _ := createObjectPool(t, ctx, cfg, repoProto)
+ relativePoolPath := poolProto.GetRepository().GetRelativePath()
require.NoError(t, pool.Link(ctx, repo))
resp, err := client.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
diff --git a/internal/praefect/get_object_pool.go b/internal/praefect/get_object_pool.go
new file mode 100644
index 000000000..64df9857f
--- /dev/null
+++ b/internal/praefect/get_object_pool.go
@@ -0,0 +1,109 @@
+package praefect
+
+import (
+ "errors"
+ "fmt"
+ "path/filepath"
+ "strconv"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// GetObjectPoolHandler intercepts GetObjectPool RPC calls and rewrites replica path and storage
+// responses to reflect Praefect state of the object pool.
+func GetObjectPoolHandler(repoStore datastore.RepositoryStore, router Router) grpc.StreamHandler {
+ return func(_ interface{}, stream grpc.ServerStream) error {
+ var req gitalypb.GetObjectPoolRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ repo := req.GetRepository()
+ if repo == nil || repo.GetStorageName() == "" || repo.GetRelativePath() == "" {
+ return structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet)
+ }
+
+ ctx := stream.Context()
+ relativePath := repo.GetRelativePath()
+ virtualStorage := repo.GetStorageName()
+
+ // Object pool information can be retrieved from any up-to-date replica. Generate a route to
+ // a valid repository replica.
+ route, err := router.RouteRepositoryAccessor(
+ ctx, virtualStorage, relativePath, false,
+ )
+ switch {
+ case errors.Is(err, nodes.ErrVirtualStorageNotExist):
+ return structerr.NewInvalidArgument("route RPC: %w", err)
+ case errors.Is(err, datastore.ErrRepositoryNotFound):
+ return structerr.NewNotFound("route RPC: %w", err).WithMetadataItems(
+ structerr.MetadataItem{Key: "storage_name", Value: virtualStorage},
+ structerr.MetadataItem{Key: "relative_path", Value: relativePath},
+ )
+ case err != nil:
+ return fmt.Errorf("route RPC: %w", err)
+ }
+
+ // To connect to the correct repository on the Gitaly node, the repository's relative path
+ // and storage need to be rewritten in the RPC request.
+ client := gitalypb.NewObjectPoolServiceClient(route.Node.Connection)
+ resp, err := client.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
+ Repository: &gitalypb.Repository{
+ RelativePath: route.ReplicaPath,
+ StorageName: route.Node.Storage,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("get object pool: %w", err)
+ }
+
+ // If the repository does not link to an object pool, there is nothing in the response to
+ // rewrite and an empty response is returned.
+ if resp.GetObjectPool() == nil {
+ return stream.SendMsg(&gitalypb.GetObjectPoolResponse{})
+ }
+
+ poolRepo := resp.GetObjectPool().GetRepository()
+ if !storage.IsPraefectPoolRepository(poolRepo) {
+ // If the repository path is not a Praefect replica path, there is no need to convert to the
+ // relative path.
+ return stream.SendMsg(&gitalypb.GetObjectPoolResponse{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ RelativePath: poolRepo.GetRelativePath(),
+ StorageName: virtualStorage,
+ },
+ },
+ })
+ }
+
+ // The Praefect repository ID is the last component of the replica Path.
+ repoID, err := strconv.ParseInt(filepath.Base(poolRepo.GetRelativePath()), 10, 64)
+ if err != nil {
+ return fmt.Errorf("parsing repository ID: %w", err)
+ }
+
+ // Praefect stores the relationship between relative path and replica path for a given
+ // repository. Get the relative path information for the repository.
+ metadata, err := repoStore.GetRepositoryMetadata(ctx, repoID)
+ if err != nil {
+ return fmt.Errorf("get Praefect repository metadata: %w", err)
+ }
+
+ // The client expects information about an object pool linked to the repository. Rewrite the
+ // relative path and storage to match the object pool state stored in Praefect.
+ return stream.SendMsg(&gitalypb.GetObjectPoolResponse{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ RelativePath: metadata.RelativePath,
+ StorageName: virtualStorage,
+ },
+ },
+ })
+ }
+}
diff --git a/internal/praefect/get_object_pool_test.go b/internal/praefect/get_object_pool_test.go
new file mode 100644
index 000000000..86135ee01
--- /dev/null
+++ b/internal/praefect/get_object_pool_test.go
@@ -0,0 +1,332 @@
+package praefect
+
+import (
+ "context"
+ "math/rand"
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestGetObjectPoolHandler(t *testing.T) {
+ t.Parallel()
+ ctx := testhelper.Context(t)
+
+ const virtualStorage = "virtual-storage"
+
+ const gitaly1Storage = "gitaly-1"
+ const gitaly2Storage = "gitaly-2"
+
+ gitaly1Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly1Storage))
+ gitaly2Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly2Storage))
+ gitaly1Addr := testserver.RunGitalyServer(t, gitaly1Cfg, setup.RegisterAll, testserver.WithDisablePraefect())
+ gitaly2Addr := testserver.RunGitalyServer(t, gitaly2Cfg, setup.RegisterAll, testserver.WithDisablePraefect())
+ gitaly1Cfg.SocketPath = gitaly1Addr
+ gitaly2Cfg.SocketPath = gitaly2Addr
+
+ errServedByGitaly := structerr.NewInternal("request passed to Gitaly")
+
+ setupPraefect := func(t *testing.T) (gitalypb.ObjectPoolServiceClient, datastore.RepositoryStore) {
+ cfg := config.Config{
+ Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository},
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: gitaly1Storage, Address: gitaly1Addr},
+ {Storage: gitaly2Storage, Address: gitaly2Addr},
+ },
+ },
+ },
+ }
+
+ nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, nil, nil, nil, nil)
+ require.NoError(t, err)
+ t.Cleanup(nodeSet.Close)
+
+ db := testdb.New(t)
+ repoStore := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+
+ tmp := testhelper.TempDir(t)
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ srv := NewGRPCServer(
+ config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
+ testhelper.NewDiscardingLogEntry(t),
+ protoregistry.GitalyProtoPreregistered,
+ func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return nil, errServedByGitaly
+ },
+ nil,
+ repoStore,
+ nil,
+ NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ nodes.NewPerRepositoryElector(db),
+ StaticHealthChecker(cfg.StorageNames()),
+ NewLockedRandom(rand.New(rand.NewSource(0))),
+ repoStore,
+ datastore.NewAssignmentStore(db, cfg.StorageNames()),
+ repoStore,
+ nil,
+ ),
+ nodeSet.Connections(),
+ nil,
+ nil,
+ nil,
+ )
+ t.Cleanup(srv.Stop)
+
+ go testhelper.MustServe(t, srv, ln)
+
+ clientConn, err := grpc.DialContext(ctx, "unix:"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ testhelper.MustClose(t, clientConn)
+ })
+
+ repoClient := gitalypb.NewRepositoryServiceClient(clientConn)
+ _, err = repoClient.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: &gitalypb.Repository{}})
+ testhelper.RequireGrpcError(t, errServedByGitaly, err)
+
+ return gitalypb.NewObjectPoolServiceClient(clientConn), repoStore
+ }
+
+ type setupData struct {
+ client gitalypb.ObjectPoolServiceClient
+ repository *gitalypb.Repository
+ expectedPool *gitalypb.ObjectPool
+ expectedError error
+ }
+
+ for _, tc := range []struct {
+ desc string
+ setup func(t *testing.T) setupData
+ }{
+ {
+ desc: "missing repository",
+ setup: func(t *testing.T) setupData {
+ // If a repository is not set in the request, validation fails.
+ client, _ := setupPraefect(t)
+
+ return setupData{
+ client: client,
+ repository: nil,
+ expectedError: structerr.NewInvalidArgument("%w", storage.ErrRepositoryNotSet),
+ }
+ },
+ },
+ {
+ desc: "virtual storage does not exist",
+ setup: func(t *testing.T) setupData {
+ // If the storage of the requested repository does not exist, the router fails to
+ // route the RPC.
+ client, _ := setupPraefect(t)
+
+ return setupData{
+ client: client,
+ repository: &gitalypb.Repository{StorageName: "doesn't exist", RelativePath: "relative-path"},
+ expectedError: structerr.NewInvalidArgument("route RPC: %w", nodes.ErrVirtualStorageNotExist),
+ }
+ },
+ },
+ {
+ desc: "repository not found",
+ setup: func(t *testing.T) setupData {
+ // If the repository specified in the request does not exist in Praefect, the router
+ // fails to route the RPC.
+ client, _ := setupPraefect(t)
+
+ return setupData{
+ client: client,
+ repository: &gitalypb.Repository{StorageName: virtualStorage, RelativePath: "doesn't exist"},
+ expectedError: structerr.NewNotFound("route RPC: consistent storages: repository not found"),
+ }
+ },
+ },
+ {
+ desc: "repository is not linked to object pool",
+ setup: func(t *testing.T) setupData {
+ client, repoStore := setupPraefect(t)
+
+ // Create repositories not linked to object pools on each Gitaly node with replica
+ // path and register them in Praefect.
+ relativePath := gittest.NewRepositoryName(t)
+ replicaPath := storage.DeriveReplicaPath(0)
+ gittest.CreateRepository(t, ctx, gitaly1Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ gittest.CreateRepository(t, ctx, gitaly2Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ require.NoError(t, repoStore.CreateRepository(ctx, 0, virtualStorage, relativePath, replicaPath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ return setupData{
+ client: client,
+ repository: &gitalypb.Repository{StorageName: virtualStorage, RelativePath: relativePath},
+ expectedPool: nil,
+ }
+ },
+ },
+ {
+ desc: "repository is linked to object pool",
+ setup: func(t *testing.T) setupData {
+ client, repoStore := setupPraefect(t)
+
+ // Create repositories that will be liked to object pools on each Gitaly node with
+ // replica path and register them in Praefect.
+ relativePath := gittest.NewRepositoryName(t)
+ replicaPath := storage.DeriveReplicaPath(1)
+ repo1, _ := gittest.CreateRepository(t, ctx, gitaly1Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ repo2, _ := gittest.CreateRepository(t, ctx, gitaly2Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ require.NoError(t, repoStore.CreateRepository(ctx, 1, virtualStorage, relativePath, replicaPath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ // Create object pool repositories that link to the previously created repositories
+ // with the appropriate cluster pool path and register them with Praefect.
+ poolRelativePath := gittest.NewObjectPoolName(t)
+ poolReplicaPath := storage.DerivePoolPath(2)
+ gittest.CreateObjectPool(t, ctx, gitaly1Cfg, repo1, gittest.CreateObjectPoolConfig{
+ RelativePath: poolReplicaPath,
+ LinkRepositoryToObjectPool: true,
+ })
+ gittest.CreateObjectPool(t, ctx, gitaly2Cfg, repo2, gittest.CreateObjectPoolConfig{
+ RelativePath: poolReplicaPath,
+ LinkRepositoryToObjectPool: true,
+ })
+ require.NoError(t, repoStore.CreateRepository(ctx, 2, virtualStorage, poolRelativePath, poolReplicaPath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ return setupData{
+ client: client,
+ repository: &gitalypb.Repository{StorageName: virtualStorage, RelativePath: relativePath},
+ expectedPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: poolRelativePath,
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "object pool does not use cluster path",
+ setup: func(t *testing.T) setupData {
+ client, repoStore := setupPraefect(t)
+
+ // Create repositories that will be liked to object pools on each Gitaly node with
+ // replica path and register them in Praefect.
+ relativePath := gittest.NewRepositoryName(t)
+ replicaPath := storage.DeriveReplicaPath(3)
+ repo1, _ := gittest.CreateRepository(t, ctx, gitaly1Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ repo2, _ := gittest.CreateRepository(t, ctx, gitaly2Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ require.NoError(t, repoStore.CreateRepository(ctx, 3, virtualStorage, relativePath, replicaPath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ // Create object pool repositories that link to the previously created repositories
+ // with the normal pool path and register them with Praefect. When a pooled cluster
+ // path is not used by an object pool repository, relative paths are not rewritten.
+ poolRelativePath := gittest.NewObjectPoolName(t)
+ gittest.CreateObjectPool(t, ctx, gitaly1Cfg, repo1, gittest.CreateObjectPoolConfig{
+ RelativePath: poolRelativePath,
+ LinkRepositoryToObjectPool: true,
+ })
+ gittest.CreateObjectPool(t, ctx, gitaly2Cfg, repo2, gittest.CreateObjectPoolConfig{
+ RelativePath: poolRelativePath,
+ LinkRepositoryToObjectPool: true,
+ })
+ // Use different relative path for object pool repository in Praefect to demonstrate
+ // that the response relative path is not rewritten.
+ require.NoError(t, repoStore.CreateRepository(ctx, 4, virtualStorage, gittest.NewObjectPoolName(t), poolRelativePath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ return setupData{
+ client: client,
+ repository: &gitalypb.Repository{StorageName: virtualStorage, RelativePath: relativePath},
+ expectedPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: virtualStorage,
+ RelativePath: poolRelativePath,
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "object pool cluster path does not contain valid repository ID",
+ setup: func(t *testing.T) setupData {
+ client, repoStore := setupPraefect(t)
+
+ // Create repositories that will be liked to object pools on each Gitaly node with
+ // replica path and register them in Praefect.
+ relativePath := gittest.NewRepositoryName(t)
+ replicaPath := storage.DeriveReplicaPath(5)
+ repo1, _ := gittest.CreateRepository(t, ctx, gitaly1Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ repo2, _ := gittest.CreateRepository(t, ctx, gitaly2Cfg, gittest.CreateRepositoryConfig{
+ RelativePath: replicaPath,
+ })
+ require.NoError(t, repoStore.CreateRepository(ctx, 5, virtualStorage, relativePath, replicaPath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ // Create object pool repositories that link to the previously created repositories
+ // with the invalid cluster pool path and register them with Praefect. Praefect
+ // relies on the cluster path to get the repository ID which is needed to fetch
+ // repository metadata. If a valid repository ID cannot be parsed from the object
+ // pool cluster path, an error is returned.
+ poolRelativePath := gittest.NewObjectPoolName(t)
+ poolReplicaPath := storage.DerivePoolPath(6) + "foobar"
+ gittest.CreateObjectPool(t, ctx, gitaly1Cfg, repo1, gittest.CreateObjectPoolConfig{
+ RelativePath: poolReplicaPath,
+ LinkRepositoryToObjectPool: true,
+ })
+ gittest.CreateObjectPool(t, ctx, gitaly2Cfg, repo2, gittest.CreateObjectPoolConfig{
+ RelativePath: poolReplicaPath,
+ LinkRepositoryToObjectPool: true,
+ })
+ require.NoError(t, repoStore.CreateRepository(ctx, 6, virtualStorage, poolRelativePath, poolRelativePath, gitaly1Storage, []string{gitaly2Storage}, nil, false, false))
+
+ return setupData{
+ client: client,
+ repository: &gitalypb.Repository{StorageName: virtualStorage, RelativePath: relativePath},
+ expectedError: structerr.NewInternal("parsing repository ID: strconv.ParseInt: parsing \"6foobar\": invalid syntax"),
+ }
+ },
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ setup := tc.setup(t)
+
+ resp, err := setup.client.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{Repository: setup.repository})
+ testhelper.RequireGrpcError(t, setup.expectedError, err)
+ require.Equal(t, setup.expectedPool, resp.GetObjectPool())
+ })
+ }
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c8dd602ee..c71681e6f 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -212,6 +212,7 @@ func NewGRPCServer(
})
proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
"DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
+ "GetObjectPool": GetObjectPoolHandler(rs, router),
})
}