diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-06-09 16:06:03 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-09-29 14:10:12 +0300 |
commit | 1f966f092d836881be57cd9d6983763780f92e86 (patch) | |
tree | 5a4909ae62e4eda838be3eda7b68ae6329b44e99 | |
parent | 1dfb43d5a44fa800b22cb20e7e81bb8cbadeb827 (diff) |
Make repository deletions non-blocking
Before each transaction had a read snapshot, the TransactionManager
had to wait until all transactions were done that begun before the
applying a repository deletion. This was to ensure the reads can
complete successfully before we delete the state they are reading.
As each transaction now have their own read snapshots, they can keep
reading from the snapshot even if the repository is deleted. This
commit thus removes the code synchronizing repository deletions with
older transcations. This increases performance and removes complexity
without causing any externally visible behavior changes.
-rw-r--r-- | internal/gitaly/storage/storagemgr/transaction_manager.go | 65 | ||||
-rw-r--r-- | internal/gitaly/storage/storagemgr/transaction_manager_test.go | 59 |
2 files changed, 18 insertions, 106 deletions
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 54c48145a..f06b75d2b 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -2,7 +2,6 @@ package storagemgr import ( "bytes" - "container/list" "context" "encoding/binary" "errors" @@ -234,8 +233,6 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption txn.snapshot.CustomHookPath = filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir) } - openTransactionElement := mgr.openTransactions.PushBack(txn) - mgr.snapshotLocks[txn.snapshot.ReadIndex].activeSnapshotters.Add(1) defer mgr.snapshotLocks[txn.snapshot.ReadIndex].activeSnapshotters.Done() readReady := mgr.snapshotLocks[txn.snapshot.ReadIndex].applied @@ -244,10 +241,6 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption txn.finish = func() error { defer close(txn.finished) - mgr.mutex.Lock() - mgr.openTransactions.Remove(openTransactionElement) - mgr.mutex.Unlock() - if txn.stagingDirectory != "" { if err := os.RemoveAll(txn.stagingDirectory); err != nil { return fmt.Errorf("remove staging directory: %w", err) @@ -695,9 +688,6 @@ type TransactionManager struct { // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction - // openTransactions contains all transactions that have been begun but not yet committed or rolled back. - // The transactions are ordered from the oldest to the newest. - openTransactions *list.List // initialized is closed when the manager has been initialized. It's used to block new transactions // from beginning prior to the manager having initialized its runtime state on start up. @@ -755,7 +745,6 @@ func NewTransactionManager( relativePath: relativePath, db: newDatabaseAdapter(db), admissionQueue: make(chan *Transaction), - openTransactions: list.New(), initialized: make(chan struct{}), snapshotLocks: make(map[LogIndex]*snapshotLock), stateDirectory: stateDir, @@ -1697,61 +1686,7 @@ func (mgr *TransactionManager) applyReferenceUpdates(ctx context.Context, update } // applyRepositoryDeletion deletes the repository. -// -// Given how the repositories are laid out in the storage, we currently can't support MVCC for them. -// This is because there is only ever a single instance of a given repository. We have to wait for all -// of the readers to finish before we can delete the repository as otherwise the readers could fail in -// unexpected ways and it would be an isolation violation. Repository deletions thus block before all -// transaction with an older read snapshot are done with the repository. func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, index LogIndex) error { - for { - mgr.mutex.Lock() - oldestElement := mgr.openTransactions.Front() - mgr.mutex.Unlock() - if oldestElement == nil { - // If there are no open transactions, the deletion can proceed as there are - // no readers. - // - // Any new transaction would have the deletion in their snapshot, and are waiting - // for it to be applied prior to beginning. - break - } - - oldestTransaction := oldestElement.Value.(*Transaction) - if oldestTransaction.snapshot.ReadIndex >= index { - // If the oldest transaction is reading at this or later log index, it already has the deletion - // in its snapshot, and is waiting for it to be applied. Proceed with the deletion as there - // are no readers with the pre-deletion state in the snapshot. - break - } - - for { - select { - case <-oldestTransaction.finished: - // The oldest transaction finished. Proceed to check the second oldest open transaction. - case transaction := <-mgr.admissionQueue: - // The oldest transaction could also be waiting to commit. Since the Run goroutine is - // blocked here waiting for the transaction to finish, the write would never be admitted - // for processing, leading to a deadlock. Since the repository was deleted, the only correct - // outcome for the transaction would be to receive a not found error. Admit the transaction, - // and finish it with the correct result so we can unblock the deletion. - transaction.result <- ErrRepositoryNotFound - if err := transaction.finish(); err != nil { - return fmt.Errorf("finish transaction: %w", err) - } - - continue - case <-ctx.Done(): - } - - if err := ctx.Err(); err != nil { - return err - } - - break - } - } - if err := os.RemoveAll(mgr.repositoryPath); err != nil { return fmt.Errorf("remove repository: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go index 2a541b45a..afb14bd54 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go @@ -344,15 +344,6 @@ func TestTransactionManager(t *testing.T) { IncludeObjects []git.ObjectID } - // AsyncDeletion can be used to commit a repository deletion asynchronously. This is necessary in tests - // which test concurrent transactions with repository deletions as deletions are blocking. - type AsyncDeletion struct { - // TransactionID identifies the transaction to async commit a deletion. - TransactionID int - // ExpectedError is the error that is expected to be returned when committing the transaction. - ExpectedError error - } - // RecordInitialReferenceValues calls RecordInitialReferenceValues on a transaction. type RecordInitialReferenceValues struct { // TransactionID identifies the transaction to prepare the reference updates on. @@ -3438,8 +3429,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3464,8 +3456,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3490,8 +3483,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3518,8 +3512,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3703,8 +3698,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, // The concurrent transaction should be able to read the // repository despite the committed deletion. @@ -4063,8 +4059,9 @@ func TestTransactionManager(t *testing.T) { CustomHookIndex: 1, }, }, - AsyncDeletion{ - TransactionID: 2, + Commit{ + TransactionID: 2, + DeleteRepository: true, }, // This transaction was started before the deletion, so it should see the old state regardless // of the repository being deleted. @@ -4425,26 +4422,6 @@ func TestTransactionManager(t *testing.T) { default: t.Fatalf("unexpected error type: %T", expectedErr) } - case AsyncDeletion: - require.Contains(t, openTransactions, step.TransactionID, "test error: transaction committed before beginning it") - - transaction := openTransactions[step.TransactionID] - transaction.DeleteRepository() - - commitErr := make(chan error) - go func() { - commitErr <- transaction.Commit(ctx) - }() - defer func() { - require.NoError(t, <-commitErr, "committing async deletion failed") - }() - - // The transactions generally don't block each other due to MVCC. Repository deletions are not yet managed via MVCC - // and thus block until all other transactions with an older snapshot are finished. In order to test transactions with - // concurrent repository deletions, we have to commit the deletions asynchronously. We peek here at the internals to - // determine that the deletion has actually been admitted, and is waiting for application to ensure the commit order is always - // as expected by the test. - <-transaction.admitted case RecordInitialReferenceValues: require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it") |