Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorÆvar Arnfjörð Bjarmason <avar@gitlab.com>2020-09-16 17:03:07 +0300
committerÆvar Arnfjörð Bjarmason <avar@gitlab.com>2020-09-16 17:03:07 +0300
commit1161a59ae1f034a29f1f6a5885e912e986aad3a0 (patch)
tree9c85484aa03e30b42091445b9a51a87da5fe4b08
parentc9434931eb970b012860d7812274cd16eae50b16 (diff)
parentff353c01376699ab6b1176a87dcabb24f8d02371 (diff)
Merge branch 'smh-remove-server-scope' into 'master'
Remove server scoped handling from coordinator Closes #2974 See merge request gitlab-org/gitaly!2546
-rw-r--r--changelogs/unreleased/smh-remove-server-scope.yml5
-rw-r--r--internal/praefect/coordinator.go17
-rw-r--r--internal/praefect/coordinator_test.go98
-rw-r--r--internal/praefect/helper_test.go9
-rw-r--r--internal/praefect/server_test.go47
5 files changed, 5 insertions, 171 deletions
diff --git a/changelogs/unreleased/smh-remove-server-scope.yml b/changelogs/unreleased/smh-remove-server-scope.yml
new file mode 100644
index 000000000..420814352
--- /dev/null
+++ b/changelogs/unreleased/smh-remove-server-scope.yml
@@ -0,0 +1,5 @@
+---
+title: Remove server scoped handling from coordinator
+merge_request: 2546
+author:
+type: removed
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 4769c73e9..6a193f514 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -447,23 +447,6 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string,
return c.directStorageScopedMessage(ctx, mi, m)
}
- // TODO: please refer to https://gitlab.com/gitlab-org/gitaly/-/issues/2974
- if mi.Scope == protoregistry.ScopeServer {
- shard, err := c.nodeMgr.GetShard(c.conf.VirtualStorages[0].Name)
- if err != nil {
- if errors.Is(err, nodes.ErrVirtualStorageNotExist) {
- return nil, helper.ErrInvalidArgument(err)
- }
- return nil, err
- }
-
- return proxy.NewStreamParameters(proxy.Destination{
- Ctx: helper.IncomingToOutgoing(ctx),
- Conn: shard.Primary.GetConnection(),
- Msg: payload,
- }, nil, func() error { return nil }, nil), nil
- }
-
return nil, helper.ErrInternalf("rpc with undefined scope %q", mi.Scope)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index c01e0ac3e..a229a69a8 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -898,104 +898,6 @@ func TestCoordinatorEnqueueFailure(t *testing.T) {
require.Equal(t, err.Error(), "rpc error: code = Unknown desc = enqueue replication event: "+expectErrMsg)
}
-func TestStreamDirectorServerScope(t *testing.T) {
- gz := proto.FileDescriptor("mock.proto")
- fd, err := protoregistry.ExtractFileDescriptor(gz)
- require.NoError(t, err)
-
- registry, err := protoregistry.New(fd)
- require.NoError(t, err)
-
- conf := config.Config{VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}, {Name: "another"}}}
- mgr := &nodes.MockManager{GetShardFunc: func(s string) (nodes.Shard, error) {
- require.Equal(t, "praefect", s)
- return nodes.Shard{Primary: &nodes.MockNode{}}, nil
- }}
- coordinator := NewCoordinator(
- nil,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
- mgr,
- nil,
- conf,
- registry,
- )
-
- fullMethod := "/mock.SimpleService/ServerAccessor"
- requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeServer, protoregistry.OpAccessor)
-
- frame, err := proto.Marshal(&mock.SimpleRequest{})
- require.NoError(t, err)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- sp, err := coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
- require.NoError(t, err)
- require.NotNil(t, sp.Primary())
- require.Empty(t, sp.Secondaries())
-}
-
-func TestStreamDirectorServerScope_Error(t *testing.T) {
- gz := proto.FileDescriptor("mock.proto")
- fd, err := protoregistry.ExtractFileDescriptor(gz)
- require.NoError(t, err)
-
- registry, err := protoregistry.New(fd)
- require.NoError(t, err)
-
- conf := config.Config{VirtualStorages: []*config.VirtualStorage{{Name: "fake"}, {Name: "another"}}}
- rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
-
- t.Run("unknown storage provided", func(t *testing.T) {
- mgr := &nodes.MockManager{
- GetShardFunc: func(s string) (nodes.Shard, error) {
- require.Equal(t, "fake", s)
- return nodes.Shard{}, nodes.ErrVirtualStorageNotExist
- },
- }
- coordinator := NewCoordinator(nil, rs, mgr, nil, conf, registry)
-
- fullMethod := "/mock.SimpleService/ServerAccessor"
- requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeServer, protoregistry.OpAccessor)
-
- frame, err := proto.Marshal(&mock.SimpleRequest{})
- require.NoError(t, err)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- _, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
- require.Error(t, err)
- result, ok := status.FromError(err)
- require.True(t, ok)
- require.Equal(t, codes.InvalidArgument, result.Code())
- require.Equal(t, "virtual storage does not exist", result.Message())
- })
-
- t.Run("primary not healthy", func(t *testing.T) {
- mgr := &nodes.MockManager{
- GetShardFunc: func(s string) (nodes.Shard, error) {
- require.Equal(t, "fake", s)
- return nodes.Shard{}, nodes.ErrPrimaryNotHealthy
- },
- }
- coordinator := NewCoordinator(nil, rs, mgr, nil, conf, registry)
-
- fullMethod := "/mock.SimpleService/ServerAccessor"
- requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeServer, protoregistry.OpAccessor)
-
- frame, err := proto.Marshal(&mock.SimpleRequest{})
- require.NoError(t, err)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- _, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
- require.Error(t, err)
- require.Equal(t, nodes.ErrPrimaryNotHealthy, err)
- })
-}
-
func TestStreamDirectorStorageScope(t *testing.T) {
// stubs health-check requests because nodes.NewManager establishes connection on creation
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 8a5e301e2..3ca5c0221 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -34,15 +34,6 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
-func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) {
- select {
- case <-ch:
- break
- case <-time.After(timeout):
- t.Errorf("timed out waiting for channel after %s", timeout)
- }
-}
-
// generates a praefect configuration with the specified number of backend
// nodes
func testConfig(backends int) config.Config {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 6e5f7704b..6aee6fd99 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -46,53 +46,6 @@ import (
"google.golang.org/grpc/status"
)
-func TestServerRouteServerAccessor(t *testing.T) {
- var (
- conf = testConfig(1)
- reqQ = make(chan *mock.SimpleRequest)
-
- expectResp = &mock.SimpleResponse{Value: 2}
-
- // note: a server scoped RPC will be randomly routed
- // to an available backend server. To simplify our
- // test, a single backend server is used.
- backends = map[string]mock.SimpleServiceServer{
- conf.VirtualStorages[0].Nodes[0].Storage: &mockSvc{
- serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
- reqQ <- req
- return expectResp, nil
- },
- },
- }
- )
-
- cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
- defer cleanup()
-
- cli := mock.NewSimpleServiceClient(cc)
-
- expectReq := &mock.SimpleRequest{Value: 1}
-
- done := make(chan struct{})
- go func() {
- defer close(done)
-
- actualReq := <-reqQ
- assert.True(t, proto.Equal(expectReq, actualReq),
- "received unexpected request value: %+v instead of %+v", actualReq, expectReq)
- }()
-
- ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
- defer cancel()
-
- actualResp, err := cli.ServerAccessor(ctx, expectReq)
- require.NoError(t, err)
- require.True(t, proto.Equal(expectResp, actualResp),
- "expected response was not routed back")
-
- waitUntil(t, done, time.Second)
-}
-
func TestGitalyServerInfo(t *testing.T) {
gitVersion, err := git.Version()
require.NoError(t, err)