Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-04-15 13:53:45 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-05-21 09:10:20 +0300
commit9666ac4daa12bbd7823ae863a65e3f7b13df75b0 (patch)
tree328549b0e447e4ab83a45716816ad5249184bde7 /internal/gitaly/transaction_manager.go
parentcd0f8033c285cb88f3c6c53a5e24ff0294b434ec (diff)
Write-ahead log repository deletions
Repository deletions need to be write-ahead logged as well. This ensures their atomicity and that they can be replicated later as part of the log. Once a repository deletion has been committed, all subsequent Begin and Commit calls will fail with a 'repository not found' error. At that point the repository is logically deleted but not yet physically deleted. The physical deletion needs to wait for open transactions to finish so we don't remove the files they are operating on. For now, it's possible to set other updates in the Transaction even if it ultimately removes the repository. Technically this is fine, but it's a bit nonsensical and we don't have a use case for it in Gitaly. We'll probably later improve the interface by splitting out different transaction types so UpdateReferences() can't be called on the same transaction that deletes the repository.
Diffstat (limited to 'internal/gitaly/transaction_manager.go')
-rw-r--r--internal/gitaly/transaction_manager.go195
1 files changed, 174 insertions, 21 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
index 2b13ef7a1..77f67ee7d 100644
--- a/internal/gitaly/transaction_manager.go
+++ b/internal/gitaly/transaction_manager.go
@@ -24,11 +24,15 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
+// ErrRepositoryNotFound is returned when the repository doesn't exist.
+var ErrRepositoryNotFound = structerr.NewNotFound("repository not found")
+
// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions.
var ErrTransactionProcessingStopped = errors.New("transaction processing stopped")
@@ -164,6 +168,7 @@ type Transaction struct {
referenceUpdates ReferenceUpdates
defaultBranchUpdate *DefaultBranchUpdate
customHooksUpdate *CustomHooksUpdate
+ deleteRepository bool
}
// Begin opens a new transaction. The caller must call either Commit or Rollback to release
@@ -181,6 +186,11 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur
}
mgr.mutex.Lock()
+ if !mgr.repositoryExists {
+ mgr.mutex.Unlock()
+ return nil, ErrRepositoryNotFound
+ }
+
txn := &Transaction{
commit: mgr.commit,
finalize: mgr.transactionFinalizer,
@@ -299,6 +309,11 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) {
txn.referenceUpdates = updates
}
+// DeleteRepository deletes the repository when the transaction is committed.
+func (txn *Transaction) DeleteRepository() {
+ txn.deleteRepository = true
+}
+
// QuarantineDirectory returns an absolute path to the transaction's quarantine directory. The quarantine directory
// is a Git object directory where the new objects introduced in the transaction must be written. The quarantined
// objects needed by the updated reference tips will be included in the transaction.
@@ -394,6 +409,9 @@ type TransactionManager struct {
// Gitaly starts.
stagingDirectory string
+ // repositoryExists marks whether the repository exists or not. The repository may not exist if it has
+ // never been created, or if it has been deleted.
+ repositoryExists bool
// repository is the repository this TransactionManager is acting on.
repository repository
// repositoryPath is the path to the repository this TransactionManager is acting on.
@@ -678,6 +696,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
}
if err := func() (commitErr error) {
+ if !mgr.repositoryExists {
+ return ErrRepositoryNotFound
+ }
+
logEntry := &gitalypb.LogEntry{}
var err error
@@ -701,8 +723,8 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR,
}
}
- nextLogIndex := mgr.appendedLogIndex + 1
+ nextLogIndex := mgr.appendedLogIndex + 1
if transaction.includesPack {
logEntry.IncludesPack = true
@@ -725,6 +747,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
}
}
+ if transaction.deleteRepository {
+ logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{}
+ }
+
return mgr.appendLogEntry(nextLogIndex, logEntry)
}(); err != nil {
transaction.result <- err
@@ -744,10 +770,6 @@ func (mgr *TransactionManager) Stop() { mgr.stop() }
func (mgr *TransactionManager) initialize(ctx context.Context) error {
defer close(mgr.initialized)
- if err := mgr.createDirectories(); err != nil {
- return fmt.Errorf("create directories: %w", err)
- }
-
var appliedLogIndex gitalypb.LogIndex
if err := mgr.readKey(keyAppliedLogIndex(mgr.relativePath), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
return fmt.Errorf("read applied log index: %w", err)
@@ -781,6 +803,16 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
return fmt.Errorf("determine appended log index: %w", err)
}
+ if err := mgr.determineRepositoryExistence(); err != nil {
+ return fmt.Errorf("determine repository existence: %w", err)
+ }
+
+ if mgr.repositoryExists {
+ if err := mgr.createDirectories(); err != nil {
+ return fmt.Errorf("create directories: %w", err)
+ }
+ }
+
var err error
mgr.hookIndex, err = mgr.determineHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex)
if err != nil {
@@ -800,6 +832,42 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
return nil
}
+// determineRepositoryExistence determines whether the repository exists or not by looking
+// at whether the directory exists and whether there is a deletion request logged.
+func (mgr *TransactionManager) determineRepositoryExistence() error {
+ stat, err := os.Stat(mgr.repositoryPath)
+ if err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
+ return fmt.Errorf("stat repository directory: %w", err)
+ }
+ }
+
+ if stat != nil {
+ if !stat.IsDir() {
+ return fmt.Errorf("repository's path didn't point to a directory")
+ }
+
+ mgr.repositoryExists = true
+ }
+
+ // Check whether the last log entry is a repository deletion. If so,
+ // the repository has been deleted but the deletion wasn't yet applied.
+ // The deletion is the last entry always as no further writes are
+ // accepted if the repository doesn't exist.
+ if mgr.appliedLogIndex < mgr.appendedLogIndex {
+ logEntry, err := mgr.readLogEntry(mgr.appendedLogIndex)
+ if err != nil {
+ return fmt.Errorf("read log entry: %w", err)
+ }
+
+ if logEntry.RepositoryDeletion != nil {
+ mgr.repositoryExists = false
+ }
+ }
+
+ return nil
+}
+
// determineHookIndex determines the latest hooks in the repository.
//
// 1. First we iterate through the unapplied log in reverse order. The first log entry that
@@ -809,6 +877,11 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
// to see which are the latest.
// 3. If we found no hooks in the log nor in the repository, there are no hooks configured.
func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) {
+ if !mgr.repositoryExists {
+ // If the repository doesn't exist, then there are no hooks either.
+ return 0, nil
+ }
+
for i := appendedIndex; appliedIndex < i; i-- {
logEntry, err := mgr.readLogEntry(i)
if err != nil {
@@ -1083,6 +1156,10 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g
mgr.hookIndex = nextLogIndex
}
mgr.applyNotifications[nextLogIndex] = make(chan struct{})
+ if logEntry.RepositoryDeletion != nil {
+ mgr.repositoryExists = false
+ mgr.hookIndex = 0
+ }
mgr.mutex.Unlock()
return nil
@@ -1095,27 +1172,36 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return fmt.Errorf("read log entry: %w", err)
}
- if logEntry.IncludesPack {
- if err := mgr.applyPackFile(ctx, logIndex); err != nil {
- return fmt.Errorf("apply pack file: %w", err)
+ if logEntry.RepositoryDeletion != nil {
+ // If the repository is being deleted, just delete it without any other changes given
+ // they'd all be removed anyway. Reapplying the other changes after a crash would also
+ // not work if the repository was successfully deleted before the crash.
+ if err := mgr.applyRepositoryDeletion(ctx, logIndex); err != nil {
+ return fmt.Errorf("apply repository deletion: %w", err)
+ }
+ } else {
+ if logEntry.IncludesPack {
+ if err := mgr.applyPackFile(ctx, logIndex); err != nil {
+ return fmt.Errorf("apply pack file: %w", err)
+ }
}
- }
- updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, mgr.repository)
- if err != nil {
- return fmt.Errorf("perpare reference transaction: %w", err)
- }
+ updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, mgr.repository)
+ if err != nil {
+ return fmt.Errorf("prepare reference transaction: %w", err)
+ }
- if err := updater.Commit(); err != nil {
- return fmt.Errorf("commit transaction: %w", err)
- }
+ if err := updater.Commit(); err != nil {
+ return fmt.Errorf("commit transaction: %w", err)
+ }
- if err := mgr.updateDefaultBranch(ctx, logEntry.DefaultBranchUpdate); err != nil {
- return fmt.Errorf("writing default branch: %w", err)
- }
+ if err := mgr.updateDefaultBranch(ctx, logEntry.DefaultBranchUpdate); err != nil {
+ return fmt.Errorf("writing default branch: %w", err)
+ }
- if err := mgr.applyCustomHooks(ctx, logIndex, logEntry.CustomHooksUpdate); err != nil {
- return fmt.Errorf("apply custom hooks: %w", err)
+ if err := mgr.applyCustomHooks(ctx, logIndex, logEntry.CustomHooksUpdate); err != nil {
+ return fmt.Errorf("apply custom hooks: %w", err)
+ }
}
if err := mgr.storeAppliedLogIndex(logIndex); err != nil {
@@ -1150,6 +1236,73 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return nil
}
+// applyRepositoryDeletion deletes the repository.
+//
+// Given how the repositories are laid out in the storage, we currently can't support MVCC for them.
+// This is because there is only ever a single instance of a given repository. We have to wait for all
+// of the readers to finish before we can delete the repository as otherwise the readers could fail in
+// unexpected ways and it would be an isolation violation. Repository deletions thus block until all
+// transactions with an older read snapshot are done with the repository.
+func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, index LogIndex) error {
+ for {
+ mgr.mutex.Lock()
+ oldestElement := mgr.openTransactions.Front()
+ mgr.mutex.Unlock()
+ if oldestElement == nil {
+ // If there are no open transactions, the deletion can proceed as there are
+ // no readers.
+ //
+ // Any new transaction would have the deletion in their snapshot, and are waiting
+ // for it to be applied prior to beginning.
+ break
+ }
+
+ oldestTransaction := oldestElement.Value.(*Transaction)
+ if oldestTransaction.snapshot.ReadIndex >= index {
+ // If the oldest transaction is reading at this or later log index, it already has the deletion
+ // in its snapshot, and is waiting for it to be applied. Proceed with the deletion as there
+ // are no readers with the pre-deletion state in the snapshot.
+ break
+ }
+
+ for {
+ select {
+ case <-oldestTransaction.finished:
+ // The oldest transaction finished. Proceed to check the second oldest open transaction.
+ case transaction := <-mgr.admissionQueue:
+ // The oldest transaction could also be waiting to commit. Since the Run goroutine is
+ // blocked here waiting for the transaction to finish, the write would never be admitted
+ // for processing, leading to a deadlock. Since the repository was deleted, the only correct
+ // outcome for the transaction would be to receive a not found error. Admit the transaction,
+ // and finish it with the correct result so we can unblock the deletion.
+ transaction.result <- ErrRepositoryNotFound
+ if err := transaction.finish(); err != nil {
+ return fmt.Errorf("finish transaction: %w", err)
+ }
+
+ continue
+ case <-ctx.Done():
+ }
+
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+
+ break
+ }
+ }
+
+ if err := os.RemoveAll(mgr.repositoryPath); err != nil {
+ return fmt.Errorf("remove repository: %w", err)
+ }
+
+ if err := safe.NewSyncer().Sync(filepath.Dir(mgr.repositoryPath)); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+
+ return nil
+}
+
// applyPackFile unpacks the objects from the pack file into the repository if the log entry
// has an associated pack file.
func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error {