Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-05-02 18:33:48 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-05-02 18:33:48 +0300
commit1ff9e1dabd34e2bea91f1f6cf1253f5952b9c90a (patch)
tree720a0f105e52005950b0789f2bc4504d720a76ac
parent118c4a042dfee4310733f71368da6e948a22c92e (diff)
parent7c8bd8efa52b22ed411db0542f239dedcda7bf6d (diff)
Merge branch 'pks-coordinator-fix-resolving-additional-repository-via-wrong-storage' into 'master'
coordinator: Fix rewriting additional repos to the wrong storage node See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5688 Merged-by: Justin Tobler <jtobler@gitlab.com> Approved-by: karthik nayak <knayak@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--internal/praefect/coordinator.go35
-rw-r--r--internal/praefect/coordinator_test.go338
2 files changed, 294 insertions, 79 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 10773f5e0..7214718ed 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -382,6 +382,21 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
} else if err != nil {
return nil, structerr.NewInvalidArgument("%w", err)
} else {
+ // We do not support resolving multiple different repositories that reside on
+ // different virtual storages. This kind of makes sense from a technical point of
+ // view as Praefect cannot guarantee to resolve both virtual storages. So for the
+ // time being we accept this restriction and handle it explicitly.
+ //
+ // Note that this is the same condition as in `rewrittenRepositoryMessage()`. This
+ // is done so that we detect such erroneous requests before we try to connect to the
+ // target node, which allows us to return a proper error to the user that indicates
+ // the underlying issue instead of an unrelated symptom.
+ //
+ // This limitation may be lifted in the future.
+ if virtualStorage != additionalRepo.GetStorageName() {
+ return nil, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported")
+ }
+
additionalRepoRelativePath = additionalRepo.GetRelativePath()
}
@@ -796,20 +811,32 @@ func rewrittenRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, st
return nil, structerr.NewInvalidArgument("%w", err)
}
- // rewrite the target repository
- targetRepo.StorageName = storage
- targetRepo.RelativePath = relativePath
-
if additionalRepo, err := mi.AdditionalRepo(m); errors.Is(err, protoregistry.ErrTargetRepoMissing) {
// Nothing to rewrite in case the additional repository either doesn't exist in the
// message or wasn't set by the caller.
} else if err != nil {
return nil, structerr.NewInvalidArgument("%w", err)
} else {
+ // We do not support resolving multiple different repositories that reside on
+ // different virtual storages. This kind of makes sense from a technical point of
+ // view as Praefect cannot guarantee to resolve both virtual storages. So for the
+ // time being we accept this restriction and handle it explicitly.
+ //
+ // This limitation may be lifted in the future.
+ if targetRepo.GetStorageName() != additionalRepo.GetStorageName() {
+ return nil, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported")
+ }
+
additionalRepo.StorageName = storage
additionalRepo.RelativePath = additionalRelativePath
}
+ // Rewrite the target repository. Note that we only do this after having written the
+ // additional repository so that we can check whether the original storage name of both
+ // repositories match.
+ targetRepo.StorageName = storage
+ targetRepo.RelativePath = relativePath
+
return proxy.NewCodec().Marshal(m)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 82f698f51..71a96ba53 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -141,47 +142,232 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
func TestStreamDirectorMutator(t *testing.T) {
t.Parallel()
- gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
- testhelper.NewServerWithHealth(t, gitalySocket0)
- testhelper.NewServerWithHealth(t, gitalySocket1)
- primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
- primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1"}
- secondaryNode := &config.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
+ ctx := testhelper.Context(t)
+ ctx = correlation.ContextWithCorrelation(ctx, "my-correlation-id")
+
+ primarySocket := testhelper.GetTemporaryGitalySocketFileName(t)
+ testhelper.NewServerWithHealth(t, primarySocket)
+ primaryNode := &config.Node{Address: "unix://" + primarySocket, Storage: "praefect-internal-1"}
+
+ secondarySocket := testhelper.GetTemporaryGitalySocketFileName(t)
+ testhelper.NewServerWithHealth(t, secondarySocket)
+ secondaryNode := &config.Node{Address: "unix://" + secondarySocket, Storage: "praefect-internal-2"}
+
+ unrelatedSocket := testhelper.GetTemporaryGitalySocketFileName(t)
+ testhelper.NewServerWithHealth(t, unrelatedSocket)
+ unrelatedNode := &config.Node{Address: "unix://" + unrelatedSocket, Storage: "praefect-unrelated"}
+
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
Name: "praefect",
Nodes: []*config.Node{primaryNode, secondaryNode},
},
+ {
+ Name: "unrelated",
+ Nodes: []*config.Node{unrelatedNode},
+ },
},
}
- db := testdb.New(t)
-
- targetRepo := gitalypb.Repository{
- StorageName: "praefect",
- RelativePath: "/path/to/hashed/storage",
- }
- ctx := testhelper.Context(t)
+ db := testdb.New(t)
txMgr := transactions.NewManager(conf)
nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
defer nodeSet.Close()
+ type setupData struct {
+ method string
+ request proto.Message
+ expectedErr error
+ expectedRewrittenRequest proto.Message
+ expectedEvent datastore.ReplicationEvent
+ }
+
+ var currentRepoID int64
+ createRepo := func(t *testing.T, rs datastore.RepositoryStore, storage, relativePath, rewrittenPath string) int64 {
+ t.Helper()
+ repoID := atomic.AddInt64(&currentRepoID, 1)
+ require.NoError(t, rs.CreateRepository(ctx, repoID, storage, relativePath, rewrittenPath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true))
+ return repoID
+ }
+
for _, tc := range []struct {
- desc string
- repositoryExists bool
- error error
+ desc string
+ setup func(t *testing.T, rs datastore.RepositoryStore) setupData
}{
{
- desc: "succcessful",
- repositoryExists: true,
+ desc: "target repository",
+ setup: func(t *testing.T, rs datastore.RepositoryStore) setupData {
+ relativePath := gittest.NewRepositoryName(t)
+ targetRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: relativePath,
+ }
+
+ repoID := createRepo(t, rs, "praefect", relativePath, relativePath)
+
+ return setupData{
+ method: "/gitaly.OperationService/UserCreateTag",
+ request: &gitalypb.UserCreateTagRequest{
+ Repository: targetRepo,
+ },
+ expectedRewrittenRequest: &gitalypb.UserCreateTagRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-1",
+ RelativePath: relativePath,
+ },
+ },
+ expectedEvent: datastore.ReplicationEvent{
+ State: datastore.JobStateInProgress,
+ Attempt: 2,
+ LockID: fmt.Sprintf("praefect|praefect-internal-2|%s", relativePath),
+ Job: datastore.ReplicationJob{
+ RepositoryID: repoID,
+ Change: datastore.UpdateRepo,
+ VirtualStorage: conf.VirtualStorages[0].Name,
+ RelativePath: targetRepo.RelativePath,
+ TargetNodeStorage: secondaryNode.Storage,
+ SourceNodeStorage: primaryNode.Storage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ },
+ }
+ },
},
{
- desc: "repository not found",
- error: structerr.NewNotFound("%w", fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get repository id: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))),
+ desc: "target and additional repository",
+ setup: func(t *testing.T, rs datastore.RepositoryStore) setupData {
+ targetRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+ additionalRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ targetRepoID := createRepo(t, rs, "praefect", targetRepo.RelativePath, "rewritten-target")
+ createRepo(t, rs, "praefect", additionalRepo.RelativePath, "rewritten-additional")
+
+ return setupData{
+ method: "/gitaly.ObjectPoolService/FetchIntoObjectPool",
+ request: &gitalypb.FetchIntoObjectPoolRequest{
+ Origin: additionalRepo,
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: targetRepo,
+ },
+ },
+ expectedRewrittenRequest: &gitalypb.FetchIntoObjectPoolRequest{
+ Origin: &gitalypb.Repository{
+ StorageName: "praefect-internal-1",
+ RelativePath: "rewritten-additional",
+ },
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-1",
+ RelativePath: "rewritten-target",
+ },
+ },
+ },
+ expectedEvent: datastore.ReplicationEvent{
+ State: datastore.JobStateInProgress,
+ Attempt: 2,
+ LockID: fmt.Sprintf("praefect|praefect-internal-2|%s", targetRepo.RelativePath),
+ Job: datastore.ReplicationJob{
+ RepositoryID: targetRepoID,
+ Change: datastore.UpdateRepo,
+ VirtualStorage: conf.VirtualStorages[0].Name,
+ RelativePath: targetRepo.RelativePath,
+ TargetNodeStorage: secondaryNode.Storage,
+ SourceNodeStorage: primaryNode.Storage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ },
+ }
+ },
+ },
+ {
+ desc: "target repository not found",
+ setup: func(t *testing.T, rs datastore.RepositoryStore) setupData {
+ // We do not create the repository, so we expect this request to
+ // fail.
+ targetRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ return setupData{
+ method: "/gitaly.OperationService/UserCreateTag",
+ request: &gitalypb.UserCreateTagRequest{
+ Repository: targetRepo,
+ },
+ expectedErr: structerr.NewNotFound("%w", fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get repository id: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))),
+ }
+ },
+ },
+ {
+ desc: "additional repository not found",
+ setup: func(t *testing.T, rs datastore.RepositoryStore) setupData {
+ targetRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+ additionalRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ // We create the target repository, but don't create the additional
+ // one.
+ createRepo(t, rs, "praefect", targetRepo.RelativePath, "rewritten-target")
+
+ return setupData{
+ method: "/gitaly.ObjectPoolService/FetchIntoObjectPool",
+ request: &gitalypb.FetchIntoObjectPoolRequest{
+ Origin: additionalRepo,
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: targetRepo,
+ },
+ },
+ expectedErr: structerr.NewNotFound("%w", fmt.Errorf("mutator call: route repository mutator: %w",
+ fmt.Errorf("resolve additional replica path: %w",
+ fmt.Errorf("get additional repository id: %w",
+ commonerr.NewRepositoryNotFoundError(additionalRepo.StorageName, additionalRepo.RelativePath),
+ ),
+ ),
+ )),
+ }
+ },
+ },
+ {
+ desc: "target and additional repository on different storages",
+ setup: func(t *testing.T, rs datastore.RepositoryStore) setupData {
+ targetRepo := &gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+ additionalRepo := &gitalypb.Repository{
+ StorageName: "unrelated",
+ RelativePath: gittest.NewRepositoryName(t),
+ }
+
+ createRepo(t, rs, "praefect", targetRepo.RelativePath, "rewritten-target")
+ createRepo(t, rs, "unrelated", additionalRepo.RelativePath, "rewritten-additional")
+
+ return setupData{
+ method: "/gitaly.ObjectPoolService/FetchIntoObjectPool",
+ request: &gitalypb.FetchIntoObjectPoolRequest{
+ Origin: additionalRepo,
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: targetRepo,
+ },
+ },
+ expectedErr: structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported"),
+ }
+ },
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -190,12 +376,8 @@ func TestStreamDirectorMutator(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
- if tc.repositoryExists {
- require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true))
- }
-
testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
return queue.Enqueue(ctx, event)
@@ -219,66 +401,46 @@ func TestStreamDirectorMutator(t *testing.T) {
protoregistry.GitalyProtoPreregistered,
)
- frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
- Origin: &targetRepo,
- ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo},
- })
- require.NoError(t, err)
+ setup := tc.setup(t, rs)
+ marshalledRequest, err := proto.Marshal(setup.request)
require.NoError(t, err)
- fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool"
+ streamParams, err := coordinator.StreamDirector(ctx, setup.method, &mockPeeker{marshalledRequest})
+ require.Equal(t, setup.expectedErr, err)
- peeker := &mockPeeker{frame}
- streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
- if tc.error != nil {
- require.Equal(t, tc.error, err)
+ if setup.expectedErr != nil {
return
}
- require.NoError(t, err)
- require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())
-
- mi, err := coordinator.registry.LookupMethod(fullMethod)
- require.NoError(t, err)
+ // Assert that the stream parameters look as expecected. First, we verify
+ // that the request gets routed to the correct primary node.
+ require.Equal(t, primaryNode.Address, streamParams.Primary().Conn.Target())
- m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
+ // Second, we verify that the request was rewritten so that its storage
+ // matches the primary node's storage.
+ mi, err := coordinator.registry.LookupMethod(setup.method)
require.NoError(t, err)
-
- rewrittenTargetRepo, err := mi.TargetRepo(m)
+ rewrittenRequest, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
require.NoError(t, err)
- require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ testhelper.ProtoEqual(t, setup.expectedRewrittenRequest, rewrittenRequest)
- // this call creates new events in the queue and simulates usual flow of the update operation
+ // Finalize the request and then wait for the resulting replication event to
+ // be queued.
require.NoError(t, streamParams.RequestFinalizer())
-
- // wait until event persisted (async operation)
require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
return len(i.GetEnqueuedResult()) == 1
}))
+ // Assert that the replication event queue contains the expected event now.
events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, events, 1)
-
- expectedEvent := datastore.ReplicationEvent{
- ID: 1,
- State: datastore.JobStateInProgress,
- Attempt: 2,
- LockID: "praefect|praefect-internal-2|/path/to/hashed/storage",
- CreatedAt: events[0].CreatedAt,
- UpdatedAt: events[0].UpdatedAt,
- Job: datastore.ReplicationJob{
- RepositoryID: 1,
- Change: datastore.UpdateRepo,
- VirtualStorage: conf.VirtualStorages[0].Name,
- RelativePath: targetRepo.RelativePath,
- TargetNodeStorage: secondaryNode.Storage,
- SourceNodeStorage: primaryNode.Storage,
- },
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
- }
- require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
+ // Unset a bunch of data that is indeterministic.
+ events[0].ID = 0
+ events[0].CreatedAt = time.Time{}
+ events[0].UpdatedAt = nil
+ require.Equal(t, setup.expectedEvent, events[0])
})
}
}
@@ -1249,6 +1411,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
}
func TestRewrittenRepositoryMessage(t *testing.T) {
+ t.Parallel()
+
buildRequest := func(storageName, relativePath, additionalRelativePath string) *gitalypb.CreateObjectPoolRequest {
return &gitalypb.CreateObjectPoolRequest{
ObjectPool: &gitalypb.ObjectPool{
@@ -1264,19 +1428,43 @@ func TestRewrittenRepositoryMessage(t *testing.T) {
}
}
- originalRequest := buildRequest("original-storage", "original-relative-path", "original-additional-relative-path")
+ t.Run("successfully rewritten request", func(t *testing.T) {
+ originalRequest := buildRequest("original-storage", "original-relative-path", "original-additional-relative-path")
- methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool")
- require.NoError(t, err)
+ methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool")
+ require.NoError(t, err)
- rewrittenMessageBytes, err := rewrittenRepositoryMessage(methodInfo, originalRequest, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path")
- require.NoError(t, err)
+ rewrittenMessageBytes, err := rewrittenRepositoryMessage(methodInfo, originalRequest, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path")
+ require.NoError(t, err)
+
+ var rewrittenMessage gitalypb.CreateObjectPoolRequest
+ require.NoError(t, proto.Unmarshal(rewrittenMessageBytes, &rewrittenMessage))
+
+ testhelper.ProtoEqual(t, buildRequest("original-storage", "original-relative-path", "original-additional-relative-path"), originalRequest)
+ testhelper.ProtoEqual(t, buildRequest("rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path"), &rewrittenMessage)
+ })
+
+ t.Run("rewriting differing storages fails", func(t *testing.T) {
+ request := &gitalypb.CreateObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: &gitalypb.Repository{
+ StorageName: "a",
+ RelativePath: "a",
+ },
+ },
+ Origin: &gitalypb.Repository{
+ StorageName: "b",
+ RelativePath: "b",
+ },
+ }
- var rewrittenMessage gitalypb.CreateObjectPoolRequest
- require.NoError(t, proto.Unmarshal(rewrittenMessageBytes, &rewrittenMessage))
+ methodInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.ObjectPoolService/CreateObjectPool")
+ require.NoError(t, err)
- testhelper.ProtoEqual(t, buildRequest("original-storage", "original-relative-path", "original-additional-relative-path"), originalRequest)
- testhelper.ProtoEqual(t, buildRequest("rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path"), &rewrittenMessage)
+ rewrittenRequest, err := rewrittenRepositoryMessage(methodInfo, request, "rewritten-storage", "rewritten-relative-path", "rewritten-additional-relative-path")
+ require.Equal(t, structerr.NewInvalidArgument("resolving additional repository on different storage than target repository is not supported"), err)
+ require.Nil(t, rewrittenRequest)
+ })
}
func TestStreamDirector_repo_creation(t *testing.T) {