Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-09-10 18:45:21 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-09-11 11:32:04 +0300
commitff353c01376699ab6b1176a87dcabb24f8d02371 (patch)
treec311783a82b1c4cacf3403f2435ccce1dbd6b8f9 /internal/praefect
parentd2e978f8e8f47a49c3bcfbd470b2f790e52c5ee2 (diff)
remove server scoped handling from coordinator
This commit removes server scoped request handling from the coordinator as there are no server scoped RPCs Praefect should direct to Gitaly nodes.
Diffstat (limited to 'internal/praefect')
-rw-r--r--internal/praefect/coordinator.go17
-rw-r--r--internal/praefect/coordinator_test.go98
-rw-r--r--internal/praefect/helper_test.go9
-rw-r--r--internal/praefect/server_test.go47
4 files changed, 0 insertions, 171 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 4769c73e9..6a193f514 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -447,23 +447,6 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string,
return c.directStorageScopedMessage(ctx, mi, m)
}
- // TODO: please refer to https://gitlab.com/gitlab-org/gitaly/-/issues/2974
- if mi.Scope == protoregistry.ScopeServer {
- shard, err := c.nodeMgr.GetShard(c.conf.VirtualStorages[0].Name)
- if err != nil {
- if errors.Is(err, nodes.ErrVirtualStorageNotExist) {
- return nil, helper.ErrInvalidArgument(err)
- }
- return nil, err
- }
-
- return proxy.NewStreamParameters(proxy.Destination{
- Ctx: helper.IncomingToOutgoing(ctx),
- Conn: shard.Primary.GetConnection(),
- Msg: payload,
- }, nil, func() error { return nil }, nil), nil
- }
-
return nil, helper.ErrInternalf("rpc with undefined scope %q", mi.Scope)
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index c01e0ac3e..a229a69a8 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -898,104 +898,6 @@ func TestCoordinatorEnqueueFailure(t *testing.T) {
require.Equal(t, err.Error(), "rpc error: code = Unknown desc = enqueue replication event: "+expectErrMsg)
}
-func TestStreamDirectorServerScope(t *testing.T) {
- gz := proto.FileDescriptor("mock.proto")
- fd, err := protoregistry.ExtractFileDescriptor(gz)
- require.NoError(t, err)
-
- registry, err := protoregistry.New(fd)
- require.NoError(t, err)
-
- conf := config.Config{VirtualStorages: []*config.VirtualStorage{{Name: "praefect"}, {Name: "another"}}}
- mgr := &nodes.MockManager{GetShardFunc: func(s string) (nodes.Shard, error) {
- require.Equal(t, "praefect", s)
- return nodes.Shard{Primary: &nodes.MockNode{}}, nil
- }}
- coordinator := NewCoordinator(
- nil,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
- mgr,
- nil,
- conf,
- registry,
- )
-
- fullMethod := "/mock.SimpleService/ServerAccessor"
- requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeServer, protoregistry.OpAccessor)
-
- frame, err := proto.Marshal(&mock.SimpleRequest{})
- require.NoError(t, err)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- sp, err := coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
- require.NoError(t, err)
- require.NotNil(t, sp.Primary())
- require.Empty(t, sp.Secondaries())
-}
-
-func TestStreamDirectorServerScope_Error(t *testing.T) {
- gz := proto.FileDescriptor("mock.proto")
- fd, err := protoregistry.ExtractFileDescriptor(gz)
- require.NoError(t, err)
-
- registry, err := protoregistry.New(fd)
- require.NoError(t, err)
-
- conf := config.Config{VirtualStorages: []*config.VirtualStorage{{Name: "fake"}, {Name: "another"}}}
- rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
-
- t.Run("unknown storage provided", func(t *testing.T) {
- mgr := &nodes.MockManager{
- GetShardFunc: func(s string) (nodes.Shard, error) {
- require.Equal(t, "fake", s)
- return nodes.Shard{}, nodes.ErrVirtualStorageNotExist
- },
- }
- coordinator := NewCoordinator(nil, rs, mgr, nil, conf, registry)
-
- fullMethod := "/mock.SimpleService/ServerAccessor"
- requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeServer, protoregistry.OpAccessor)
-
- frame, err := proto.Marshal(&mock.SimpleRequest{})
- require.NoError(t, err)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- _, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
- require.Error(t, err)
- result, ok := status.FromError(err)
- require.True(t, ok)
- require.Equal(t, codes.InvalidArgument, result.Code())
- require.Equal(t, "virtual storage does not exist", result.Message())
- })
-
- t.Run("primary not healthy", func(t *testing.T) {
- mgr := &nodes.MockManager{
- GetShardFunc: func(s string) (nodes.Shard, error) {
- require.Equal(t, "fake", s)
- return nodes.Shard{}, nodes.ErrPrimaryNotHealthy
- },
- }
- coordinator := NewCoordinator(nil, rs, mgr, nil, conf, registry)
-
- fullMethod := "/mock.SimpleService/ServerAccessor"
- requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeServer, protoregistry.OpAccessor)
-
- frame, err := proto.Marshal(&mock.SimpleRequest{})
- require.NoError(t, err)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- _, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
- require.Error(t, err)
- require.Equal(t, nodes.ErrPrimaryNotHealthy, err)
- })
-}
-
func TestStreamDirectorStorageScope(t *testing.T) {
// stubs health-check requests because nodes.NewManager establishes connection on creation
gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 8a5e301e2..3ca5c0221 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -34,15 +34,6 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
-func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) {
- select {
- case <-ch:
- break
- case <-time.After(timeout):
- t.Errorf("timed out waiting for channel after %s", timeout)
- }
-}
-
// generates a praefect configuration with the specified number of backend
// nodes
func testConfig(backends int) config.Config {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 0dae99082..89f24e408 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -46,53 +46,6 @@ import (
"google.golang.org/grpc/status"
)
-func TestServerRouteServerAccessor(t *testing.T) {
- var (
- conf = testConfig(1)
- reqQ = make(chan *mock.SimpleRequest)
-
- expectResp = &mock.SimpleResponse{Value: 2}
-
- // note: a server scoped RPC will be randomly routed
- // to an available backend server. To simplify our
- // test, a single backend server is used.
- backends = map[string]mock.SimpleServiceServer{
- conf.VirtualStorages[0].Nodes[0].Storage: &mockSvc{
- serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) {
- reqQ <- req
- return expectResp, nil
- },
- },
- }
- )
-
- cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
- defer cleanup()
-
- cli := mock.NewSimpleServiceClient(cc)
-
- expectReq := &mock.SimpleRequest{Value: 1}
-
- done := make(chan struct{})
- go func() {
- defer close(done)
-
- actualReq := <-reqQ
- assert.True(t, proto.Equal(expectReq, actualReq),
- "received unexpected request value: %+v instead of %+v", actualReq, expectReq)
- }()
-
- ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(time.Second))
- defer cancel()
-
- actualResp, err := cli.ServerAccessor(ctx, expectReq)
- require.NoError(t, err)
- require.True(t, proto.Equal(expectResp, actualResp),
- "expected response was not routed back")
-
- waitUntil(t, done, time.Second)
-}
-
func TestGitalyServerInfo(t *testing.T) {
gitVersion, err := git.Version()
require.NoError(t, err)