diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-07-10 12:40:49 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-07-10 12:40:49 +0300 |
commit | 74fa51387fbc4ac199309f5c0461d4cdde2a1a2f (patch) | |
tree | 94e2d381bd3085090a7c9964fa1264a89d7751d7 | |
parent | 8c5463516eec15b68a9065f7f51a9cac2f5e6f3b (diff) | |
parent | a4d4a5b81a32e54ec1ff85a831ad37f96d92728f (diff) |
Merge branch 'pks-transactions-healthy-nodes' into 'master'
Only let healthy secondaries take part in transactions
See merge request gitlab-org/gitaly!2365
-rw-r--r-- | changelogs/unreleased/pks-transactions-healthy-nodes.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 9 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 13 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 16 |
4 files changed, 34 insertions, 9 deletions
diff --git a/changelogs/unreleased/pks-transactions-healthy-nodes.yml b/changelogs/unreleased/pks-transactions-healthy-nodes.yml new file mode 100644 index 000000000..e0a4b862c --- /dev/null +++ b/changelogs/unreleased/pks-transactions-healthy-nodes.yml @@ -0,0 +1,5 @@ +--- +title: Only let healthy secondaries take part in transactions +merge_request: 2365 +author: +type: fixed diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 47a48d4af..38c16c282 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -195,10 +195,13 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall var secondaryDests []proxy.Destination if _, ok := transactionRPCs[call.fullMethodName]; ok && featureflag.IsEnabled(ctx, featureflag.ReferenceTransactions) { + // Make sure to only let healthy nodes take part in transactions, otherwise we'll be + // completely blocked until they come back. + healthySecondaries := shard.GetHealthySecondaries() + var voters []transactions.Voter var threshold uint - - for _, node := range append(shard.Secondaries, shard.Primary) { + for _, node := range append(healthySecondaries, shard.Primary) { voters = append(voters, transactions.Voter{ Name: node.GetStorage(), Votes: 1, @@ -219,7 +222,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall primaryDest.Ctx = helper.IncomingToOutgoing(injectedCtx) - for _, secondary := range shard.Secondaries { + for _, secondary := range healthySecondaries { secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.GetStorage()) if err != nil { return nil, err diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 78c4fe5d3..dcbc306d2 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -51,6 +51,19 @@ func (s Shard) GetNode(storage string) (Node, error) { return nil, fmt.Errorf("node with storage %q does not exist", storage) } +// GetHealthySecondaries returns all secondaries of the shard whose which are +// currently known to be healthy. +func (s Shard) GetHealthySecondaries() []Node { + healthySecondaries := make([]Node, 0, len(s.Secondaries)) + for _, secondary := range s.Secondaries { + if !secondary.IsHealthy() { + continue + } + healthySecondaries = append(healthySecondaries, secondary) + } + return healthySecondaries +} + // Manager is responsible for returning shards for virtual storages type Manager interface { GetShard(virtualStorageName string) (Shard, error) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index d8016cc89..8f98d44c5 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -727,17 +727,12 @@ func (m *mockSmartHTTP) Called(method string) int { } func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) { - grpcSrv := testhelper.NewTestGrpcServer(t, nil, nil) socketPath := testhelper.GetTemporaryGitalySocketFileName() + grpcSrv, _ := testhelper.NewServerWithHealth(t, socketPath) gitalypb.RegisterSmartHTTPServiceServer(grpcSrv, srv) reflection.Register(grpcSrv) - listener, err := net.Listen("unix", socketPath) - require.NoError(t, err) - - go func() { grpcSrv.Serve(listener) }() - return socketPath, grpcSrv } @@ -800,6 +795,15 @@ func TestProxyWrites(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() + shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name) + require.NoError(t, err) + + for _, storage := range conf.VirtualStorages[0].Nodes { + node, err := shard.GetNode(storage.Storage) + require.NoError(t, err) + waitNodeToChangeHealthStatus(ctx, t, node, true) + } + ctx = featureflag.OutgoingCtxWithFeatureFlags(ctx, featureflag.ReferenceTransactions) testRepo, _, cleanup := testhelper.NewTestRepo(t) |