diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-13 11:28:07 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-10-16 10:18:37 +0300 |
commit | 6fc78bf949f7f8c563c29023ae9516b727e7e6dd (patch) | |
tree | 92f1afac42a0e13a79d9f33db336ab5000c8ef9f /internal/praefect/coordinator_test.go | |
parent | 902e3bf932344c39bf089e5c6145e493bdcee2cf (diff) |
coordinator: Stop raising errors on stopped transactions
When a transaction was gracefully stopped, it indicates that any node
taking part has hit an expected error case, e.g. the primary has
executed hooks which declined a push. The coordinator can't and
shouldn't do anything about it as this is an expected error inside of
the business logic which the Gitaly nodes should know to handle
correctly.
Change the logic to reflect this by silently returning in case the
primary's transaction has been stopped.
Diffstat (limited to 'internal/praefect/coordinator_test.go')
-rw-r--r-- | internal/praefect/coordinator_test.go | 109 |
1 files changed, 109 insertions, 0 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 01c5603f0..4b2b6b6a8 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -479,6 +479,115 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } } +func TestStreamDirectorMutator_StopTransaction(t *testing.T) { + socket := testhelper.GetTemporaryGitalySocketFileName() + server, _ := testhelper.NewServerWithHealth(t, socket) + defer server.Stop() + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + &config.VirtualStorage{ + Name: "praefect", + Nodes: []*config.Node{ + &config.Node{Address: "unix://" + socket, Storage: "primary"}, + &config.Node{Address: "unix://" + socket, Storage: "secondary"}, + }, + }, + }, + } + + repo := gitalypb.Repository{ + StorageName: "praefect", + RelativePath: "/path/to/hashed/storage", + } + + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil) + require.NoError(t, err) + nodeMgr.Start(0, time.Hour) + + shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name) + require.NoError(t, err) + + ctx, cancel := testhelper.Context() + defer cancel() + + for _, name := range []string{"primary", "secondary"} { + node, err := shard.GetNode(name) + require.NoError(t, err) + waitNodeToChangeHealthStatus(ctx, t, node, true) + } + + rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + for _, node := range []string{"primary", "secondary"} { + require.NoError(t, rs.SetGeneration(ctx, "praefect", repo.RelativePath, node, 1)) + } + + txMgr := transactions.NewManager(conf) + + coordinator := NewCoordinator( + datastore.NewMemoryReplicationEventQueue(conf), + rs, + NewNodeManagerRouter(nodeMgr, rs), + txMgr, + conf, + protoregistry.GitalyProtoPreregistered, + ) + + fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" + + frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{ + Repository: &repo, + }) + require.NoError(t, err) + peeker := &mockPeeker{frame} + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + + transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + var syncWG sync.WaitGroup + + wg.Add(2) + syncWG.Add(2) + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + err := txMgr.VoteTransaction(ctx, transaction.ID, "primary", vote[:]) + require.NoError(t, err) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + require.NoError(t, txMgr.StopTransaction(ctx, transaction.ID)) + }() + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + err := txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:]) + require.NoError(t, err) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + err = txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote[:]) + assert.True(t, errors.Is(err, transactions.ErrTransactionStopped)) + }() + + wg.Wait() + + err = streamParams.RequestFinalizer() + require.NoError(t, err) +} + func TestStreamDirectorAccessor(t *testing.T) { gitalySocket := testhelper.GetTemporaryGitalySocketFileName() srv, _ := testhelper.NewServerWithHealth(t, gitalySocket) |