Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-13 11:28:07 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-10-16 10:18:37 +0300
commit6fc78bf949f7f8c563c29023ae9516b727e7e6dd (patch)
tree92f1afac42a0e13a79d9f33db336ab5000c8ef9f /internal/praefect/coordinator_test.go
parent902e3bf932344c39bf089e5c6145e493bdcee2cf (diff)
coordinator: Stop raising errors on stopped transactions
When a transaction was gracefully stopped, it indicates that any node taking part has hit an expected error case, e.g. the primary has executed hooks which declined a push. The coordinator can't and shouldn't do anything about it as this is an expected error inside of the business logic which the Gitaly nodes should know to handle correctly. Change the logic to reflect this by silently returning in case the primary's transaction has been stopped.
Diffstat (limited to 'internal/praefect/coordinator_test.go')
-rw-r--r--internal/praefect/coordinator_test.go109
1 files changed, 109 insertions, 0 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 01c5603f0..4b2b6b6a8 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -479,6 +479,115 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
}
+func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ server, _ := testhelper.NewServerWithHealth(t, socket)
+ defer server.Stop()
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ &config.VirtualStorage{
+ Name: "praefect",
+ Nodes: []*config.Node{
+ &config.Node{Address: "unix://" + socket, Storage: "primary"},
+ &config.Node{Address: "unix://" + socket, Storage: "secondary"},
+ },
+ },
+ },
+ }
+
+ repo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
+ require.NoError(t, err)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ for _, name := range []string{"primary", "secondary"} {
+ node, err := shard.GetNode(name)
+ require.NoError(t, err)
+ waitNodeToChangeHealthStatus(ctx, t, node, true)
+ }
+
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ for _, node := range []string{"primary", "secondary"} {
+ require.NoError(t, rs.SetGeneration(ctx, "praefect", repo.RelativePath, node, 1))
+ }
+
+ txMgr := transactions.NewManager(conf)
+
+ coordinator := NewCoordinator(
+ datastore.NewMemoryReplicationEventQueue(conf),
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ txMgr,
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+
+ fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"
+
+ frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{
+ Repository: &repo,
+ })
+ require.NoError(t, err)
+ peeker := &mockPeeker{frame}
+
+ streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
+ require.NoError(t, err)
+
+ transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx)
+ require.NoError(t, err)
+
+ var wg sync.WaitGroup
+ var syncWG sync.WaitGroup
+
+ wg.Add(2)
+ syncWG.Add(2)
+
+ go func() {
+ defer wg.Done()
+
+ vote := sha1.Sum([]byte("vote"))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, "primary", vote[:])
+ require.NoError(t, err)
+
+ // Assure that at least one vote was agreed on.
+ syncWG.Done()
+ syncWG.Wait()
+
+ require.NoError(t, txMgr.StopTransaction(ctx, transaction.ID))
+ }()
+
+ go func() {
+ defer wg.Done()
+
+ vote := sha1.Sum([]byte("vote"))
+ err := txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:])
+ require.NoError(t, err)
+
+ // Assure that at least one vote was agreed on.
+ syncWG.Done()
+ syncWG.Wait()
+
+ err = txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:])
+ assert.True(t, errors.Is(err, transactions.ErrTransactionStopped))
+ }()
+
+ wg.Wait()
+
+ err = streamParams.RequestFinalizer()
+ require.NoError(t, err)
+}
+
func TestStreamDirectorAccessor(t *testing.T) {
gitalySocket := testhelper.GetTemporaryGitalySocketFileName()
srv, _ := testhelper.NewServerWithHealth(t, gitalySocket)