Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2020-07-10 12:40:49 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2020-07-10 12:40:49 +0300
commit74fa51387fbc4ac199309f5c0461d4cdde2a1a2f (patch)
tree94e2d381bd3085090a7c9964fa1264a89d7751d7
parent8c5463516eec15b68a9065f7f51a9cac2f5e6f3b (diff)
parenta4d4a5b81a32e54ec1ff85a831ad37f96d92728f (diff)
Merge branch 'pks-transactions-healthy-nodes' into 'master'
Only let healthy secondaries take part in transactions See merge request gitlab-org/gitaly!2365
-rw-r--r--changelogs/unreleased/pks-transactions-healthy-nodes.yml5
-rw-r--r--internal/praefect/coordinator.go9
-rw-r--r--internal/praefect/nodes/manager.go13
-rw-r--r--internal/praefect/server_test.go16
4 files changed, 34 insertions, 9 deletions
diff --git a/changelogs/unreleased/pks-transactions-healthy-nodes.yml b/changelogs/unreleased/pks-transactions-healthy-nodes.yml
new file mode 100644
index 000000000..e0a4b862c
--- /dev/null
+++ b/changelogs/unreleased/pks-transactions-healthy-nodes.yml
@@ -0,0 +1,5 @@
+---
+title: Only let healthy secondaries take part in transactions
+merge_request: 2365
+author:
+type: fixed
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 47a48d4af..38c16c282 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -195,10 +195,13 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
var secondaryDests []proxy.Destination
if _, ok := transactionRPCs[call.fullMethodName]; ok && featureflag.IsEnabled(ctx, featureflag.ReferenceTransactions) {
+ // Make sure to only let healthy nodes take part in transactions, otherwise we'll be
+ // completely blocked until they come back.
+ healthySecondaries := shard.GetHealthySecondaries()
+
var voters []transactions.Voter
var threshold uint
-
- for _, node := range append(shard.Secondaries, shard.Primary) {
+ for _, node := range append(healthySecondaries, shard.Primary) {
voters = append(voters, transactions.Voter{
Name: node.GetStorage(),
Votes: 1,
@@ -219,7 +222,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
primaryDest.Ctx = helper.IncomingToOutgoing(injectedCtx)
- for _, secondary := range shard.Secondaries {
+ for _, secondary := range healthySecondaries {
secondaryMsg, err := rewrittenRepositoryMessage(call.methodInfo, call.msg, secondary.GetStorage())
if err != nil {
return nil, err
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 78c4fe5d3..dcbc306d2 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -51,6 +51,19 @@ func (s Shard) GetNode(storage string) (Node, error) {
return nil, fmt.Errorf("node with storage %q does not exist", storage)
}
+// GetHealthySecondaries returns all secondaries of the shard whose which are
+// currently known to be healthy.
+func (s Shard) GetHealthySecondaries() []Node {
+ healthySecondaries := make([]Node, 0, len(s.Secondaries))
+ for _, secondary := range s.Secondaries {
+ if !secondary.IsHealthy() {
+ continue
+ }
+ healthySecondaries = append(healthySecondaries, secondary)
+ }
+ return healthySecondaries
+}
+
// Manager is responsible for returning shards for virtual storages
type Manager interface {
GetShard(virtualStorageName string) (Shard, error)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index d8016cc89..8f98d44c5 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -727,17 +727,12 @@ func (m *mockSmartHTTP) Called(method string) int {
}
func newGrpcServer(t *testing.T, srv gitalypb.SmartHTTPServiceServer) (string, *grpc.Server) {
- grpcSrv := testhelper.NewTestGrpcServer(t, nil, nil)
socketPath := testhelper.GetTemporaryGitalySocketFileName()
+ grpcSrv, _ := testhelper.NewServerWithHealth(t, socketPath)
gitalypb.RegisterSmartHTTPServiceServer(grpcSrv, srv)
reflection.Register(grpcSrv)
- listener, err := net.Listen("unix", socketPath)
- require.NoError(t, err)
-
- go func() { grpcSrv.Serve(listener) }()
-
return socketPath, grpcSrv
}
@@ -800,6 +795,15 @@ func TestProxyWrites(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ for _, storage := range conf.VirtualStorages[0].Nodes {
+ node, err := shard.GetNode(storage.Storage)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, node, true)
+ }
+
ctx = featureflag.OutgoingCtxWithFeatureFlags(ctx, featureflag.ReferenceTransactions)
testRepo, _, cleanup := testhelper.NewTestRepo(t)