diff options
3 files changed, 41 insertions, 7 deletions
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index f59bd9140..071a61a39 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -94,8 +94,8 @@ func (sm *storageManager) transactionFinalizerFactory(ptn *partition) func() { // partition contains the transaction manager and tracks the number of in-flight transactions for the partition. type partition struct { - // shuttingDown is set when the partition shutdown was initiated due to being idle. - shuttingDown bool + // shuttingDown is closed when the partition has no longer any active transactions. + shuttingDown chan struct{} // shutdown is closed to signal when the partition is finished shutting down. Clients stumbling on the // partition when it is shutting down wait on this channel to know when the partition has shut down and they // should retry. @@ -108,10 +108,20 @@ type partition struct { // stop stops the partition's transaction manager. func (ptn *partition) stop() { - ptn.shuttingDown = true + close(ptn.shuttingDown) ptn.transactionManager.Stop() } +// isStopping returns whether partition shutdown has been initiated. +func (ptn *partition) isStopping() bool { + select { + case <-ptn.shuttingDown: + return true + default: + return false + } +} + // NewPartitionManager returns a new PartitionManager. 
func NewPartitionManager( configuredStorages []config.Storage, @@ -193,7 +203,8 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository) ptn, ok := storageMgr.partitions[relativePath] if !ok { ptn = &partition{ - shutdown: make(chan struct{}), + shuttingDown: make(chan struct{}), + shutdown: make(chan struct{}), } stagingDir, err := os.MkdirTemp(storageMgr.stagingDirectory, "") @@ -226,6 +237,14 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository) close(ptn.shutdown) + // If the TransactionManager returned due to an error, it could be that there are still + // in-flight transactions operating on their staged state. Removing the staging directory + // while they are active can lead to unexpected errors. Wait with the removal until they've + // all finished, and only then remove the staging directory. + // + // All transactions must eventually finish, so we don't wait on a context cancellation here. + <-ptn.shuttingDown + if err := os.RemoveAll(stagingDir); err != nil { logger.WithError(err).Error("failed removing partition's staging directory") } @@ -234,7 +253,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository) }() } - if ptn.shuttingDown { + if ptn.isStopping() { // If the partition is in the process of shutting down, the partition should not be // used. The lock is released while waiting for the partition to complete shutdown as to // not block other partitions from processing transactions. 
Once shutdown is complete, a diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index bb496d2fd..bde599479 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -102,7 +102,9 @@ func TestPartitionManager(t *testing.T) { for _, sp := range pm.storages { sp.mu.Lock() for _, ptn := range sp.partitions { - if ptn.shuttingDown { + // The stopPartition step stops the transaction manager directly without calling stop + // on the partition, so we check the manager directly here as well. + if ptn.isStopping() || ptn.transactionManager.isStopping() { waitFor = append(waitFor, ptn.shutdown) } } @@ -650,7 +652,10 @@ func TestPartitionManager(t *testing.T) { require.Contains(t, openTransactionData, step.transactionID, "test error: transaction manager stopped before being started") data := openTransactionData[step.transactionID] - data.ptn.stop() + // Stop the TransactionManager directly. Stopping through the partition would change the + // state used to sync which should only be changed when the shutdown is initiated through + // the normal means. + data.ptn.transactionManager.Stop() blockOnPartitionShutdown(t, partitionManager) case finalizeTransaction: diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 08f283315..b35c2813a 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -911,6 +911,16 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { // Stop stops the transaction processing causing Run to return. func (mgr *TransactionManager) Stop() { mgr.stop() } +// isStopping returns whether stopping of the manager was initiated. 
+func (mgr *TransactionManager) isStopping() bool { + select { + case <-mgr.stopCalled: + return true + default: + return false + } +} + // initialize initializes the TransactionManager's state from the database. It loads the appendend and the applied // indexes and initializes the notification channels that synchronize transaction beginning with log entry applying. func (mgr *TransactionManager) initialize(ctx context.Context) error { |