Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-07-07 14:37:41 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-07-13 10:38:13 +0300
commitf0b36e1c538b1d04b1b2d74a91337fec72bbb4ff (patch)
tree54580477618bc44cac3b2d8390b54bb8c66e2909
parentd03699fde8716190f757b03fa1ab537a858766fe (diff)
Clean up partition only after all transactions are donesmh-clean-up-after-done
The PartitionManager is cleaning up a TransactionManager's staging directory once it returns. It does so without waiting for transactions to finish. This leads to unexpected errors when the transaction's staging directory is removed before the transaction is done. One example error is when the transaction's own clean up fails due to PartitionManager already removing the transaction's staging directory. Fix this issue by waiting until all transactions using a given TransactionManager are done prior to removing the staging directory. Another TransactionManager can already be spawned for a partition prior to the clean up finishing as the TransactionManager's get unique staging directories and the earlier instance has already exited prior to the clean up. Fixes flaky test TestPartitionManager/transaction_from_previous_- transaction_manager_finalized_after_new_manager_started.
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go29
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager_test.go9
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go10
3 files changed, 41 insertions, 7 deletions
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index f59bd9140..071a61a39 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -94,8 +94,8 @@ func (sm *storageManager) transactionFinalizerFactory(ptn *partition) func() {
// partition contains the transaction manager and tracks the number of in-flight transactions for the partition.
type partition struct {
- // shuttingDown is set when the partition shutdown was initiated due to being idle.
- shuttingDown bool
+ // shuttingDown is closed when the partition has no longer any active transactions.
+ shuttingDown chan struct{}
// shutdown is closed to signal when the partition is finished shutting down. Clients stumbling on the
// partition when it is shutting down wait on this channel to know when the partition has shut down and they
// should retry.
@@ -108,10 +108,20 @@ type partition struct {
// stop stops the partition's transaction manager.
func (ptn *partition) stop() {
- ptn.shuttingDown = true
+ close(ptn.shuttingDown)
ptn.transactionManager.Stop()
}
+// isStopping returns whether partition shutdown has been initiated.
+func (ptn *partition) isStopping() bool {
+ select {
+ case <-ptn.shuttingDown:
+ return true
+ default:
+ return false
+ }
+}
+
// NewPartitionManager returns a new PartitionManager.
func NewPartitionManager(
configuredStorages []config.Storage,
@@ -193,7 +203,8 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository)
ptn, ok := storageMgr.partitions[relativePath]
if !ok {
ptn = &partition{
- shutdown: make(chan struct{}),
+ shuttingDown: make(chan struct{}),
+ shutdown: make(chan struct{}),
}
stagingDir, err := os.MkdirTemp(storageMgr.stagingDirectory, "")
@@ -226,6 +237,14 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository)
close(ptn.shutdown)
+ // If the TransactionManager returned due to an error, it could be that there are still
+ // in-flight transactions operating on their staged state. Removing the staging directory
+ // while they are active can lead to unexpected errors. Wait with the removal until they've
+ // all finished, and only then remove the staging directory.
+ //
+ // All transactions must eventually finish, so we don't wait on a context cancellation here.
+ <-ptn.shuttingDown
+
if err := os.RemoveAll(stagingDir); err != nil {
logger.WithError(err).Error("failed removing partition's staging directory")
}
@@ -234,7 +253,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository)
}()
}
- if ptn.shuttingDown {
+ if ptn.isStopping() {
// If the partition is in the process of shutting down, the partition should not be
// used. The lock is released while waiting for the partition to complete shutdown as to
// not block other partitions from processing transactions. Once shutdown is complete, a
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index bb496d2fd..bde599479 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -102,7 +102,9 @@ func TestPartitionManager(t *testing.T) {
for _, sp := range pm.storages {
sp.mu.Lock()
for _, ptn := range sp.partitions {
- if ptn.shuttingDown {
+ // The stoPartition step stops the transaction manager directly without calling stop
+ // on the partition, so we check the manager diretly here as well.
+ if ptn.isStopping() || ptn.transactionManager.isStopping() {
waitFor = append(waitFor, ptn.shutdown)
}
}
@@ -650,7 +652,10 @@ func TestPartitionManager(t *testing.T) {
require.Contains(t, openTransactionData, step.transactionID, "test error: transaction manager stopped before being started")
data := openTransactionData[step.transactionID]
- data.ptn.stop()
+ // Stop the TransactionManager directly. Stopping through the partition would change the
+ // state used to sync which should only be changed when the shutdown is initiated through
+ // the normal means.
+ data.ptn.transactionManager.Stop()
blockOnPartitionShutdown(t, partitionManager)
case finalizeTransaction:
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 08f283315..b35c2813a 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -911,6 +911,16 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
// Stop stops the transaction processing causing Run to return.
func (mgr *TransactionManager) Stop() { mgr.stop() }
+// isStopping returns whether stopping of the manager was initiated.
+func (mgr *TransactionManager) isStopping() bool {
+ select {
+ case <-mgr.stopCalled:
+ return true
+ default:
+ return false
+ }
+}
+
// initialize initializes the TransactionManager's state from the database. It loads the appendend and the applied
// indexes and initializes the notification channels that synchronize transaction beginning with log entry applying.
func (mgr *TransactionManager) initialize(ctx context.Context) error {