Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go29
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager_test.go9
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go10
3 files changed, 41 insertions, 7 deletions
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index f59bd9140..071a61a39 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -94,8 +94,8 @@ func (sm *storageManager) transactionFinalizerFactory(ptn *partition) func() {
// partition contains the transaction manager and tracks the number of in-flight transactions for the partition.
type partition struct {
- // shuttingDown is set when the partition shutdown was initiated due to being idle.
- shuttingDown bool
+ // shuttingDown is closed when the partition has no longer any active transactions.
+ shuttingDown chan struct{}
// shutdown is closed to signal when the partition is finished shutting down. Clients stumbling on the
// partition when it is shutting down wait on this channel to know when the partition has shut down and they
// should retry.
@@ -108,10 +108,20 @@ type partition struct {
// stop stops the partition's transaction manager.
func (ptn *partition) stop() {
- ptn.shuttingDown = true
+ close(ptn.shuttingDown)
ptn.transactionManager.Stop()
}
+// isStopping returns whether partition shutdown has been initiated.
+func (ptn *partition) isStopping() bool {
+ select {
+ case <-ptn.shuttingDown:
+ return true
+ default:
+ return false
+ }
+}
+
// NewPartitionManager returns a new PartitionManager.
func NewPartitionManager(
configuredStorages []config.Storage,
@@ -193,7 +203,8 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository)
ptn, ok := storageMgr.partitions[relativePath]
if !ok {
ptn = &partition{
- shutdown: make(chan struct{}),
+ shuttingDown: make(chan struct{}),
+ shutdown: make(chan struct{}),
}
stagingDir, err := os.MkdirTemp(storageMgr.stagingDirectory, "")
@@ -226,6 +237,14 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository)
close(ptn.shutdown)
+ // If the TransactionManager returned due to an error, it could be that there are still
+ // in-flight transactions operating on their staged state. Removing the staging directory
+ // while they are active can lead to unexpected errors. Wait with the removal until they've
+ // all finished, and only then remove the staging directory.
+ //
+ // All transactions must eventually finish, so we don't wait on a context cancellation here.
+ <-ptn.shuttingDown
+
if err := os.RemoveAll(stagingDir); err != nil {
logger.WithError(err).Error("failed removing partition's staging directory")
}
@@ -234,7 +253,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository)
}()
}
- if ptn.shuttingDown {
+ if ptn.isStopping() {
// If the partition is in the process of shutting down, the partition should not be
// used. The lock is released while waiting for the partition to complete shutdown as to
// not block other partitions from processing transactions. Once shutdown is complete, a
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index bb496d2fd..bde599479 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -102,7 +102,9 @@ func TestPartitionManager(t *testing.T) {
for _, sp := range pm.storages {
sp.mu.Lock()
for _, ptn := range sp.partitions {
- if ptn.shuttingDown {
+ // The stoPartition step stops the transaction manager directly without calling stop
+ // on the partition, so we check the manager diretly here as well.
+ if ptn.isStopping() || ptn.transactionManager.isStopping() {
waitFor = append(waitFor, ptn.shutdown)
}
}
@@ -650,7 +652,10 @@ func TestPartitionManager(t *testing.T) {
require.Contains(t, openTransactionData, step.transactionID, "test error: transaction manager stopped before being started")
data := openTransactionData[step.transactionID]
- data.ptn.stop()
+ // Stop the TransactionManager directly. Stopping through the partition would change the
+ // state used to sync which should only be changed when the shutdown is initiated through
+ // the normal means.
+ data.ptn.transactionManager.Stop()
blockOnPartitionShutdown(t, partitionManager)
case finalizeTransaction:
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 08f283315..b35c2813a 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -911,6 +911,16 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
// Stop stops the transaction processing causing Run to return.
func (mgr *TransactionManager) Stop() { mgr.stop() }
+// isStopping returns whether stopping of the manager was initiated.
+func (mgr *TransactionManager) isStopping() bool {
+ select {
+ case <-mgr.stopCalled:
+ return true
+ default:
+ return false
+ }
+}
+
// initialize initializes the TransactionManager's state from the database. It loads the appendend and the applied
// indexes and initializes the notification channels that synchronize transaction beginning with log entry applying.
func (mgr *TransactionManager) initialize(ctx context.Context) error {