Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <proglottis@gmail.com>2021-09-15 02:04:33 +0300
committerJames Fargher <proglottis@gmail.com>2021-09-15 02:04:33 +0300
commit3903ef7556e5705d0b62bbcbdd24649e4c4835a3 (patch)
tree7b8c2dfdcbfad4971a776375f7d6862dfb12a5e6
parent6ee4f0a09c4fc4a5df334a5033f19986272373a1 (diff)
parentb04aa750d115162ca8f468244aeb2810b6be8c10 (diff)
Merge branch 'smh-not-found-mutator' into 'master'
Return NotFound code for mutators targeting non-existent repositories See merge request gitlab-org/gitaly!3869
-rw-r--r--internal/praefect/coordinator.go8
-rw-r--r--internal/praefect/coordinator_test.go153
2 files changed, 101 insertions, 60 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 66a828b59..b2819380e 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -357,6 +357,10 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
}
if err != nil {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return nil, helper.ErrNotFound(err)
+ }
+
return nil, err
}
@@ -390,10 +394,6 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal
ctx, virtualStorage, repoPath, shouldRouteRepositoryAccessorToPrimary(ctx, call),
)
if err != nil {
- if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
- return nil, helper.ErrNotFound(err)
- }
-
return nil, fmt.Errorf("accessor call: route repository accessor: %w", err)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 91fd9f42d..13fd4263d 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
+ "math/rand"
"strings"
"sync"
"sync/atomic"
@@ -170,75 +171,115 @@ func TestStreamDirectorMutator(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- entry := testhelper.DiscardTestEntry(t)
+ txMgr := transactions.NewManager(conf)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil)
require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
+ defer nodeSet.Close()
- txMgr := transactions.NewManager(conf)
- rs := datastore.MockRepositoryStore{}
+ for _, tc := range []struct {
+ desc string
+ repositoryExists bool
+ error error
+ }{
+ {
+ desc: "succcessful",
+ repositoryExists: true,
+ },
+ {
+ desc: "repository not found",
+ error: helper.ErrNotFound(fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get primary: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ tx := glsql.NewDB(t).Begin(t)
+ defer tx.Rollback(t)
- coordinator := NewCoordinator(
- queueInterceptor,
- rs,
- NewNodeManagerRouter(nodeMgr, rs),
- txMgr,
- conf,
- protoregistry.GitalyProtoPreregistered,
- )
+ rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())
- frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{
- Origin: &targetRepo,
- ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo},
- })
- require.NoError(t, err)
+ if tc.repositoryExists {
+ require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true))
+ }
- require.NoError(t, err)
+ testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
- fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool"
+ coordinator := NewCoordinator(
+ queueInterceptor,
+ rs,
+ NewPerRepositoryRouter(
+ nodeSet.Connections(),
+ nodes.NewPerRepositoryElector(tx),
+ StaticHealthChecker(conf.StorageNames()),
+ NewLockedRandom(rand.New(rand.NewSource(0))),
+ rs,
+ datastore.NewAssignmentStore(tx, conf.StorageNames()),
+ rs,
+ nil,
+ ),
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
- peeker := &mockPeeker{frame}
- streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
- require.NoError(t, err)
- require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())
+ frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{
+ Origin: &targetRepo,
+ ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo},
+ })
+ require.NoError(t, err)
- mi, err := coordinator.registry.LookupMethod(fullMethod)
- require.NoError(t, err)
+ require.NoError(t, err)
- m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
- require.NoError(t, err)
+ fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool"
- rewrittenTargetRepo, err := mi.TargetRepo(m)
- require.NoError(t, err)
- require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ peeker := &mockPeeker{frame}
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ if tc.error != nil {
+ require.Equal(t, tc.error, err)
+ return
+ }
- replEventWait.Add(1) // expected only one event to be created
- // this call creates new events in the queue and simulates usual flow of the update operation
- require.NoError(t, streamParams.RequestFinalizer())
+ require.NoError(t, err)
+ require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())
- replEventWait.Wait() // wait until event persisted (async operation)
- events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
- require.NoError(t, err)
- require.Len(t, events, 1)
-
- expectedEvent := datastore.ReplicationEvent{
- ID: 1,
- State: datastore.JobStateInProgress,
- Attempt: 2,
- LockID: "praefect|praefect-internal-2|/path/to/hashed/storage",
- CreatedAt: events[0].CreatedAt,
- UpdatedAt: events[0].UpdatedAt,
- Job: datastore.ReplicationJob{
- Change: datastore.UpdateRepo,
- VirtualStorage: conf.VirtualStorages[0].Name,
- RelativePath: targetRepo.RelativePath,
- TargetNodeStorage: secondaryNode.Storage,
- SourceNodeStorage: primaryNode.Storage,
- },
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+
+ m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
+ require.NoError(t, err)
+
+ rewrittenTargetRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+
+ replEventWait.Add(1) // expected only one event to be created
+ // this call creates new events in the queue and simulates usual flow of the update operation
+ require.NoError(t, streamParams.RequestFinalizer())
+
+ replEventWait.Wait() // wait until event persisted (async operation)
+ events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
+ require.NoError(t, err)
+ require.Len(t, events, 1)
+
+ expectedEvent := datastore.ReplicationEvent{
+ ID: 1,
+ State: datastore.JobStateInProgress,
+ Attempt: 2,
+ LockID: "praefect|praefect-internal-2|/path/to/hashed/storage",
+ CreatedAt: events[0].CreatedAt,
+ UpdatedAt: events[0].UpdatedAt,
+ Job: datastore.ReplicationJob{
+ RepositoryID: 1,
+ Change: datastore.UpdateRepo,
+ VirtualStorage: conf.VirtualStorages[0].Name,
+ RelativePath: targetRepo.RelativePath,
+ TargetNodeStorage: secondaryNode.Storage,
+ SourceNodeStorage: primaryNode.Storage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ }
+ require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
+ })
}
- require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
}
func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
@@ -416,7 +457,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
return RouterNode{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
},
},
- error: helper.ErrNotFound(commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)),
+ error: helper.ErrNotFound(fmt.Errorf("accessor call: route repository accessor: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath))),
},
} {
t.Run(tc.desc, func(t *testing.T) {