diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-18 15:16:09 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-23 12:01:52 +0300 |
commit | 6ca0cd6e5dd9c4635b92198ec8a663af1692b5a6 (patch) | |
tree | df6148fa7a8dbd336bee4a6513a25021427f8519 | |
parent | 51073bb211020a41e68c4813c4e576120e669767 (diff) |
praefect: Implement tests for failing RPC error proxying
This commit implements a bunch of tests to assert that we correctly
proxy GRPC errors from primary while ignoring any errors from
secondaries. One of the tests is currently broken and will be fixed in a
subsequent commit.
The tests use the MockManager, which was missing two functions to get
nodes. These have also been mocked such that tests succeed.
-rw-r--r-- | internal/praefect/coordinator.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 171 | ||||
-rw-r--r-- | internal/praefect/nodes/mock.go | 31 |
3 files changed, 203 insertions, 1 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b5f8aaefd..3356fbf54 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -404,7 +404,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall transaction, transactionCleanup, err := c.registerTransaction(ctx, route.Primary, route.Secondaries) if err != nil { - return nil, err + return nil, fmt.Errorf("%w: %v %v", err, route.Primary, route.Secondaries) } injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index f43f9854b..659521207 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -16,10 +16,12 @@ import ( "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" praefect_metadata "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -1290,3 +1292,172 @@ func requireScopeOperation(t *testing.T, registry *protoregistry.Registry, fullM require.Equal(t, scope, mi.Scope, "scope doesn't match requested") require.Equal(t, op, mi.Operation, "operation type doesn't match requested") } + +type mockOperationServer struct { + gitalypb.UnimplementedOperationServiceServer + t testing.TB + wg *sync.WaitGroup + err error + called bool +} + +func (s *mockOperationServer) UserCreateBranch( + context.Context, + *gitalypb.UserCreateBranchRequest, +) (*gitalypb.UserCreateBranchResponse, error) { + // We need to wait for all servers to arrive in this RPC. If we don't it could be that for + // example the primary arrives quicker than the others and directly errors. This would cause + // stream cancellation, and if the secondaries didn't yet end up in UserCreateBranch, we + // wouldn't see the function call. + s.wg.Done() + s.wg.Wait() + + s.called = true + return &gitalypb.UserCreateBranchResponse{}, s.err +} + +// TestCoordinator_grpcErrorHandling asserts that we correctly proxy errors in case any of the nodes +// fails. Most importantly, we want to make sure to only ever forward errors from the primary and +// never from the secondaries. +func TestCoordinator_grpcErrorHandling(t *testing.T) { + ctx, cleanup := testhelper.Context() + defer cleanup() + + praefectConfig := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: testhelper.DefaultStorageName, + }, + }, + } + + var wg sync.WaitGroup + type gitalyNode struct { + mock *nodes.MockNode + grpcServer *grpc.Server + operationServer *mockOperationServer + } + + gitalies := make(map[string]gitalyNode) + for _, gitaly := range []string{"primary", "secondary-1", "secondary-2"} { + gitaly := gitaly + + grpcServer := testhelper.NewTestGrpcServer(t, nil, nil) + + operationServer := &mockOperationServer{ + t: t, + wg: &wg, + } + gitalypb.RegisterOperationServiceServer(grpcServer, operationServer) + + listener, address := testhelper.GetLocalhostListener(t) + go grpcServer.Serve(listener) + defer grpcServer.Stop() + + conn, err := client.DialContext(ctx, "tcp://"+address, []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), + }) + require.NoError(t, err) + + gitalies[gitaly] = gitalyNode{ + mock: &nodes.MockNode{ + Conn: conn, + Healthy: true, + GetStorageMethod: func() string { return gitaly }, + }, + grpcServer: grpcServer, + operationServer: operationServer, + } + + praefectConfig.VirtualStorages[0].Nodes = append(praefectConfig.VirtualStorages[0].Nodes, &config.Node{ + Address: "tcp://" + address, + Storage: gitaly, + }) + } + + // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are + // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. + nodeManager := &nodes.MockManager{ + Storage: testhelper.DefaultStorageName, + GetShardFunc: func(shardName string) (nodes.Shard, error) { + require.Equal(t, testhelper.DefaultStorageName, shardName) + return nodes.Shard{ + Primary: gitalies["primary"].mock, + Secondaries: []nodes.Node{ + gitalies["secondary-1"].mock, + gitalies["secondary-2"].mock, + }, + }, nil + }, + } + + // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent + // nodes will take part in transactions. + repositoryStore := datastore.MockRepositoryStore{ + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { + return map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil + }, + } + + praefectConn, _, cleanup := runPraefectServer(t, praefectConfig, buildOptions{ + withNodeMgr: nodeManager, + withRepoStore: repositoryStore, + }) + defer cleanup() + + repoProto, _, cleanup := testhelper.NewTestRepo(t) + defer cleanup() + + operationClient := gitalypb.NewOperationServiceClient(praefectConn) + + for _, tc := range []struct { + desc string + errByNode map[string]error + expectedErr error + }{ + { + desc: "no errors", + }, + { + desc: "primary error gets forwarded", + errByNode: map[string]error{ + "primary": errors.New("foo"), + }, + expectedErr: status.Error(codes.Unknown, "foo"), + }, + { + desc: "secondary error gets ignored (test is broken)", + errByNode: map[string]error{ + "secondary-1": errors.New("foo"), + }, + expectedErr: status.Error(codes.Internal, "failed proxying to secondary: rpc error: code = Unknown desc = foo"), + }, + { + desc: "primary error has precedence", + errByNode: map[string]error{ + "primary": errors.New("primary"), + "secondary-1": errors.New("secondary-1"), + "secondary-2": errors.New("secondary-2"), + }, + expectedErr: status.Error(codes.Unknown, "primary"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + for name, node := range gitalies { + wg.Add(1) + node.operationServer.err = tc.errByNode[name] + node.operationServer.called = false + } + + _, err := operationClient.UserCreateBranch(ctx, + &gitalypb.UserCreateBranchRequest{ + Repository: repoProto, + }) + require.Equal(t, tc.expectedErr, err) + + for _, node := range gitalies { + require.True(t, node.operationServer.called, "expected gitaly %q to have been called", node.mock.GetStorage()) + } + }) + } +} diff --git a/internal/praefect/nodes/mock.go b/internal/praefect/nodes/mock.go index 6d87bd276..4d4def3da 100644 --- a/internal/praefect/nodes/mock.go +++ b/internal/praefect/nodes/mock.go @@ -11,12 +11,43 @@ import ( type MockManager struct { Manager GetShardFunc func(string) (Shard, error) + Storage string } func (m *MockManager) GetShard(_ context.Context, storage string) (Shard, error) { return m.GetShardFunc(storage) } +// Nodes returns nodes contained by the GetShardFunc. Note that this mocking only works in case the +// MockManager was set up with a Storage as the GetShardFunc will be called with that storage as +// parameter. +func (m *MockManager) Nodes() map[string][]Node { + nodes := make(map[string][]Node) + shard, _ := m.GetShardFunc(m.Storage) + nodes[m.Storage] = append(nodes[m.Storage], shard.Primary) + nodes[m.Storage] = append(nodes[m.Storage], shard.Secondaries...) + return nodes +} + +// HealthyNodes returns healthy nodes. This is implemented similar to Nodes() and thus also requires +// setup of the MockManager's Storage field. +func (m *MockManager) HealthyNodes() map[string][]string { + shard, _ := m.GetShardFunc(m.Storage) + + nodes := make(map[string][]string) + if shard.Primary.IsHealthy() { + nodes[m.Storage] = append(nodes[m.Storage], shard.Primary.GetStorage()) + } + + for _, secondary := range shard.Secondaries { + if secondary.IsHealthy() { + nodes[m.Storage] = append(nodes[m.Storage], secondary.GetStorage()) + } + } + + return nodes +} + // MockNode is a helper for tests that implements Node and allows // for parametrizing behavior. type MockNode struct { |