diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-10-03 07:50:27 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-10-03 07:50:27 +0300 |
commit | 4fc1879574e1d0a4cc4cea3c6a654c011b65c544 (patch) | |
tree | 5ee05be909523f09cc4077961390bd20a05305d9 | |
parent | f9e6780bdf315bf44fb1d9acafb76abe0ae66b73 (diff) | |
parent | 63cedd75726a893cb652f566f24e6430e9e5858c (diff) |
Merge branch 'smh-snapshot-isolate-txn' into 'master'
Snapshot isolate transactions
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6388
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
8 files changed, 513 insertions, 428 deletions
diff --git a/internal/git/housekeeping/worktrees.go b/internal/git/housekeeping/worktrees.go index 9d2c20aec..682fef2b9 100644 --- a/internal/git/housekeeping/worktrees.go +++ b/internal/git/housekeeping/worktrees.go @@ -18,7 +18,10 @@ import ( ) const ( - worktreePrefix = "gitlab-worktree" + // WorktreesPrefix is a subdirectory in the repository where Gitaly creates worktrees. + WorktreesPrefix = "worktrees" + // GitlabWorktreePrefix is a subdirectory in the repository where Gitaly creates worktrees. + GitlabWorktreePrefix = "gitlab-worktree" ) // CleanupWorktrees cleans up stale and disconnected worktrees for the given repository. @@ -45,7 +48,7 @@ func cleanStaleWorktrees(ctx context.Context, repo *localrepo.Repo, threshold ti return err } - worktreePath := filepath.Join(repoPath, worktreePrefix) + worktreePath := filepath.Join(repoPath, GitlabWorktreePrefix) dirInfo, err := os.Stat(worktreePath) if err != nil { @@ -135,7 +138,7 @@ func cleanDisconnectedWorktrees(ctx context.Context, repo *localrepo.Repo) error // determining if there could possibly be any work to be done by git-worktree(1). We do so // by reading the directory in which worktrees are stored, and if it's empty then we know // that there aren't any worktrees in the first place. - worktreeEntries, err := os.ReadDir(filepath.Join(repoPath, "worktrees")) + worktreeEntries, err := os.ReadDir(filepath.Join(repoPath, WorktreesPrefix)) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil diff --git a/internal/git/housekeeping/worktrees_test.go b/internal/git/housekeeping/worktrees_test.go index 6c51f2e2d..d09b0c2b9 100644 --- a/internal/git/housekeeping/worktrees_test.go +++ b/internal/git/housekeeping/worktrees_test.go @@ -63,14 +63,14 @@ func TestRemoveWorktree(t *testing.T) { gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch(git.DefaultBranch)) repo := localrepo.NewTestRepo(t, cfg, repoProto) - existingWorktreePath := filepath.Join(repoPath, worktreePrefix, "existing") + existingWorktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, "existing") gittest.AddWorktree(t, cfg, repoPath, existingWorktreePath) - disconnectedWorktreePath := filepath.Join(repoPath, worktreePrefix, "disconnected") + disconnectedWorktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, "disconnected") gittest.AddWorktree(t, cfg, repoPath, disconnectedWorktreePath) require.NoError(t, os.RemoveAll(disconnectedWorktreePath)) - orphanedWorktreePath := filepath.Join(repoPath, worktreePrefix, "orphaned") + orphanedWorktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, "orphaned") require.NoError(t, os.MkdirAll(orphanedWorktreePath, perm.PublicDir)) for _, tc := range []struct { @@ -100,7 +100,7 @@ func TestRemoveWorktree(t *testing.T) { t.Run(tc.worktree, func(t *testing.T) { ctx := testhelper.Context(t) - worktreePath := filepath.Join(repoPath, worktreePrefix, tc.worktree) + worktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, tc.worktree) err := removeWorktree(ctx, repo, tc.worktree) if tc.errorIs == nil { diff --git a/internal/gitaly/service/operations/apply_patch.go b/internal/gitaly/service/operations/apply_patch.go index 366109b83..5381d90d0 100644 --- a/internal/gitaly/service/operations/apply_patch.go +++ b/internal/gitaly/service/operations/apply_patch.go @@ -10,6 +10,7 @@ import ( "time" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" @@ -19,10 +20,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/streamio" ) -const ( - gitlabWorktreesSubDir = "gitlab-worktree" -) - var errNoDefaultBranch = errors.New("no default branch") type gitError struct { @@ -277,5 +274,5 @@ func (s *Server) removeWorktree(ctx context.Context, repo *gitalypb.Repository, func newWorktreePath(repoPath, prefix string) string { chars := []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") rand.Shuffle(len(chars), func(i, j int) { chars[i], chars[j] = chars[j], chars[i] }) - return filepath.Join(repoPath, gitlabWorktreesSubDir, prefix+string(chars[:32])) + return filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix, prefix+string(chars[:32])) } diff --git a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go index 99fd42ea5..00fa22e08 100644 --- a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go +++ b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -32,10 +33,10 @@ func TestCreateBundleFromRefList_success(t *testing.T) { masterOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithMessage("master"), gittest.WithBranch("master")) sha := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) - require.NoError(t, os.MkdirAll(filepath.Join(repoPath, "gitlab-worktree"), perm.SharedDir)) + require.NoError(t, os.MkdirAll(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix), perm.SharedDir)) - gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", "gitlab-worktree/worktree1", sha.String()) - require.NoError(t, os.Chtimes(filepath.Join(repoPath, "gitlab-worktree", "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour))) + gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", filepath.Join(housekeeping.GitlabWorktreePrefix, "worktree1"), sha.String()) + require.NoError(t, os.Chtimes(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix, "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour))) gittest.Exec(t, cfg, "-C", repoPath, "branch", "-D", "branch") require.NoError(t, os.Remove(filepath.Join(repoPath, "objects", sha.String()[0:2], sha.String()[2:]))) diff --git a/internal/gitaly/service/repository/create_bundle_test.go b/internal/gitaly/service/repository/create_bundle_test.go index 97e5d3cff..aff86c5c2 100644 --- a/internal/gitaly/service/repository/create_bundle_test.go +++ b/internal/gitaly/service/repository/create_bundle_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -30,9 +31,9 @@ func TestCreateBundle_successful(t *testing.T) { // Create a work tree with a HEAD pointing to a commit that is missing. CreateBundle should // clean this up before creating the bundle. worktreeCommit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithMessage("worktree"), gittest.WithBranch("branch")) - require.NoError(t, os.MkdirAll(filepath.Join(repoPath, "gitlab-worktree"), perm.SharedDir)) - gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", "gitlab-worktree/worktree1", worktreeCommit.String()) - require.NoError(t, os.Chtimes(filepath.Join(repoPath, "gitlab-worktree", "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour))) + require.NoError(t, os.MkdirAll(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix), perm.SharedDir)) + gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", filepath.Join(housekeeping.GitlabWorktreePrefix, "worktree1"), worktreeCommit.String()) + require.NoError(t, os.Chtimes(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix, "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour))) gittest.Exec(t, cfg, "-C", repoPath, "branch", "-D", "branch") require.NoError(t, os.Remove(filepath.Join(repoPath, "objects", worktreeCommit.String()[0:2], worktreeCommit.String()[2:]))) diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index 8ec24b4a9..59f2b1b65 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -386,7 +386,7 @@ func TestPartitionManager(t *testing.T) { // Fake a preexisting apply notification. This ensures that we would // block indefinitely waiting for the notifcation and thus allows us to // assert that we can indeed cancel this via the context. - txMgr.applyNotifications[0] = make(chan struct{}) + txMgr.snapshotLocks[0] = &snapshotLock{} return txMgr }, diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 5de67ab82..b4362a44c 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -2,7 +2,6 @@ package storagemgr import ( "bytes" - "container/list" "context" "encoding/binary" "errors" @@ -119,11 +118,6 @@ type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate type Snapshot struct { // ReadIndex is the index of the log entry this Transaction is reading the data at. ReadIndex LogIndex - // CustomHookIndex is index of the custom hooks on the disk that are included in this Transactions's - // snapshot and were the latest on the read index. - CustomHookIndex LogIndex - // CustomHookPath is an absolute filesystem path to the custom hooks in this snapshot. - CustomHookPath string } type transactionState int @@ -149,12 +143,12 @@ type Transaction struct { // result is where the outcome of the transaction is sent ot by TransactionManager once it // has been determined. result chan error - // admitted is closed when the transaction was admitted for processing in the TransactionManager. + // admitted is set when the transaction was admitted for processing in the TransactionManager. // Transaction queues in admissionQueue to be committed, and is considered admitted once it has // been dequeued by TransactionManager.Run(). Once the transaction is admitted, its ownership moves // from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must // not do any modifications to the state of the transcation anymore to avoid races. - admitted chan struct{} + admitted bool // finish cleans up the transaction releasing the resources associated with it. It must be called // once the transaction is done with. finish func() error @@ -168,12 +162,15 @@ type Transaction struct { // quarantineDirectory is the directory within the stagingDirectory where the new objects of the // transaction are quarantined. quarantineDirectory string + // snapshotBaseRelativePath is the relative path of the snapshot directory in the storage. + // It's used to rewrite the repositories to point to their snapshots. + snapshotBaseRelativePath string // packPrefix contains the prefix (`pack-<digest>`) of the transaction's pack if the transaction // had objects to log. packPrefix string - // stagingRepository is a repository that is used to stage the transaction. If there are quarantined - // objects, it has the quarantine applied so the objects are available for verification and packing. - stagingRepository *localrepo.Repo + // snapshotRepository is a snapshot of the target repository with a possible quarantine applied + // if this is a read-write transaction. + snapshotRepository *localrepo.Repo // Snapshot contains the details of the Transaction's read snapshot. snapshot Snapshot @@ -216,39 +213,18 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption txn := &Transaction{ readOnly: opts.ReadOnly, commit: mgr.commit, - snapshot: Snapshot{ - ReadIndex: mgr.appendedLogIndex, - CustomHookIndex: mgr.customHookIndex, - CustomHookPath: customHookPathForLogIndex(mgr.repositoryPath, mgr.customHookIndex), - }, - admitted: make(chan struct{}), + snapshot: Snapshot{ReadIndex: mgr.appendedLogIndex}, finished: make(chan struct{}), } - // If there are no custom hooks stored through the WAL yet, then default to the custom hooks - // that may already exist in the repository for backwards compatibility. - if txn.snapshot.CustomHookIndex == 0 { - txn.snapshot.CustomHookPath = filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir) - } - - openTransactionElement := mgr.openTransactions.PushBack(txn) - - readReady := mgr.applyNotifications[txn.snapshot.ReadIndex] + mgr.snapshotLocks[txn.snapshot.ReadIndex].activeSnapshotters.Add(1) + defer mgr.snapshotLocks[txn.snapshot.ReadIndex].activeSnapshotters.Done() + readReady := mgr.snapshotLocks[txn.snapshot.ReadIndex].applied mgr.mutex.Unlock() - if readReady == nil { - // The snapshot log entry is already applied if there is no notification channel for it. - // If so, the transaction is ready to begin immediately. - readReady = make(chan struct{}) - close(readReady) - } txn.finish = func() error { defer close(txn.finished) - mgr.mutex.Lock() - mgr.openTransactions.Remove(openTransactionElement) - mgr.mutex.Unlock() - if txn.stagingDirectory != "" { if err := os.RemoveAll(txn.stagingDirectory); err != nil { return fmt.Errorf("remove staging directory: %w", err) @@ -278,14 +254,27 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption return nil, fmt.Errorf("mkdir temp: %w", err) } - txn.stagingRepository = mgr.repositoryFactory.Build(mgr.relativePath) + if txn.snapshotBaseRelativePath, err = filepath.Rel( + mgr.storagePath, + filepath.Join(txn.stagingDirectory, "snapshot"), + ); err != nil { + return nil, fmt.Errorf("snapshot root relative path: %w", err) + } + + if err := mgr.createRepositorySnapshot(ctx, + filepath.Join(mgr.storagePath, txn.snapshotRelativePath(mgr.relativePath)), + ); err != nil { + return nil, fmt.Errorf("create snapshot: %w", err) + } + + txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshotRelativePath(mgr.relativePath)) if !txn.readOnly { txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine") if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), perm.PrivateDir); err != nil { return nil, fmt.Errorf("create quarantine directory: %w", err) } - txn.stagingRepository, err = txn.stagingRepository.Quarantine(txn.quarantineDirectory) + txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(txn.quarantineDirectory) if err != nil { return nil, fmt.Errorf("quarantine: %w", err) } @@ -295,16 +284,99 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption } } -// RewriteRepository returns a copy of the repository that has been set up to correctly access the -// transaction's repository. +// snapshotRelativePath returns a rewritten relative path that points to the passed in relative path's +// location in the snapshot. +func (txn *Transaction) snapshotRelativePath(relativePath string) string { + return filepath.Join(txn.snapshotBaseRelativePath, relativePath) +} + +// RewriteRepository returns a copy of the repository that has been set up to correctly access +// the repository in the transaction's snapshot. func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.Repository { rewritten := proto.Clone(repo).(*gitalypb.Repository) - rewritten.RelativePath = txn.stagingRepository.GetRelativePath() - rewritten.GitObjectDirectory = txn.stagingRepository.GetGitObjectDirectory() - rewritten.GitAlternateObjectDirectories = txn.stagingRepository.GetGitAlternateObjectDirectories() + rewritten.RelativePath = txn.snapshotRelativePath(repo.RelativePath) + rewritten.GitObjectDirectory = txn.snapshotRepository.GetGitObjectDirectory() + rewritten.GitAlternateObjectDirectories = txn.snapshotRepository.GetGitAlternateObjectDirectories() return rewritten } +// createRepositorySnapshot snapshots the repository's current state at snapshotPath. This is done by +// recreating the repository's directory structure and hard linking the repository's files in their +// correct locations there. This effectively does a copy-free clone of the repository. Since the files +// are shared between the snapshot and the repository, they must not be modified. Git doesn't modify +// existing files but writes new ones so this property is upheld. +func (mgr *TransactionManager) createRepositorySnapshot(ctx context.Context, snapshotPath string) error { + // This creates the parent directory hierarchy regardless of whether the repository exists or not. It also + // doesn't consider the permissions in the storage. While not 100% correct, we have no logic that cares about + // the storage hierarchy above repositories. + // + // The repository's directory itself is not yet created as whether it should be created depends on whether the + // repository exists or not. + if err := os.MkdirAll(filepath.Dir(snapshotPath), perm.PrivateDir); err != nil { + return fmt.Errorf("create parent directory hierarchy: %w", err) + } + + mgr.stateLock.RLock() + defer mgr.stateLock.RUnlock() + if err := createDirectorySnapshot(ctx, mgr.repositoryPath, snapshotPath, map[string]struct{}{ + // Don't include worktrees in the snapshot. All of the worktrees in the repository should be leftover + // state from before transaction management was introduced as the transactions would create their + // worktrees in the snapshot. + housekeeping.WorktreesPrefix: {}, + housekeeping.GitlabWorktreePrefix: {}, + }); err != nil { + return fmt.Errorf("create directory snapshot: %w", err) + } + + return nil +} + +// createDirectorySnapshot recursively recreates the directory structure from originalDirectory into +// snapshotDirectory and hard links files into the same locations in snapshotDirectory. +// +// skipRelativePaths can be provided to skip certain entries from being included in the snapshot. +func createDirectorySnapshot(ctx context.Context, originalDirectory, snapshotDirectory string, skipRelativePaths map[string]struct{}) error { + if err := filepath.Walk(originalDirectory, func(oldPath string, info fs.FileInfo, err error) error { + if err != nil { + if errors.Is(err, fs.ErrNotExist) && oldPath == originalDirectory { + // The directory being snapshotted does not exist. This is fine as the transaction + // may be about to create it. + return nil + } + + return err + } + + relativePath, err := filepath.Rel(originalDirectory, oldPath) + if err != nil { + return fmt.Errorf("rel: %w", err) + } + + if _, ok := skipRelativePaths[relativePath]; ok { + return fs.SkipDir + } + + newPath := filepath.Join(snapshotDirectory, relativePath) + if info.IsDir() { + if err := os.Mkdir(newPath, info.Mode().Perm()); err != nil { + return fmt.Errorf("create dir: %w", err) + } + } else if info.Mode().IsRegular() { + if err := os.Link(oldPath, newPath); err != nil { + return fmt.Errorf("link file: %w", err) + } + } else { + return fmt.Errorf("unsupported file mode: %q", info.Mode()) + } + + return nil + }); err != nil { + return fmt.Errorf("walk: %w", err) + } + + return nil +} + func (txn *Transaction) updateState(newState transactionState) error { switch txn.state { case transactionStateOpen: @@ -377,12 +449,11 @@ func (txn *Transaction) Rollback() error { // the Transaction is being processed by TransactionManager. The clean up responsibility moves there as well // to avoid races. func (txn *Transaction) finishUnadmitted() error { - select { - case <-txn.admitted: + if txn.admitted { return nil - default: - return txn.finish() } + + return txn.finish() } // Snapshot returns the details of the Transaction's read snapshot. @@ -419,7 +490,7 @@ func (txn *Transaction) RecordInitialReferenceValues(ctx context.Context, initia continue } - objectHash, err := txn.stagingRepository.ObjectHash(ctx) + objectHash, err := txn.snapshotRepository.ObjectHash(ctx) if err != nil { return fmt.Errorf("object hash: %w", err) } @@ -427,7 +498,7 @@ func (txn *Transaction) RecordInitialReferenceValues(ctx context.Context, initia if objectHash.IsZeroOID(oid) { // If this is a zero OID, resolve the value to see if this is a force update or the // reference doesn't exist. - if current, err := txn.stagingRepository.ResolveRevision(ctx, reference.Revision()); err != nil { + if current, err := txn.snapshotRepository.ResolveRevision(ctx, reference.Revision()); err != nil { if !errors.Is(err, git.ErrReferenceNotFound) { return fmt.Errorf("resolve revision: %w", err) } @@ -507,6 +578,20 @@ func (txn *Transaction) walFilesPath() string { return filepath.Join(txn.stagingDirectory, "wal-files") } +// snapshotLock contains state used to synchronize snapshotters and the log application with each other. +// Snapshotters wait on the applied channel until all of the committed writes in the read snapshot have +// been applied on the repository. The log application waits until all activeSnapshotters have managed to +// snapshot their state prior to applying the next log entry to the repository. +type snapshotLock struct { + // applied is closed when the transaction the snapshotters are waiting for has been applied to the + // repository and is ready for reading. + applied chan struct{} + // activeSnapshotters tracks snapshotters who are either taking a snapshot or waiting for the + // log entry to be applied. Log application waits for active snapshotters to finish before applying + // the next entry. + activeSnapshotters sync.WaitGroup +} + // TransactionManager is responsible for transaction management of a single repository. Each repository has // a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from // the admissionQueue. Each admitted write is processed in three steps: @@ -576,6 +661,9 @@ type TransactionManager struct { repository *localrepo.Repo // repositoryPath is the path to the repository this TransactionManager is acting on. repositoryPath string + // storagePath is an absolute path to the root of the storage this TransactionManager + // is operating in. + storagePath string // relativePath is the repository's relative path inside the storage. relativePath string // db is the handle to the key-value store used for storing the write-ahead log related state. @@ -583,9 +671,6 @@ type TransactionManager struct { // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction - // openTransactions contains all transactions that have been begun but not yet committed or rolled back. - // The transactions are ordered from the oldest to the newest. - openTransactions *list.List // initialized is closed when the manager has been initialized. It's used to block new transactions // from beginning prior to the manager having initialized its runtime state on start up. @@ -593,18 +678,20 @@ type TransactionManager struct { // initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't, // transactions will fail to begin. initializationSuccessful bool - // mutex guards access to applyNotifications and appendedLogIndex. These fields are accessed by both + // mutex guards access to snapshotLocks and appendedLogIndex. These fields are accessed by both // Run and Begin which are ran in different goroutines. mutex sync.Mutex - // applyNotifications stores channels that are closed when a log entry is applied. These - // are used to block transactions from beginning before their snapshot is ready. - applyNotifications map[LogIndex]chan struct{} + + // stateLock is used to synchronize snapshotting with reference verification as the verification + // process targets the main repository and creates locks in it. + stateLock sync.RWMutex + // snapshotLocks contains state used for synchronizing snapshotters with the log application. + snapshotLocks map[LogIndex]*snapshotLock + // appendedLogIndex holds the index of the last log entry appended to the log. appendedLogIndex LogIndex // appliedLogIndex holds the index of the last log entry applied to the repository appliedLogIndex LogIndex - // customHookIndex stores the log index of the latest committed custom custom hooks in the repository. - customHookIndex LogIndex // housekeepingManager access to the housekeeping.Manager. housekeepingManager housekeeping.Manager @@ -634,13 +721,13 @@ func NewTransactionManager( commandFactory: cmdFactory, repositoryFactory: repositoryFactory, repository: repositoryFactory.Build(relativePath), + storagePath: storagePath, repositoryPath: filepath.Join(storagePath, relativePath), relativePath: relativePath, db: newDatabaseAdapter(db), admissionQueue: make(chan *Transaction), - openTransactions: list.New(), initialized: make(chan struct{}), - applyNotifications: make(map[LogIndex]chan struct{}), + snapshotLocks: make(map[LogIndex]*snapshotLock), stateDirectory: stateDir, stagingDirectory: stagingDir, housekeepingManager: housekeepingManager, @@ -666,7 +753,7 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact select { case mgr.admissionQueue <- transaction: - close(transaction.admitted) + transaction.admitted = true select { case err := <-transaction.result: @@ -751,7 +838,18 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil } - objectHash, err := transaction.stagingRepository.ObjectHash(ctx) + // We should actually be figuring out the new objects to pack against the transaction's snapshot + // repository as the objects and references in the actual repository may change while we're doing + // this. We don't yet have a way to figure out which objects the new quarantined objects depend on + // in the main object database of the snapshot. + // + // This is pending https://gitlab.com/groups/gitlab-org/-/epics/11242. + quarantinedRepo, err := mgr.repository.Quarantine(transaction.quarantineDirectory) + if err != nil { + return fmt.Errorf("quarantine: %w", err) + } + + objectHash, err := quarantinedRepo.ObjectHash(ctx) if err != nil { return fmt.Errorf("object hash: %w", err) } @@ -781,7 +879,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra group.Go(func() (returnedErr error) { defer func() { objectsWriter.CloseWithError(returnedErr) }() - if err := transaction.stagingRepository.WalkUnreachableObjects(ctx, + if err := quarantinedRepo.WalkUnreachableObjects(ctx, strings.NewReader(strings.Join(heads, "\n")), objectsWriter, ); err != nil { @@ -798,7 +896,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra packWriter.CloseWithError(returnedErr) }() - if err := transaction.stagingRepository.PackObjects(ctx, objectsReader, packWriter); err != nil { + if err := quarantinedRepo.PackObjects(ctx, objectsReader, packWriter); err != nil { return fmt.Errorf("pack objects: %w", err) } @@ -815,7 +913,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra // index-pack places the pack, index, and reverse index into the repository's object directory. // The staging repository is configured with a quarantine so we execute it there. var stdout, stderr bytes.Buffer - if err := transaction.stagingRepository.ExecAndWait(ctx, git.Command{ + if err := quarantinedRepo.ExecAndWait(ctx, git.Command{ Name: "index-pack", Flags: []git.Option{git.Flag{Name: "--stdin"}, git.Flag{Name: "--rev-index"}}, Args: []string{filepath.Join(transaction.walFilesPath(), "objects.pack")}, @@ -1052,16 +1150,15 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { } } - var err error - mgr.customHookIndex, err = mgr.determineCustomHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex) - if err != nil { - return fmt.Errorf("determine hook index: %w", err) - } + // Create a snapshot lock for the applied index as it is used for synchronizing + // the snapshotters with the log application. + mgr.snapshotLocks[mgr.appliedLogIndex] = &snapshotLock{applied: make(chan struct{})} + close(mgr.snapshotLocks[mgr.appliedLogIndex].applied) - // Each unapplied log entry should have a notification channel that gets closed when it is applied. - // Create these channels here for the log entries. + // Each unapplied log entry should have a snapshot lock as they are created in normal + // operation when committing a log entry. Recover these entries. for i := mgr.appliedLogIndex + 1; i <= mgr.appendedLogIndex; i++ { - mgr.applyNotifications[i] = make(chan struct{}) + mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } if err := mgr.removeStaleWALFiles(mgr.ctx, mgr.appendedLogIndex); err != nil { @@ -1109,57 +1206,10 @@ func (mgr *TransactionManager) determineRepositoryExistence() error { return nil } -// determineCustomHookIndex determines the latest custom hooks in the repository. -// -// 1. First we iterate through the unapplied log in reverse order. The first log entry that -// contains custom hooks must have the latest custom hooks since it is the latest log entry. -// 2. If we don't find any custom hooks in the log, the latest hooks could have been applied -// to the repository already and the log entry pruned away. Look at the custom hooks on the -// disk to see which are the latest. -// 3. If we found no custom hooks in the log nor in the repository, there are no custom hooks -// configured. -func (mgr *TransactionManager) determineCustomHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) { - if !mgr.repositoryExists { - // If the repository doesn't exist, then there are no hooks either. - return 0, nil - } - - for i := appendedIndex; appliedIndex < i; i-- { - logEntry, err := mgr.readLogEntry(i) - if err != nil { - return 0, fmt.Errorf("read log entry: %w", err) - } - - if logEntry.CustomHooksUpdate != nil { - return i, nil - } - } - - hookDirs, err := os.ReadDir(filepath.Join(mgr.stateDirectory, "hooks")) - if err != nil { - return 0, fmt.Errorf("read hook directories: %w", err) - } - - var hookIndex LogIndex - for _, dir := range hookDirs { - rawIndex, err := strconv.ParseUint(dir.Name(), 10, 64) - if err != nil { - return 0, fmt.Errorf("parse hook index: %w", err) - } - - if index := LogIndex(rawIndex); hookIndex < index { - hookIndex = index - } - } - - return hookIndex, err -} - func (mgr *TransactionManager) createStateDirectory() error { for _, path := range []string{ mgr.stateDirectory, filepath.Join(mgr.stateDirectory, "wal"), - filepath.Join(mgr.stateDirectory, "hooks"), } { if err := os.Mkdir(path, perm.PrivateDir); err != nil { if !errors.Is(err, fs.ErrExist) { @@ -1276,15 +1326,20 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction return nil, nil } + quarantinedRepo, err := mgr.repository.Quarantine(transaction.quarantineDirectory) + if err != nil { + return nil, fmt.Errorf("quarantine: %w", err) + } + var referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate for referenceName, update := range transaction.referenceUpdates { if err := git.ValidateReference(string(referenceName)); err != nil { return nil, InvalidReferenceFormatError{ReferenceName: referenceName} } - actualOldTip, err := transaction.stagingRepository.ResolveRevision(ctx, referenceName.Revision()) + actualOldTip, err := quarantinedRepo.ResolveRevision(ctx, referenceName.Revision()) if errors.Is(err, git.ErrReferenceNotFound) { - objectHash, err := transaction.stagingRepository.ObjectHash(ctx) + objectHash, err := quarantinedRepo.ObjectHash(ctx) if err != nil { return nil, fmt.Errorf("object hash: %w", err) } @@ -1320,7 +1375,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction ) == -1 }) - if err := mgr.verifyReferencesWithGit(ctx, referenceUpdates, transaction.stagingRepository); err != nil { + if err := mgr.verifyReferencesWithGit(ctx, referenceUpdates, quarantinedRepo); err != nil { return nil, fmt.Errorf("verify references with git: %w", err) } @@ -1331,8 +1386,17 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // the updates will go through when they are being applied in the log. This also catches any invalid reference names // and file/directory conflicts with Git's loose reference storage which can occur with references like // 'refs/heads/parent' and 'refs/heads/parent/child'. -func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, stagingRepository *localrepo.Repo) error { - updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, stagingRepository) +func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, quarantinedRepo *localrepo.Repo) error { + // Prevent snapshots from being taken concurrently as the reference verification + // currently creates locks in the repository. We don't need to synchronnize snapshotting + // and reference verification once we are no longer verifying the references against + // the repository. + // + // Alternatively this could be removed by ignoring the lock files during snapshot creation. + mgr.stateLock.Lock() + defer mgr.stateLock.Unlock() + + updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, quarantinedRepo) if err != nil { return fmt.Errorf("prepare reference transaction: %w", err) } @@ -1451,13 +1515,9 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g mgr.mutex.Lock() mgr.appendedLogIndex = nextLogIndex - if logEntry.CustomHooksUpdate != nil { - mgr.customHookIndex = nextLogIndex - } - mgr.applyNotifications[nextLogIndex] = make(chan struct{}) + mgr.snapshotLocks[nextLogIndex] = &snapshotLock{applied: make(chan struct{})} if logEntry.RepositoryDeletion != nil { mgr.repositoryExists = false - mgr.customHookIndex = 0 } mgr.mutex.Unlock() @@ -1471,6 +1531,15 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("read log entry: %w", err) } + // Ensure all snapshotters have finished snapshotting the previous state before we apply + // the new state to the repository. No new snapshotters can arrive at this point. All + // new transactions would be waiting for the committed log entry we are about to apply. + previousIndex := logIndex - 1 + mgr.snapshotLocks[previousIndex].activeSnapshotters.Wait() + mgr.mutex.Lock() + delete(mgr.snapshotLocks, previousIndex) + mgr.mutex.Unlock() + if logEntry.RepositoryDeletion != nil { // If the repository is being deleted, just delete it without any other changes given // they'd all be removed anyway. Reapplying the other changes after a crash would also @@ -1508,18 +1577,6 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn mgr.appliedLogIndex = logIndex - // Notify the transactions waiting for this log entry to be applied prior to beginning. - mgr.mutex.Lock() - defer mgr.mutex.Unlock() - - notificationCh, ok := mgr.applyNotifications[logIndex] - if !ok { - // This should never happen and is a programming error if it does. - return fmt.Errorf("no notification channel for LSN %d", logIndex) - } - delete(mgr.applyNotifications, logIndex) - close(notificationCh) - // There is no awaiter for a transaction if the transaction manager is recovering // transactions from the log after starting up. if resultChan, ok := mgr.awaitingTransactions[logIndex]; ok { @@ -1527,6 +1584,10 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn delete(mgr.awaitingTransactions, logIndex) } + // Notify the transactions waiting for this log entry to be applied prior to take their + // snapshot. + close(mgr.snapshotLocks[logIndex].applied) + return nil } @@ -1549,61 +1610,7 @@ func (mgr *TransactionManager) applyReferenceUpdates(ctx context.Context, update } // applyRepositoryDeletion deletes the repository. -// -// Given how the repositories are laid out in the storage, we currently can't support MVCC for them. -// This is because there is only ever a single instance of a given repository. We have to wait for all -// of the readers to finish before we can delete the repository as otherwise the readers could fail in -// unexpected ways and it would be an isolation violation. Repository deletions thus block before all -// transaction with an older read snapshot are done with the repository. func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, index LogIndex) error { - for { - mgr.mutex.Lock() - oldestElement := mgr.openTransactions.Front() - mgr.mutex.Unlock() - if oldestElement == nil { - // If there are no open transactions, the deletion can proceed as there are - // no readers. - // - // Any new transaction would have the deletion in their snapshot, and are waiting - // for it to be applied prior to beginning. - break - } - - oldestTransaction := oldestElement.Value.(*Transaction) - if oldestTransaction.snapshot.ReadIndex >= index { - // If the oldest transaction is reading at this or later log index, it already has the deletion - // in its snapshot, and is waiting for it to be applied. Proceed with the deletion as there - // are no readers with the pre-deletion state in the snapshot. - break - } - - for { - select { - case <-oldestTransaction.finished: - // The oldest transaction finished. Proceed to check the second oldest open transaction. - case transaction := <-mgr.admissionQueue: - // The oldest transaction could also be waiting to commit. Since the Run goroutine is - // blocked here waiting for the transaction to finish, the write would never be admitted - // for processing, leading to a deadlock. Since the repository was deleted, the only correct - // outcome for the transaction would be to receive a not found error. Admit the transaction, - // and finish it with the correct result so we can unblock the deletion. - transaction.result <- ErrRepositoryNotFound - if err := transaction.finish(); err != nil { - return fmt.Errorf("finish transaction: %w", err) - } - - continue - case <-ctx.Done(): - } - - if err := ctx.Err(); err != nil { - return err - } - - break - } - } - if err := os.RemoveAll(mgr.repositoryPath); err != nil { return fmt.Errorf("remove repository: %w", err) } @@ -1646,87 +1653,41 @@ func (mgr *TransactionManager) applyPackFile(ctx context.Context, packPrefix str return nil } -// applyCustomHooks applies the custom hooks to the repository from the log entry. The custom hooks are stored -// at `<repo>/wal/hooks/<log_index>`. The custom hooks are fsynced prior to returning so it is safe to delete -// the log entry afterwards. -// -// The hooks are also extracted at `<repo>/custom_hooks`. This is done for backwards compatibility, as we want -// the hooks to be present even if the WAL logic is disabled. This ensures we don't lose data if we have to -// disable the WAL logic after rollout. +// applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are extracted at +// `<repo>/custom_hooks`. The custom hooks are fsynced prior to returning so it is safe to delete the log entry +// afterwards. func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex LogIndex, update *gitalypb.LogEntry_CustomHooksUpdate) error { if update == nil { return nil } - targetDirectory := customHookPathForLogIndex(mgr.stateDirectory, logIndex) - if err := os.Mkdir(targetDirectory, fs.ModePerm); err != nil { - // The target directory may exist if we previously tried to extract the - // custom hooks there. TAR overwrites existing files and the custom hooks - // files are guaranteed to be the same as this is the same log entry. - if !errors.Is(err, fs.ErrExist) { - return fmt.Errorf("create directory: %w", err) - } + destinationDir := filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir) + if err := os.RemoveAll(destinationDir); err != nil { + return fmt.Errorf("remove directory: %w", err) } - syncer := safe.NewSyncer() - extractHooks := func(destinationDir string) error { - if err := repoutil.ExtractHooks(ctx, bytes.NewReader(update.CustomHooksTar), destinationDir, true); err != nil { - return fmt.Errorf("extract hooks: %w", err) - } - - // TAR doesn't sync the extracted files so do it manually here. - if err := syncer.SyncRecursive(destinationDir); err != nil { - return fmt.Errorf("sync hooks: %w", err) - } - - return nil + if err := os.Mkdir(destinationDir, perm.PrivateDir); err != nil { + return fmt.Errorf("create directory: %w", err) } - if err := extractHooks(targetDirectory); err != nil { + if err := repoutil.ExtractHooks(ctx, bytes.NewReader(update.CustomHooksTar), destinationDir, true); err != nil { return fmt.Errorf("extract hooks: %w", err) } - // Sync the parent directory as well. - if err := syncer.SyncParent(targetDirectory); err != nil { - return fmt.Errorf("sync hook directory: %w", err) - } - - // Extract another copy that we can move to `<repo>/custom_hooks` where the hooks exist without the WAL enabled. - // We make a second copy as if we disable the WAL, we have to clear all of its state prior to re-enabling it. - // This would clear the hooks so symbolic linking the first copy is not enough. - tmpDir, err := os.MkdirTemp(mgr.stagingDirectory, "") - if err != nil { - return fmt.Errorf("create temporary directory: %w", err) - } - - if err := extractHooks(tmpDir); err != nil { - return fmt.Errorf("extract legacy hooks: %w", err) - } - - legacyHooksPath := filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir) - // The hooks are lost if we perform this removal but fail to perform the remaining operations and the - // WAL is disabled before succeeding. This is an existing issue already with SetCustomHooks RPC. - if err := os.RemoveAll(legacyHooksPath); err != nil { - return fmt.Errorf("remove existing legacy hooks: %w", err) - } - - if err := os.Rename(tmpDir, legacyHooksPath); err != nil { - return fmt.Errorf("move legacy hooks in place: %w", err) + // TAR doesn't sync the extracted files so do it manually here. + syncer := safe.NewSyncer() + if err := syncer.SyncRecursive(destinationDir); err != nil { + return fmt.Errorf("sync hooks: %w", err) } - if err := syncer.SyncParent(legacyHooksPath); err != nil { - return fmt.Errorf("sync legacy hooks directory entry: %w", err) + // Sync the parent directory as well. + if err := syncer.SyncParent(destinationDir); err != nil { + return fmt.Errorf("sync hook directory: %w", err) } return nil } -// customHookPathForLogIndex returns the filesystem paths where the custom hooks -// for the given log index are stored. -func customHookPathForLogIndex(stateDir string, logIndex LogIndex) string { - return filepath.Join(stateDir, "hooks", logIndex.String()) -} - // deleteLogEntry deletes the log entry at the given index from the log. func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error { return mgr.deleteKey(keyLogEntry(mgr.relativePath, index)) diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go index 7575861c6..94debf74a 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "sort" + "strconv" "strings" "sync" "testing" @@ -25,7 +26,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" @@ -343,15 +343,6 @@ func TestTransactionManager(t *testing.T) { IncludeObjects []git.ObjectID } - // AsyncDeletion can be used to commit a repository deletion asynchronously. This is necessary in tests - // which test concurrent transactions with repository deletions as deletions are blocking. - type AsyncDeletion struct { - // TransactionID identifies the transaction to async commit a deletion. - TransactionID int - // ExpectedError is the error that is expected to be returned when committing the transaction. - ExpectedError error - } - // RecordInitialReferenceValues calls RecordInitialReferenceValues on a transaction. type RecordInitialReferenceValues struct { // TransactionID identifies the transaction to prepare the reference updates on. @@ -1722,8 +1713,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, ExpectedSnapshot: Snapshot{ - ReadIndex: 1, - CustomHookIndex: 1, + ReadIndex: 1, }, }, Commit{ @@ -1736,17 +1726,8 @@ func TestTransactionManager(t *testing.T) { string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(), }, Directory: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/hooks/1/pre-receive": { - Mode: umask.Mask(fs.ModePerm), - Content: []byte("hook content"), - }, - "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, - "/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, }, Repositories: RepositoryStates{ relativePath: { @@ -1806,16 +1787,8 @@ func TestTransactionManager(t *testing.T) { string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), }, Directory: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/hooks/1/pre-receive": { - Mode: umask.Mask(fs.ModePerm), - Content: []byte("hook content"), - }, - "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, }, Repositories: RepositoryStates{ relativePath: { @@ -1860,8 +1833,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, ExpectedSnapshot: Snapshot{ - ReadIndex: 1, - CustomHookIndex: 1, + ReadIndex: 1, }, }, Commit{ @@ -1871,8 +1843,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 3, ExpectedSnapshot: Snapshot{ - ReadIndex: 2, - CustomHookIndex: 2, + ReadIndex: 2, }, }, CloseManager{}, @@ -1880,8 +1851,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 4, ExpectedSnapshot: Snapshot{ - ReadIndex: 2, - CustomHookIndex: 2, + ReadIndex: 2, }, }, Rollback{ @@ -1893,17 +1863,8 @@ func TestTransactionManager(t *testing.T) { string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(), }, Directory: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/hooks/1/pre-receive": { - Mode: umask.Mask(fs.ModePerm), - Content: []byte("hook content"), - }, - "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, - "/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, }, Repositories: RepositoryStates{ relativePath: { @@ -2638,16 +2599,13 @@ func TestTransactionManager(t *testing.T) { CustomHooksTAR: validCustomHooks(t), }, }, - // Transaction 2 is not isolated from the changes made by transaction 1. It sees the committed - // changes immediately. + // Transaction 2 is isolated from the changes made by transaction 1. It does not see the + // committed changes. RepositoryAssertion{ TransactionID: 2, Repositories: RepositoryStates{ relativePath: { DefaultBranch: "refs/heads/main", - References: []git.Reference{ - {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, - }, Objects: []git.ObjectID{ setup.ObjectHash.EmptyTreeOID, setup.Commits.First.OID, @@ -2655,23 +2613,13 @@ func TestTransactionManager(t *testing.T) { setup.Commits.Third.OID, setup.Commits.Diverging.OID, }, - CustomHooks: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/pre-receive": { - Mode: umask.Mask(fs.ModePerm), - Content: []byte("hook content"), - }, - "/private-dir": {Mode: fs.ModeDir | perm.PrivateDir}, - "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, - }, }, }, }, Begin{ TransactionID: 3, ExpectedSnapshot: Snapshot{ - ReadIndex: 1, - CustomHookIndex: 1, + ReadIndex: 1, }, }, // Transaction 3 is should see the new changes as it began after transaction 1 was committed. @@ -2711,8 +2659,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 4, ExpectedSnapshot: Snapshot{ - ReadIndex: 2, - CustomHookIndex: 1, + ReadIndex: 2, }, }, Rollback{ @@ -2721,8 +2668,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 5, ExpectedSnapshot: Snapshot{ - ReadIndex: 2, - CustomHookIndex: 1, + ReadIndex: 2, }, }, Commit{ @@ -2735,8 +2681,7 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 6, ExpectedSnapshot: Snapshot{ - ReadIndex: 3, - CustomHookIndex: 3, + ReadIndex: 3, }, }, Rollback{ @@ -2751,17 +2696,8 @@ func TestTransactionManager(t *testing.T) { string(keyAppliedLogIndex(relativePath)): LogIndex(3).toProto(), }, Directory: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/hooks/1/pre-receive": { - Mode: umask.Mask(fs.ModePerm), - Content: []byte("hook content"), - }, - "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, - "/hooks/3": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, }, Repositories: RepositoryStates{ relativePath: { @@ -2811,7 +2747,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -2914,7 +2849,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -2988,7 +2922,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -3076,7 +3009,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -3122,9 +3054,8 @@ func TestTransactionManager(t *testing.T) { string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), }, Directory: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, }, Repositories: RepositoryStates{ relativePath: { @@ -3154,7 +3085,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -3200,7 +3130,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -3286,7 +3215,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -3366,7 +3294,6 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), @@ -3449,8 +3376,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3475,8 +3403,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3501,8 +3430,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3529,8 +3459,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, Commit{ TransactionID: 2, @@ -3690,16 +3621,8 @@ func TestTransactionManager(t *testing.T) { string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(), }, Directory: testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/hooks/1/pre-receive": { - Mode: umask.Mask(fs.ModePerm), - Content: []byte("hook content"), - }, - "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, }, Repositories: RepositoryStates{}, }, @@ -3714,8 +3637,9 @@ func TestTransactionManager(t *testing.T) { Begin{ TransactionID: 2, }, - AsyncDeletion{ - TransactionID: 1, + Commit{ + TransactionID: 1, + DeleteRepository: true, }, // The concurrent transaction should be able to read the // repository despite the committed deletion. @@ -3919,6 +3843,217 @@ func TestTransactionManager(t *testing.T) { }, }, }, + { + desc: "transactions are snapshot isolated from concurrent updates", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/new-head", + }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + // This transaction was started before the commit, so it should see the original state. + RepositoryAssertion{ + TransactionID: 1, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/main", + }, + }, + }, + // This transaction was started after the commit, so it should see the new state. + RepositoryAssertion{ + TransactionID: 3, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/new-head", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + CustomHooks: testhelper.DirectoryState{ + "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + }, + }, + }, + }, + Rollback{ + TransactionID: 1, + }, + Rollback{ + TransactionID: 3, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), + "/wal/1/objects.rev": reverseIndexFileDirectoryEntry(setup.Config), + "/wal/1/objects.pack": packFileDirectoryEntry( + setup.Config, + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/new-head", + References: []git.Reference{{Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}}, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + CustomHooks: testhelper.DirectoryState{ + "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + }, + }, + }, + }, + }, + { + desc: "transactions are snapshot isolated from concurrent deletions", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/new-head", + }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + DeleteRepository: true, + }, + // This transaction was started before the deletion, so it should see the old state regardless + // of the repository being deleted. + RepositoryAssertion{ + TransactionID: 3, + Repositories: RepositoryStates{ + relativePath: { + DefaultBranch: "refs/heads/new-head", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + CustomHooks: testhelper.DirectoryState{ + "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + }, + }, + }, + }, + Rollback{ + TransactionID: 3, + }, + Begin{ + TransactionID: 4, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + }, + }, + RepositoryAssertion{ + TransactionID: 4, + Repositories: RepositoryStates{}, + }, + Rollback{ + TransactionID: 4, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config), + "/wal/1/objects.rev": reverseIndexFileDirectoryEntry(setup.Config), + "/wal/1/objects.pack": packFileDirectoryEntry( + setup.Config, + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Repositories: RepositoryStates{}, + }, + }, } type invalidReferenceTestCase struct { @@ -4017,13 +4152,14 @@ func TestTransactionManager(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, database) - stateDir := filepath.Join(setup.Config.Storages[0].Path, "state") + txManager := transaction.NewManager(setup.Config, backchannel.NewRegistry()) + housekeepingManager := housekeeping.NewManager(setup.Config.Prometheus, txManager) - stagingDir := t.TempDir() storagePath := setup.Config.Storages[0].Path + stateDir := filepath.Join(storagePath, "state") - txManager := transaction.NewManager(setup.Config, backchannel.NewRegistry()) - housekeepingManager := housekeeping.NewManager(setup.Config.Prometheus, txManager) + stagingDir := filepath.Join(storagePath, "staging") + require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir)) var ( // managerRunning tracks whether the manager is running or closed. @@ -4122,13 +4258,7 @@ func TestTransactionManager(t *testing.T) { transaction, err := transactionManager.Begin(beginCtx, step.TransactionOptions) require.Equal(t, step.ExpectedError, err) if err == nil { - expectedSnapshot := step.ExpectedSnapshot - expectedSnapshot.CustomHookPath = filepath.Join(repoPath, repoutil.CustomHooksDir) - if expectedSnapshot.CustomHookIndex > 0 { - expectedSnapshot.CustomHookPath = customHookPathForLogIndex(repoPath, expectedSnapshot.CustomHookIndex) - } - - require.Equal(t, expectedSnapshot, transaction.Snapshot()) + require.Equal(t, step.ExpectedSnapshot, transaction.Snapshot()) } if step.TransactionOptions.ReadOnly { @@ -4175,6 +4305,7 @@ func TestTransactionManager(t *testing.T) { rewrittenRepo := setup.RepositoryFactory.Build( transaction.RewriteRepository(repo.Repository.(*gitalypb.Repository)), ) + for _, pack := range step.QuarantinedPacks { require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack))) } @@ -4204,26 +4335,6 @@ func TestTransactionManager(t *testing.T) { default: t.Fatalf("unexpected error type: %T", expectedErr) } - case AsyncDeletion: - require.Contains(t, openTransactions, step.TransactionID, "test error: transaction committed before beginning it") - - transaction := openTransactions[step.TransactionID] - transaction.DeleteRepository() - - commitErr := make(chan error) - go func() { - commitErr <- transaction.Commit(ctx) - }() - defer func() { - require.NoError(t, <-commitErr, "committing async deletion failed") - }() - - // The transactions generally don't block each other due to MVCC. Repository deletions are not yet managed via MVCC - // and thus block until all other transactions with an older snapshot are finished. In order to test transactions with - // concurrent repository deletions, we have to commit the deletions asynchronously. We peek here at the internals to - // determine that the deletion has actually been admitted, and is waiting for application to ensure the commit order is always - // as expected by the test. - <-transaction.admitted case RecordInitialReferenceValues: require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it") @@ -4252,11 +4363,15 @@ func TestTransactionManager(t *testing.T) { transaction := openTransactions[step.TransactionID] RequireRepositories(t, ctx, setup.Config, - storagePath, + // Assert the contents of the transaction's snapshot. + filepath.Join(setup.Config.Storages[0].Path, transaction.snapshotBaseRelativePath), // Rewrite all of the repositories to point to their snapshots. func(relativePath string) *localrepo.Repo { return setup.RepositoryFactory.Build( - transaction.RewriteRepository(repo.Repository.(*gitalypb.Repository)), + transaction.RewriteRepository(&gitalypb.Repository{ + StorageName: setup.Config.Storages[0].Name, + RelativePath: relativePath, + }), ) }, step.Repositories) default: @@ -4303,9 +4418,8 @@ func TestTransactionManager(t *testing.T) { // Set the base state as the default so we don't have to repeat it in every test case but it // gets asserted. expectedDirectory = testhelper.DirectoryState{ - "/": {Mode: fs.ModeDir | perm.PrivateDir}, - "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, - "/hooks": {Mode: fs.ModeDir | perm.PrivateDir}, + "/": {Mode: fs.ModeDir | perm.PrivateDir}, + "/wal": {Mode: fs.ModeDir | perm.PrivateDir}, } } @@ -4478,7 +4592,15 @@ func BenchmarkTransactionManager(b *testing.B) { commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents()) commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1)) - manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), b.TempDir(), cmdFactory, housekeepingManager, repositoryFactory) + storagePath := cfg.Storages[0].Path + + stateDir := filepath.Join(storagePath, "state", strconv.Itoa(i)) + require.NoError(b, os.MkdirAll(stateDir, perm.PrivateDir)) + + stagingDir := filepath.Join(storagePath, "staging", strconv.Itoa(i)) + require.NoError(b, os.MkdirAll(stagingDir, perm.PrivateDir)) + + manager := NewTransactionManager(database, storagePath, repo.RelativePath, stateDir, stagingDir, cmdFactory, housekeepingManager, repositoryFactory) managers = append(managers, manager) |