Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-18 15:16:09 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-23 12:01:52 +0300
commit6ca0cd6e5dd9c4635b92198ec8a663af1692b5a6 (patch)
treedf6148fa7a8dbd336bee4a6513a25021427f8519
parent51073bb211020a41e68c4813c4e576120e669767 (diff)
praefect: Implement tests for failing RPC error proxying
This commit implements a bunch of tests to assert that we correctly proxy GRPC errors from primary while ignoring any errors from secondaries. One of the tests is currently broken and will be fixed in a subsequent commit. The tests use the MockManager, which was missing two functions to get nodes. These have also been mocked such that tests succeed.
-rw-r--r--internal/praefect/coordinator.go2
-rw-r--r--internal/praefect/coordinator_test.go171
-rw-r--r--internal/praefect/nodes/mock.go31
3 files changed, 203 insertions, 1 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b5f8aaefd..3356fbf54 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -404,7 +404,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
transaction, transactionCleanup, err := c.registerTransaction(ctx, route.Primary, route.Secondaries)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("%w: %v %v", err, route.Primary, route.Secondaries)
}
injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index f43f9854b..659521207 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -16,10 +16,12 @@ import (
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
praefect_metadata "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -1290,3 +1292,172 @@ func requireScopeOperation(t *testing.T, registry *protoregistry.Registry, fullM
require.Equal(t, scope, mi.Scope, "scope doesn't match requested")
require.Equal(t, op, mi.Operation, "operation type doesn't match requested")
}
+
+type mockOperationServer struct {
+ gitalypb.UnimplementedOperationServiceServer
+ t testing.TB
+ wg *sync.WaitGroup
+ err error
+ called bool
+}
+
+func (s *mockOperationServer) UserCreateBranch(
+ context.Context,
+ *gitalypb.UserCreateBranchRequest,
+) (*gitalypb.UserCreateBranchResponse, error) {
+ // We need to wait for all servers to arrive in this RPC. If we don't it could be that for
+ // example the primary arrives quicker than the others and directly errors. This would cause
+ // stream cancellation, and if the secondaries didn't yet end up in UserCreateBranch, we
+ // wouldn't see the function call.
+ s.wg.Done()
+ s.wg.Wait()
+
+ s.called = true
+ return &gitalypb.UserCreateBranchResponse{}, s.err
+}
+
+// TestCoordinator_grpcErrorHandling asserts that we correctly proxy errors in case any of the nodes
+// fails. Most importantly, we want to make sure to only ever forward errors from the primary and
+// never from the secondaries.
+func TestCoordinator_grpcErrorHandling(t *testing.T) {
+ ctx, cleanup := testhelper.Context()
+ defer cleanup()
+
+ praefectConfig := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: testhelper.DefaultStorageName,
+ },
+ },
+ }
+
+ var wg sync.WaitGroup
+ type gitalyNode struct {
+ mock *nodes.MockNode
+ grpcServer *grpc.Server
+ operationServer *mockOperationServer
+ }
+
+ gitalies := make(map[string]gitalyNode)
+ for _, gitaly := range []string{"primary", "secondary-1", "secondary-2"} {
+ gitaly := gitaly
+
+ grpcServer := testhelper.NewTestGrpcServer(t, nil, nil)
+
+ operationServer := &mockOperationServer{
+ t: t,
+ wg: &wg,
+ }
+ gitalypb.RegisterOperationServiceServer(grpcServer, operationServer)
+
+ listener, address := testhelper.GetLocalhostListener(t)
+ go grpcServer.Serve(listener)
+ defer grpcServer.Stop()
+
+ conn, err := client.DialContext(ctx, "tcp://"+address, []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
+ })
+ require.NoError(t, err)
+
+ gitalies[gitaly] = gitalyNode{
+ mock: &nodes.MockNode{
+ Conn: conn,
+ Healthy: true,
+ GetStorageMethod: func() string { return gitaly },
+ },
+ grpcServer: grpcServer,
+ operationServer: operationServer,
+ }
+
+ praefectConfig.VirtualStorages[0].Nodes = append(praefectConfig.VirtualStorages[0].Nodes, &config.Node{
+ Address: "tcp://" + address,
+ Storage: gitaly,
+ })
+ }
+
+ // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are
+ // healthy. We need fixed roles and unhealthy nodes will not take part in transactions.
+ nodeManager := &nodes.MockManager{
+ Storage: testhelper.DefaultStorageName,
+ GetShardFunc: func(shardName string) (nodes.Shard, error) {
+ require.Equal(t, testhelper.DefaultStorageName, shardName)
+ return nodes.Shard{
+ Primary: gitalies["primary"].mock,
+ Secondaries: []nodes.Node{
+ gitalies["secondary-1"].mock,
+ gitalies["secondary-2"].mock,
+ },
+ }, nil
+ },
+ }
+
+ // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent
+ // nodes will take part in transactions.
+ repositoryStore := datastore.MockRepositoryStore{
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
+ return map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil
+ },
+ }
+
+ praefectConn, _, cleanup := runPraefectServer(t, praefectConfig, buildOptions{
+ withNodeMgr: nodeManager,
+ withRepoStore: repositoryStore,
+ })
+ defer cleanup()
+
+ repoProto, _, cleanup := testhelper.NewTestRepo(t)
+ defer cleanup()
+
+ operationClient := gitalypb.NewOperationServiceClient(praefectConn)
+
+ for _, tc := range []struct {
+ desc string
+ errByNode map[string]error
+ expectedErr error
+ }{
+ {
+ desc: "no errors",
+ },
+ {
+ desc: "primary error gets forwarded",
+ errByNode: map[string]error{
+ "primary": errors.New("foo"),
+ },
+ expectedErr: status.Error(codes.Unknown, "foo"),
+ },
+ {
+ desc: "secondary error gets ignored (test is broken)",
+ errByNode: map[string]error{
+ "secondary-1": errors.New("foo"),
+ },
+ expectedErr: status.Error(codes.Internal, "failed proxying to secondary: rpc error: code = Unknown desc = foo"),
+ },
+ {
+ desc: "primary error has precedence",
+ errByNode: map[string]error{
+ "primary": errors.New("primary"),
+ "secondary-1": errors.New("secondary-1"),
+ "secondary-2": errors.New("secondary-2"),
+ },
+ expectedErr: status.Error(codes.Unknown, "primary"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ for name, node := range gitalies {
+ wg.Add(1)
+ node.operationServer.err = tc.errByNode[name]
+ node.operationServer.called = false
+ }
+
+ _, err := operationClient.UserCreateBranch(ctx,
+ &gitalypb.UserCreateBranchRequest{
+ Repository: repoProto,
+ })
+ require.Equal(t, tc.expectedErr, err)
+
+ for _, node := range gitalies {
+ require.True(t, node.operationServer.called, "expected gitaly %q to have been called", node.mock.GetStorage())
+ }
+ })
+ }
+}
diff --git a/internal/praefect/nodes/mock.go b/internal/praefect/nodes/mock.go
index 6d87bd276..4d4def3da 100644
--- a/internal/praefect/nodes/mock.go
+++ b/internal/praefect/nodes/mock.go
@@ -11,12 +11,43 @@ import (
type MockManager struct {
Manager
GetShardFunc func(string) (Shard, error)
+ Storage string
}
func (m *MockManager) GetShard(_ context.Context, storage string) (Shard, error) {
return m.GetShardFunc(storage)
}
+// Nodes returns nodes contained by the GetShardFunc. Note that this mocking only works in case the
+// MockManager was set up with a Storage as the GetShardFunc will be called with that storage as
+// parameter.
+func (m *MockManager) Nodes() map[string][]Node {
+ nodes := make(map[string][]Node)
+ shard, _ := m.GetShardFunc(m.Storage)
+ nodes[m.Storage] = append(nodes[m.Storage], shard.Primary)
+ nodes[m.Storage] = append(nodes[m.Storage], shard.Secondaries...)
+ return nodes
+}
+
+// HealthyNodes returns healthy nodes. This is implemented similar to Nodes() and thus also requires
+// setup of the MockManager's Storage field.
+func (m *MockManager) HealthyNodes() map[string][]string {
+ shard, _ := m.GetShardFunc(m.Storage)
+
+ nodes := make(map[string][]string)
+ if shard.Primary.IsHealthy() {
+ nodes[m.Storage] = append(nodes[m.Storage], shard.Primary.GetStorage())
+ }
+
+ for _, secondary := range shard.Secondaries {
+ if secondary.IsHealthy() {
+ nodes[m.Storage] = append(nodes[m.Storage], secondary.GetStorage())
+ }
+ }
+
+ return nodes
+}
+
// MockNode is a helper for tests that implements Node and allows
// for parametrizing behavior.
type MockNode struct {