Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-10-03 07:50:27 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-10-03 07:50:27 +0300
commit4fc1879574e1d0a4cc4cea3c6a654c011b65c544 (patch)
tree5ee05be909523f09cc4077961390bd20a05305d9
parentf9e6780bdf315bf44fb1d9acafb76abe0ae66b73 (diff)
parent63cedd75726a893cb652f566f24e6430e9e5858c (diff)
Merge branch 'smh-snapshot-isolate-txn' into 'master'
Snapshot isolate transactions See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6388 Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: karthik nayak <knayak@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: karthik nayak <knayak@gitlab.com> Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--internal/git/housekeeping/worktrees.go9
-rw-r--r--internal/git/housekeeping/worktrees_test.go8
-rw-r--r--internal/gitaly/service/operations/apply_patch.go7
-rw-r--r--internal/gitaly/service/repository/create_bundle_from_ref_list_test.go7
-rw-r--r--internal/gitaly/service/repository/create_bundle_test.go7
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager_test.go2
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go477
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_test.go424
8 files changed, 513 insertions, 428 deletions
diff --git a/internal/git/housekeeping/worktrees.go b/internal/git/housekeeping/worktrees.go
index 9d2c20aec..682fef2b9 100644
--- a/internal/git/housekeeping/worktrees.go
+++ b/internal/git/housekeeping/worktrees.go
@@ -18,7 +18,10 @@ import (
)
const (
- worktreePrefix = "gitlab-worktree"
+ // WorktreesPrefix is a subdirectory in the repository where Gitaly creates worktrees.
+ WorktreesPrefix = "worktrees"
+ // GitlabWorktreePrefix is a subdirectory in the repository where Gitaly creates worktrees.
+ GitlabWorktreePrefix = "gitlab-worktree"
)
// CleanupWorktrees cleans up stale and disconnected worktrees for the given repository.
@@ -45,7 +48,7 @@ func cleanStaleWorktrees(ctx context.Context, repo *localrepo.Repo, threshold ti
return err
}
- worktreePath := filepath.Join(repoPath, worktreePrefix)
+ worktreePath := filepath.Join(repoPath, GitlabWorktreePrefix)
dirInfo, err := os.Stat(worktreePath)
if err != nil {
@@ -135,7 +138,7 @@ func cleanDisconnectedWorktrees(ctx context.Context, repo *localrepo.Repo) error
// determining if there could possibly be any work to be done by git-worktree(1). We do so
// by reading the directory in which worktrees are stored, and if it's empty then we know
// that there aren't any worktrees in the first place.
- worktreeEntries, err := os.ReadDir(filepath.Join(repoPath, "worktrees"))
+ worktreeEntries, err := os.ReadDir(filepath.Join(repoPath, WorktreesPrefix))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
diff --git a/internal/git/housekeeping/worktrees_test.go b/internal/git/housekeeping/worktrees_test.go
index 6c51f2e2d..d09b0c2b9 100644
--- a/internal/git/housekeeping/worktrees_test.go
+++ b/internal/git/housekeeping/worktrees_test.go
@@ -63,14 +63,14 @@ func TestRemoveWorktree(t *testing.T) {
gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch(git.DefaultBranch))
repo := localrepo.NewTestRepo(t, cfg, repoProto)
- existingWorktreePath := filepath.Join(repoPath, worktreePrefix, "existing")
+ existingWorktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, "existing")
gittest.AddWorktree(t, cfg, repoPath, existingWorktreePath)
- disconnectedWorktreePath := filepath.Join(repoPath, worktreePrefix, "disconnected")
+ disconnectedWorktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, "disconnected")
gittest.AddWorktree(t, cfg, repoPath, disconnectedWorktreePath)
require.NoError(t, os.RemoveAll(disconnectedWorktreePath))
- orphanedWorktreePath := filepath.Join(repoPath, worktreePrefix, "orphaned")
+ orphanedWorktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, "orphaned")
require.NoError(t, os.MkdirAll(orphanedWorktreePath, perm.PublicDir))
for _, tc := range []struct {
@@ -100,7 +100,7 @@ func TestRemoveWorktree(t *testing.T) {
t.Run(tc.worktree, func(t *testing.T) {
ctx := testhelper.Context(t)
- worktreePath := filepath.Join(repoPath, worktreePrefix, tc.worktree)
+ worktreePath := filepath.Join(repoPath, GitlabWorktreePrefix, tc.worktree)
err := removeWorktree(ctx, repo, tc.worktree)
if tc.errorIs == nil {
diff --git a/internal/gitaly/service/operations/apply_patch.go b/internal/gitaly/service/operations/apply_patch.go
index 366109b83..5381d90d0 100644
--- a/internal/gitaly/service/operations/apply_patch.go
+++ b/internal/gitaly/service/operations/apply_patch.go
@@ -10,6 +10,7 @@ import (
"time"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
@@ -19,10 +20,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/streamio"
)
-const (
- gitlabWorktreesSubDir = "gitlab-worktree"
-)
-
var errNoDefaultBranch = errors.New("no default branch")
type gitError struct {
@@ -277,5 +274,5 @@ func (s *Server) removeWorktree(ctx context.Context, repo *gitalypb.Repository,
func newWorktreePath(repoPath, prefix string) string {
chars := []byte("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
rand.Shuffle(len(chars), func(i, j int) { chars[i], chars[j] = chars[j], chars[i] })
- return filepath.Join(repoPath, gitlabWorktreesSubDir, prefix+string(chars[:32]))
+ return filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix, prefix+string(chars[:32]))
}
diff --git a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go
index 99fd42ea5..00fa22e08 100644
--- a/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go
+++ b/internal/gitaly/service/repository/create_bundle_from_ref_list_test.go
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
@@ -32,10 +33,10 @@ func TestCreateBundleFromRefList_success(t *testing.T) {
masterOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithMessage("master"), gittest.WithBranch("master"))
sha := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))
- require.NoError(t, os.MkdirAll(filepath.Join(repoPath, "gitlab-worktree"), perm.SharedDir))
+ require.NoError(t, os.MkdirAll(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix), perm.SharedDir))
- gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", "gitlab-worktree/worktree1", sha.String())
- require.NoError(t, os.Chtimes(filepath.Join(repoPath, "gitlab-worktree", "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour)))
+ gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", filepath.Join(housekeeping.GitlabWorktreePrefix, "worktree1"), sha.String())
+ require.NoError(t, os.Chtimes(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix, "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour)))
gittest.Exec(t, cfg, "-C", repoPath, "branch", "-D", "branch")
require.NoError(t, os.Remove(filepath.Join(repoPath, "objects", sha.String()[0:2], sha.String()[2:])))
diff --git a/internal/gitaly/service/repository/create_bundle_test.go b/internal/gitaly/service/repository/create_bundle_test.go
index 97e5d3cff..aff86c5c2 100644
--- a/internal/gitaly/service/repository/create_bundle_test.go
+++ b/internal/gitaly/service/repository/create_bundle_test.go
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
@@ -30,9 +31,9 @@ func TestCreateBundle_successful(t *testing.T) {
// Create a work tree with a HEAD pointing to a commit that is missing. CreateBundle should
// clean this up before creating the bundle.
worktreeCommit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithMessage("worktree"), gittest.WithBranch("branch"))
- require.NoError(t, os.MkdirAll(filepath.Join(repoPath, "gitlab-worktree"), perm.SharedDir))
- gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", "gitlab-worktree/worktree1", worktreeCommit.String())
- require.NoError(t, os.Chtimes(filepath.Join(repoPath, "gitlab-worktree", "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour)))
+ require.NoError(t, os.MkdirAll(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix), perm.SharedDir))
+ gittest.Exec(t, cfg, "-C", repoPath, "worktree", "add", filepath.Join(housekeeping.GitlabWorktreePrefix, "worktree1"), worktreeCommit.String())
+ require.NoError(t, os.Chtimes(filepath.Join(repoPath, housekeeping.GitlabWorktreePrefix, "worktree1"), time.Now().Add(-7*time.Hour), time.Now().Add(-7*time.Hour)))
gittest.Exec(t, cfg, "-C", repoPath, "branch", "-D", "branch")
require.NoError(t, os.Remove(filepath.Join(repoPath, "objects", worktreeCommit.String()[0:2], worktreeCommit.String()[2:])))
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index 8ec24b4a9..59f2b1b65 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -386,7 +386,7 @@ func TestPartitionManager(t *testing.T) {
// Fake a preexisting apply notification. This ensures that we would
// block indefinitely waiting for the notifcation and thus allows us to
// assert that we can indeed cancel this via the context.
- txMgr.applyNotifications[0] = make(chan struct{})
+ txMgr.snapshotLocks[0] = &snapshotLock{}
return txMgr
},
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 5de67ab82..b4362a44c 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -2,7 +2,6 @@ package storagemgr
import (
"bytes"
- "container/list"
"context"
"encoding/binary"
"errors"
@@ -119,11 +118,6 @@ type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate
type Snapshot struct {
// ReadIndex is the index of the log entry this Transaction is reading the data at.
ReadIndex LogIndex
- // CustomHookIndex is index of the custom hooks on the disk that are included in this Transactions's
- // snapshot and were the latest on the read index.
- CustomHookIndex LogIndex
- // CustomHookPath is an absolute filesystem path to the custom hooks in this snapshot.
- CustomHookPath string
}
type transactionState int
@@ -149,12 +143,12 @@ type Transaction struct {
// result is where the outcome of the transaction is sent ot by TransactionManager once it
// has been determined.
result chan error
- // admitted is closed when the transaction was admitted for processing in the TransactionManager.
+ // admitted is set when the transaction was admitted for processing in the TransactionManager.
// Transaction queues in admissionQueue to be committed, and is considered admitted once it has
// been dequeued by TransactionManager.Run(). Once the transaction is admitted, its ownership moves
// from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must
// not do any modifications to the state of the transcation anymore to avoid races.
- admitted chan struct{}
+ admitted bool
// finish cleans up the transaction releasing the resources associated with it. It must be called
// once the transaction is done with.
finish func() error
@@ -168,12 +162,15 @@ type Transaction struct {
// quarantineDirectory is the directory within the stagingDirectory where the new objects of the
// transaction are quarantined.
quarantineDirectory string
+ // snapshotBaseRelativePath is the relative path of the snapshot directory in the storage.
+ // It's used to rewrite the repositories to point to their snapshots.
+ snapshotBaseRelativePath string
// packPrefix contains the prefix (`pack-<digest>`) of the transaction's pack if the transaction
// had objects to log.
packPrefix string
- // stagingRepository is a repository that is used to stage the transaction. If there are quarantined
- // objects, it has the quarantine applied so the objects are available for verification and packing.
- stagingRepository *localrepo.Repo
+ // snapshotRepository is a snapshot of the target repository with a possible quarantine applied
+ // if this is a read-write transaction.
+ snapshotRepository *localrepo.Repo
// Snapshot contains the details of the Transaction's read snapshot.
snapshot Snapshot
@@ -216,39 +213,18 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption
txn := &Transaction{
readOnly: opts.ReadOnly,
commit: mgr.commit,
- snapshot: Snapshot{
- ReadIndex: mgr.appendedLogIndex,
- CustomHookIndex: mgr.customHookIndex,
- CustomHookPath: customHookPathForLogIndex(mgr.repositoryPath, mgr.customHookIndex),
- },
- admitted: make(chan struct{}),
+ snapshot: Snapshot{ReadIndex: mgr.appendedLogIndex},
finished: make(chan struct{}),
}
- // If there are no custom hooks stored through the WAL yet, then default to the custom hooks
- // that may already exist in the repository for backwards compatibility.
- if txn.snapshot.CustomHookIndex == 0 {
- txn.snapshot.CustomHookPath = filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir)
- }
-
- openTransactionElement := mgr.openTransactions.PushBack(txn)
-
- readReady := mgr.applyNotifications[txn.snapshot.ReadIndex]
+ mgr.snapshotLocks[txn.snapshot.ReadIndex].activeSnapshotters.Add(1)
+ defer mgr.snapshotLocks[txn.snapshot.ReadIndex].activeSnapshotters.Done()
+ readReady := mgr.snapshotLocks[txn.snapshot.ReadIndex].applied
mgr.mutex.Unlock()
- if readReady == nil {
- // The snapshot log entry is already applied if there is no notification channel for it.
- // If so, the transaction is ready to begin immediately.
- readReady = make(chan struct{})
- close(readReady)
- }
txn.finish = func() error {
defer close(txn.finished)
- mgr.mutex.Lock()
- mgr.openTransactions.Remove(openTransactionElement)
- mgr.mutex.Unlock()
-
if txn.stagingDirectory != "" {
if err := os.RemoveAll(txn.stagingDirectory); err != nil {
return fmt.Errorf("remove staging directory: %w", err)
@@ -278,14 +254,27 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption
return nil, fmt.Errorf("mkdir temp: %w", err)
}
- txn.stagingRepository = mgr.repositoryFactory.Build(mgr.relativePath)
+ if txn.snapshotBaseRelativePath, err = filepath.Rel(
+ mgr.storagePath,
+ filepath.Join(txn.stagingDirectory, "snapshot"),
+ ); err != nil {
+ return nil, fmt.Errorf("snapshot root relative path: %w", err)
+ }
+
+ if err := mgr.createRepositorySnapshot(ctx,
+ filepath.Join(mgr.storagePath, txn.snapshotRelativePath(mgr.relativePath)),
+ ); err != nil {
+ return nil, fmt.Errorf("create snapshot: %w", err)
+ }
+
+ txn.snapshotRepository = mgr.repositoryFactory.Build(txn.snapshotRelativePath(mgr.relativePath))
if !txn.readOnly {
txn.quarantineDirectory = filepath.Join(txn.stagingDirectory, "quarantine")
if err := os.MkdirAll(filepath.Join(txn.quarantineDirectory, "pack"), perm.PrivateDir); err != nil {
return nil, fmt.Errorf("create quarantine directory: %w", err)
}
- txn.stagingRepository, err = txn.stagingRepository.Quarantine(txn.quarantineDirectory)
+ txn.snapshotRepository, err = txn.snapshotRepository.Quarantine(txn.quarantineDirectory)
if err != nil {
return nil, fmt.Errorf("quarantine: %w", err)
}
@@ -295,16 +284,99 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption
}
}
-// RewriteRepository returns a copy of the repository that has been set up to correctly access the
-// transaction's repository.
+// snapshotRelativePath returns a rewritten relative path that points to the passed in relative path's
+// location in the snapshot.
+func (txn *Transaction) snapshotRelativePath(relativePath string) string {
+ return filepath.Join(txn.snapshotBaseRelativePath, relativePath)
+}
+
+// RewriteRepository returns a copy of the repository that has been set up to correctly access
+// the repository in the transaction's snapshot.
func (txn *Transaction) RewriteRepository(repo *gitalypb.Repository) *gitalypb.Repository {
rewritten := proto.Clone(repo).(*gitalypb.Repository)
- rewritten.RelativePath = txn.stagingRepository.GetRelativePath()
- rewritten.GitObjectDirectory = txn.stagingRepository.GetGitObjectDirectory()
- rewritten.GitAlternateObjectDirectories = txn.stagingRepository.GetGitAlternateObjectDirectories()
+ rewritten.RelativePath = txn.snapshotRelativePath(repo.RelativePath)
+ rewritten.GitObjectDirectory = txn.snapshotRepository.GetGitObjectDirectory()
+ rewritten.GitAlternateObjectDirectories = txn.snapshotRepository.GetGitAlternateObjectDirectories()
return rewritten
}
+// createRepositorySnapshot snapshots the repository's current state at snapshotPath. This is done by
+// recreating the repository's directory structure and hard linking the repository's files in their
+// correct locations there. This effectively does a copy-free clone of the repository. Since the files
+// are shared between the snapshot and the repository, they must not be modified. Git doesn't modify
+// existing files but writes new ones so this property is upheld.
+func (mgr *TransactionManager) createRepositorySnapshot(ctx context.Context, snapshotPath string) error {
+ // This creates the parent directory hierarchy regardless of whether the repository exists or not. It also
+ // doesn't consider the permissions in the storage. While not 100% correct, we have no logic that cares about
+ // the storage hierarchy above repositories.
+ //
+ // The repository's directory itself is not yet created as whether it should be created depends on whether the
+ // repository exists or not.
+ if err := os.MkdirAll(filepath.Dir(snapshotPath), perm.PrivateDir); err != nil {
+ return fmt.Errorf("create parent directory hierarchy: %w", err)
+ }
+
+ mgr.stateLock.RLock()
+ defer mgr.stateLock.RUnlock()
+ if err := createDirectorySnapshot(ctx, mgr.repositoryPath, snapshotPath, map[string]struct{}{
+ // Don't include worktrees in the snapshot. All of the worktrees in the repository should be leftover
+ // state from before transaction management was introduced as the transactions would create their
+ // worktrees in the snapshot.
+ housekeeping.WorktreesPrefix: {},
+ housekeeping.GitlabWorktreePrefix: {},
+ }); err != nil {
+ return fmt.Errorf("create directory snapshot: %w", err)
+ }
+
+ return nil
+}
+
+// createDirectorySnapshot recursively recreates the directory structure from originalDirectory into
+// snapshotDirectory and hard links files into the same locations in snapshotDirectory.
+//
+// skipRelativePaths can be provided to skip certain entries from being included in the snapshot.
+func createDirectorySnapshot(ctx context.Context, originalDirectory, snapshotDirectory string, skipRelativePaths map[string]struct{}) error {
+ if err := filepath.Walk(originalDirectory, func(oldPath string, info fs.FileInfo, err error) error {
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) && oldPath == originalDirectory {
+ // The directory being snapshotted does not exist. This is fine as the transaction
+ // may be about to create it.
+ return nil
+ }
+
+ return err
+ }
+
+ relativePath, err := filepath.Rel(originalDirectory, oldPath)
+ if err != nil {
+ return fmt.Errorf("rel: %w", err)
+ }
+
+ if _, ok := skipRelativePaths[relativePath]; ok {
+ return fs.SkipDir
+ }
+
+ newPath := filepath.Join(snapshotDirectory, relativePath)
+ if info.IsDir() {
+ if err := os.Mkdir(newPath, info.Mode().Perm()); err != nil {
+ return fmt.Errorf("create dir: %w", err)
+ }
+ } else if info.Mode().IsRegular() {
+ if err := os.Link(oldPath, newPath); err != nil {
+ return fmt.Errorf("link file: %w", err)
+ }
+ } else {
+ return fmt.Errorf("unsupported file mode: %q", info.Mode())
+ }
+
+ return nil
+ }); err != nil {
+ return fmt.Errorf("walk: %w", err)
+ }
+
+ return nil
+}
+
func (txn *Transaction) updateState(newState transactionState) error {
switch txn.state {
case transactionStateOpen:
@@ -377,12 +449,11 @@ func (txn *Transaction) Rollback() error {
// the Transaction is being processed by TransactionManager. The clean up responsibility moves there as well
// to avoid races.
func (txn *Transaction) finishUnadmitted() error {
- select {
- case <-txn.admitted:
+ if txn.admitted {
return nil
- default:
- return txn.finish()
}
+
+ return txn.finish()
}
// Snapshot returns the details of the Transaction's read snapshot.
@@ -419,7 +490,7 @@ func (txn *Transaction) RecordInitialReferenceValues(ctx context.Context, initia
continue
}
- objectHash, err := txn.stagingRepository.ObjectHash(ctx)
+ objectHash, err := txn.snapshotRepository.ObjectHash(ctx)
if err != nil {
return fmt.Errorf("object hash: %w", err)
}
@@ -427,7 +498,7 @@ func (txn *Transaction) RecordInitialReferenceValues(ctx context.Context, initia
if objectHash.IsZeroOID(oid) {
// If this is a zero OID, resolve the value to see if this is a force update or the
// reference doesn't exist.
- if current, err := txn.stagingRepository.ResolveRevision(ctx, reference.Revision()); err != nil {
+ if current, err := txn.snapshotRepository.ResolveRevision(ctx, reference.Revision()); err != nil {
if !errors.Is(err, git.ErrReferenceNotFound) {
return fmt.Errorf("resolve revision: %w", err)
}
@@ -507,6 +578,20 @@ func (txn *Transaction) walFilesPath() string {
return filepath.Join(txn.stagingDirectory, "wal-files")
}
+// snapshotLock contains state used to synchronize snapshotters and the log application with each other.
+// Snapshotters wait on the applied channel until all of the committed writes in the read snapshot have
+// been applied on the repository. The log application waits until all activeSnapshotters have managed to
+// snapshot their state prior to applying the next log entry to the repository.
+type snapshotLock struct {
+ // applied is closed when the transaction the snapshotters are waiting for has been applied to the
+ // repository and is ready for reading.
+ applied chan struct{}
+ // activeSnapshotters tracks snapshotters who are either taking a snapshot or waiting for the
+ // log entry to be applied. Log application waits for active snapshotters to finish before applying
+ // the next entry.
+ activeSnapshotters sync.WaitGroup
+}
+
// TransactionManager is responsible for transaction management of a single repository. Each repository has
// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from
// the admissionQueue. Each admitted write is processed in three steps:
@@ -576,6 +661,9 @@ type TransactionManager struct {
repository *localrepo.Repo
// repositoryPath is the path to the repository this TransactionManager is acting on.
repositoryPath string
+ // storagePath is an absolute path to the root of the storage this TransactionManager
+ // is operating in.
+ storagePath string
// relativePath is the repository's relative path inside the storage.
relativePath string
// db is the handle to the key-value store used for storing the write-ahead log related state.
@@ -583,9 +671,6 @@ type TransactionManager struct {
// admissionQueue is where the incoming writes are waiting to be admitted to the transaction
// manager.
admissionQueue chan *Transaction
- // openTransactions contains all transactions that have been begun but not yet committed or rolled back.
- // The transactions are ordered from the oldest to the newest.
- openTransactions *list.List
// initialized is closed when the manager has been initialized. It's used to block new transactions
// from beginning prior to the manager having initialized its runtime state on start up.
@@ -593,18 +678,20 @@ type TransactionManager struct {
// initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't,
// transactions will fail to begin.
initializationSuccessful bool
- // mutex guards access to applyNotifications and appendedLogIndex. These fields are accessed by both
+ // mutex guards access to snapshotLocks and appendedLogIndex. These fields are accessed by both
// Run and Begin which are ran in different goroutines.
mutex sync.Mutex
- // applyNotifications stores channels that are closed when a log entry is applied. These
- // are used to block transactions from beginning before their snapshot is ready.
- applyNotifications map[LogIndex]chan struct{}
+
+ // stateLock is used to synchronize snapshotting with reference verification as the verification
+ // process targets the main repository and creates locks in it.
+ stateLock sync.RWMutex
+ // snapshotLocks contains state used for synchronizing snapshotters with the log application.
+ snapshotLocks map[LogIndex]*snapshotLock
+
// appendedLogIndex holds the index of the last log entry appended to the log.
appendedLogIndex LogIndex
// appliedLogIndex holds the index of the last log entry applied to the repository
appliedLogIndex LogIndex
- // customHookIndex stores the log index of the latest committed custom custom hooks in the repository.
- customHookIndex LogIndex
// housekeepingManager access to the housekeeping.Manager.
housekeepingManager housekeeping.Manager
@@ -634,13 +721,13 @@ func NewTransactionManager(
commandFactory: cmdFactory,
repositoryFactory: repositoryFactory,
repository: repositoryFactory.Build(relativePath),
+ storagePath: storagePath,
repositoryPath: filepath.Join(storagePath, relativePath),
relativePath: relativePath,
db: newDatabaseAdapter(db),
admissionQueue: make(chan *Transaction),
- openTransactions: list.New(),
initialized: make(chan struct{}),
- applyNotifications: make(map[LogIndex]chan struct{}),
+ snapshotLocks: make(map[LogIndex]*snapshotLock),
stateDirectory: stateDir,
stagingDirectory: stagingDir,
housekeepingManager: housekeepingManager,
@@ -666,7 +753,7 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact
select {
case mgr.admissionQueue <- transaction:
- close(transaction.admitted)
+ transaction.admitted = true
select {
case err := <-transaction.result:
@@ -751,7 +838,18 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra
return nil
}
- objectHash, err := transaction.stagingRepository.ObjectHash(ctx)
+ // We should actually be figuring out the new objects to pack against the transaction's snapshot
+ // repository as the objects and references in the actual repository may change while we're doing
+ // this. We don't yet have a way to figure out which objects the new quarantined objects depend on
+ // in the main object database of the snapshot.
+ //
+ // This is pending https://gitlab.com/groups/gitlab-org/-/epics/11242.
+ quarantinedRepo, err := mgr.repository.Quarantine(transaction.quarantineDirectory)
+ if err != nil {
+ return fmt.Errorf("quarantine: %w", err)
+ }
+
+ objectHash, err := quarantinedRepo.ObjectHash(ctx)
if err != nil {
return fmt.Errorf("object hash: %w", err)
}
@@ -781,7 +879,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra
group.Go(func() (returnedErr error) {
defer func() { objectsWriter.CloseWithError(returnedErr) }()
- if err := transaction.stagingRepository.WalkUnreachableObjects(ctx,
+ if err := quarantinedRepo.WalkUnreachableObjects(ctx,
strings.NewReader(strings.Join(heads, "\n")),
objectsWriter,
); err != nil {
@@ -798,7 +896,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra
packWriter.CloseWithError(returnedErr)
}()
- if err := transaction.stagingRepository.PackObjects(ctx, objectsReader, packWriter); err != nil {
+ if err := quarantinedRepo.PackObjects(ctx, objectsReader, packWriter); err != nil {
return fmt.Errorf("pack objects: %w", err)
}
@@ -815,7 +913,7 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra
// index-pack places the pack, index, and reverse index into the repository's object directory.
// The staging repository is configured with a quarantine so we execute it there.
var stdout, stderr bytes.Buffer
- if err := transaction.stagingRepository.ExecAndWait(ctx, git.Command{
+ if err := quarantinedRepo.ExecAndWait(ctx, git.Command{
Name: "index-pack",
Flags: []git.Option{git.Flag{Name: "--stdin"}, git.Flag{Name: "--rev-index"}},
Args: []string{filepath.Join(transaction.walFilesPath(), "objects.pack")},
@@ -1052,16 +1150,15 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
}
}
- var err error
- mgr.customHookIndex, err = mgr.determineCustomHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex)
- if err != nil {
- return fmt.Errorf("determine hook index: %w", err)
- }
+ // Create a snapshot lock for the applied index as it is used for synchronizing
+ // the snapshotters with the log application.
+ mgr.snapshotLocks[mgr.appliedLogIndex] = &snapshotLock{applied: make(chan struct{})}
+ close(mgr.snapshotLocks[mgr.appliedLogIndex].applied)
- // Each unapplied log entry should have a notification channel that gets closed when it is applied.
- // Create these channels here for the log entries.
+ // Each unapplied log entry should have a snapshot lock as they are created in normal
+ // operation when committing a log entry. Recover these entries.
for i := mgr.appliedLogIndex + 1; i <= mgr.appendedLogIndex; i++ {
- mgr.applyNotifications[i] = make(chan struct{})
+ mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})}
}
if err := mgr.removeStaleWALFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
@@ -1109,57 +1206,10 @@ func (mgr *TransactionManager) determineRepositoryExistence() error {
return nil
}
-// determineCustomHookIndex determines the latest custom hooks in the repository.
-//
-// 1. First we iterate through the unapplied log in reverse order. The first log entry that
-// contains custom hooks must have the latest custom hooks since it is the latest log entry.
-// 2. If we don't find any custom hooks in the log, the latest hooks could have been applied
-// to the repository already and the log entry pruned away. Look at the custom hooks on the
-// disk to see which are the latest.
-// 3. If we found no custom hooks in the log nor in the repository, there are no custom hooks
-// configured.
-func (mgr *TransactionManager) determineCustomHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) {
- if !mgr.repositoryExists {
- // If the repository doesn't exist, then there are no hooks either.
- return 0, nil
- }
-
- for i := appendedIndex; appliedIndex < i; i-- {
- logEntry, err := mgr.readLogEntry(i)
- if err != nil {
- return 0, fmt.Errorf("read log entry: %w", err)
- }
-
- if logEntry.CustomHooksUpdate != nil {
- return i, nil
- }
- }
-
- hookDirs, err := os.ReadDir(filepath.Join(mgr.stateDirectory, "hooks"))
- if err != nil {
- return 0, fmt.Errorf("read hook directories: %w", err)
- }
-
- var hookIndex LogIndex
- for _, dir := range hookDirs {
- rawIndex, err := strconv.ParseUint(dir.Name(), 10, 64)
- if err != nil {
- return 0, fmt.Errorf("parse hook index: %w", err)
- }
-
- if index := LogIndex(rawIndex); hookIndex < index {
- hookIndex = index
- }
- }
-
- return hookIndex, err
-}
-
func (mgr *TransactionManager) createStateDirectory() error {
for _, path := range []string{
mgr.stateDirectory,
filepath.Join(mgr.stateDirectory, "wal"),
- filepath.Join(mgr.stateDirectory, "hooks"),
} {
if err := os.Mkdir(path, perm.PrivateDir); err != nil {
if !errors.Is(err, fs.ErrExist) {
@@ -1276,15 +1326,20 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
return nil, nil
}
+ quarantinedRepo, err := mgr.repository.Quarantine(transaction.quarantineDirectory)
+ if err != nil {
+ return nil, fmt.Errorf("quarantine: %w", err)
+ }
+
var referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate
for referenceName, update := range transaction.referenceUpdates {
if err := git.ValidateReference(string(referenceName)); err != nil {
return nil, InvalidReferenceFormatError{ReferenceName: referenceName}
}
- actualOldTip, err := transaction.stagingRepository.ResolveRevision(ctx, referenceName.Revision())
+ actualOldTip, err := quarantinedRepo.ResolveRevision(ctx, referenceName.Revision())
if errors.Is(err, git.ErrReferenceNotFound) {
- objectHash, err := transaction.stagingRepository.ObjectHash(ctx)
+ objectHash, err := quarantinedRepo.ObjectHash(ctx)
if err != nil {
return nil, fmt.Errorf("object hash: %w", err)
}
@@ -1320,7 +1375,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
) == -1
})
- if err := mgr.verifyReferencesWithGit(ctx, referenceUpdates, transaction.stagingRepository); err != nil {
+ if err := mgr.verifyReferencesWithGit(ctx, referenceUpdates, quarantinedRepo); err != nil {
return nil, fmt.Errorf("verify references with git: %w", err)
}
@@ -1331,8 +1386,17 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
// the updates will go through when they are being applied in the log. This also catches any invalid reference names
// and file/directory conflicts with Git's loose reference storage which can occur with references like
// 'refs/heads/parent' and 'refs/heads/parent/child'.
-func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, stagingRepository *localrepo.Repo) error {
- updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, stagingRepository)
+func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, quarantinedRepo *localrepo.Repo) error {
+ // Prevent snapshots from being taken concurrently as the reference verification
+ // currently creates locks in the repository. We don't need to synchronnize snapshotting
+ // and reference verification once we are no longer verifying the references against
+ // the repository.
+ //
+ // Alternatively this could be removed by ignoring the lock files during snapshot creation.
+ mgr.stateLock.Lock()
+ defer mgr.stateLock.Unlock()
+
+ updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, quarantinedRepo)
if err != nil {
return fmt.Errorf("prepare reference transaction: %w", err)
}
@@ -1451,13 +1515,9 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g
mgr.mutex.Lock()
mgr.appendedLogIndex = nextLogIndex
- if logEntry.CustomHooksUpdate != nil {
- mgr.customHookIndex = nextLogIndex
- }
- mgr.applyNotifications[nextLogIndex] = make(chan struct{})
+ mgr.snapshotLocks[nextLogIndex] = &snapshotLock{applied: make(chan struct{})}
if logEntry.RepositoryDeletion != nil {
mgr.repositoryExists = false
- mgr.customHookIndex = 0
}
mgr.mutex.Unlock()
@@ -1471,6 +1531,15 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return fmt.Errorf("read log entry: %w", err)
}
+ // Ensure all snapshotters have finished snapshotting the previous state before we apply
+ // the new state to the repository. No new snapshotters can arrive at this point. All
+ // new transactions would be waiting for the committed log entry we are about to apply.
+ previousIndex := logIndex - 1
+ mgr.snapshotLocks[previousIndex].activeSnapshotters.Wait()
+ mgr.mutex.Lock()
+ delete(mgr.snapshotLocks, previousIndex)
+ mgr.mutex.Unlock()
+
if logEntry.RepositoryDeletion != nil {
// If the repository is being deleted, just delete it without any other changes given
// they'd all be removed anyway. Reapplying the other changes after a crash would also
@@ -1508,18 +1577,6 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
mgr.appliedLogIndex = logIndex
- // Notify the transactions waiting for this log entry to be applied prior to beginning.
- mgr.mutex.Lock()
- defer mgr.mutex.Unlock()
-
- notificationCh, ok := mgr.applyNotifications[logIndex]
- if !ok {
- // This should never happen and is a programming error if it does.
- return fmt.Errorf("no notification channel for LSN %d", logIndex)
- }
- delete(mgr.applyNotifications, logIndex)
- close(notificationCh)
-
// There is no awaiter for a transaction if the transaction manager is recovering
// transactions from the log after starting up.
if resultChan, ok := mgr.awaitingTransactions[logIndex]; ok {
@@ -1527,6 +1584,10 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
delete(mgr.awaitingTransactions, logIndex)
}
+ // Notify the transactions waiting for this log entry to be applied prior to take their
+ // snapshot.
+ close(mgr.snapshotLocks[logIndex].applied)
+
return nil
}
@@ -1549,61 +1610,7 @@ func (mgr *TransactionManager) applyReferenceUpdates(ctx context.Context, update
}
// applyRepositoryDeletion deletes the repository.
-//
-// Given how the repositories are laid out in the storage, we currently can't support MVCC for them.
-// This is because there is only ever a single instance of a given repository. We have to wait for all
-// of the readers to finish before we can delete the repository as otherwise the readers could fail in
-// unexpected ways and it would be an isolation violation. Repository deletions thus block before all
-// transaction with an older read snapshot are done with the repository.
func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, index LogIndex) error {
- for {
- mgr.mutex.Lock()
- oldestElement := mgr.openTransactions.Front()
- mgr.mutex.Unlock()
- if oldestElement == nil {
- // If there are no open transactions, the deletion can proceed as there are
- // no readers.
- //
- // Any new transaction would have the deletion in their snapshot, and are waiting
- // for it to be applied prior to beginning.
- break
- }
-
- oldestTransaction := oldestElement.Value.(*Transaction)
- if oldestTransaction.snapshot.ReadIndex >= index {
- // If the oldest transaction is reading at this or later log index, it already has the deletion
- // in its snapshot, and is waiting for it to be applied. Proceed with the deletion as there
- // are no readers with the pre-deletion state in the snapshot.
- break
- }
-
- for {
- select {
- case <-oldestTransaction.finished:
- // The oldest transaction finished. Proceed to check the second oldest open transaction.
- case transaction := <-mgr.admissionQueue:
- // The oldest transaction could also be waiting to commit. Since the Run goroutine is
- // blocked here waiting for the transaction to finish, the write would never be admitted
- // for processing, leading to a deadlock. Since the repository was deleted, the only correct
- // outcome for the transaction would be to receive a not found error. Admit the transaction,
- // and finish it with the correct result so we can unblock the deletion.
- transaction.result <- ErrRepositoryNotFound
- if err := transaction.finish(); err != nil {
- return fmt.Errorf("finish transaction: %w", err)
- }
-
- continue
- case <-ctx.Done():
- }
-
- if err := ctx.Err(); err != nil {
- return err
- }
-
- break
- }
- }
-
if err := os.RemoveAll(mgr.repositoryPath); err != nil {
return fmt.Errorf("remove repository: %w", err)
}
@@ -1646,87 +1653,41 @@ func (mgr *TransactionManager) applyPackFile(ctx context.Context, packPrefix str
return nil
}
-// applyCustomHooks applies the custom hooks to the repository from the log entry. The custom hooks are stored
-// at `<repo>/wal/hooks/<log_index>`. The custom hooks are fsynced prior to returning so it is safe to delete
-// the log entry afterwards.
-//
-// The hooks are also extracted at `<repo>/custom_hooks`. This is done for backwards compatibility, as we want
-// the hooks to be present even if the WAL logic is disabled. This ensures we don't lose data if we have to
-// disable the WAL logic after rollout.
+// applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are extracted at
+// `<repo>/custom_hooks`. The custom hooks are fsynced prior to returning so it is safe to delete the log entry
+// afterwards.
func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex LogIndex, update *gitalypb.LogEntry_CustomHooksUpdate) error {
if update == nil {
return nil
}
- targetDirectory := customHookPathForLogIndex(mgr.stateDirectory, logIndex)
- if err := os.Mkdir(targetDirectory, fs.ModePerm); err != nil {
- // The target directory may exist if we previously tried to extract the
- // custom hooks there. TAR overwrites existing files and the custom hooks
- // files are guaranteed to be the same as this is the same log entry.
- if !errors.Is(err, fs.ErrExist) {
- return fmt.Errorf("create directory: %w", err)
- }
+ destinationDir := filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir)
+ if err := os.RemoveAll(destinationDir); err != nil {
+ return fmt.Errorf("remove directory: %w", err)
}
- syncer := safe.NewSyncer()
- extractHooks := func(destinationDir string) error {
- if err := repoutil.ExtractHooks(ctx, bytes.NewReader(update.CustomHooksTar), destinationDir, true); err != nil {
- return fmt.Errorf("extract hooks: %w", err)
- }
-
- // TAR doesn't sync the extracted files so do it manually here.
- if err := syncer.SyncRecursive(destinationDir); err != nil {
- return fmt.Errorf("sync hooks: %w", err)
- }
-
- return nil
+ if err := os.Mkdir(destinationDir, perm.PrivateDir); err != nil {
+ return fmt.Errorf("create directory: %w", err)
}
- if err := extractHooks(targetDirectory); err != nil {
+ if err := repoutil.ExtractHooks(ctx, bytes.NewReader(update.CustomHooksTar), destinationDir, true); err != nil {
return fmt.Errorf("extract hooks: %w", err)
}
- // Sync the parent directory as well.
- if err := syncer.SyncParent(targetDirectory); err != nil {
- return fmt.Errorf("sync hook directory: %w", err)
- }
-
- // Extract another copy that we can move to `<repo>/custom_hooks` where the hooks exist without the WAL enabled.
- // We make a second copy as if we disable the WAL, we have to clear all of its state prior to re-enabling it.
- // This would clear the hooks so symbolic linking the first copy is not enough.
- tmpDir, err := os.MkdirTemp(mgr.stagingDirectory, "")
- if err != nil {
- return fmt.Errorf("create temporary directory: %w", err)
- }
-
- if err := extractHooks(tmpDir); err != nil {
- return fmt.Errorf("extract legacy hooks: %w", err)
- }
-
- legacyHooksPath := filepath.Join(mgr.repositoryPath, repoutil.CustomHooksDir)
- // The hooks are lost if we perform this removal but fail to perform the remaining operations and the
- // WAL is disabled before succeeding. This is an existing issue already with SetCustomHooks RPC.
- if err := os.RemoveAll(legacyHooksPath); err != nil {
- return fmt.Errorf("remove existing legacy hooks: %w", err)
- }
-
- if err := os.Rename(tmpDir, legacyHooksPath); err != nil {
- return fmt.Errorf("move legacy hooks in place: %w", err)
+ // TAR doesn't sync the extracted files so do it manually here.
+ syncer := safe.NewSyncer()
+ if err := syncer.SyncRecursive(destinationDir); err != nil {
+ return fmt.Errorf("sync hooks: %w", err)
}
- if err := syncer.SyncParent(legacyHooksPath); err != nil {
- return fmt.Errorf("sync legacy hooks directory entry: %w", err)
+ // Sync the parent directory as well.
+ if err := syncer.SyncParent(destinationDir); err != nil {
+ return fmt.Errorf("sync hook directory: %w", err)
}
return nil
}
-// customHookPathForLogIndex returns the filesystem paths where the custom hooks
-// for the given log index are stored.
-func customHookPathForLogIndex(stateDir string, logIndex LogIndex) string {
- return filepath.Join(stateDir, "hooks", logIndex.String())
-}
-
// deleteLogEntry deletes the log entry at the given index from the log.
func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error {
return mgr.deleteKey(keyLogEntry(mgr.relativePath, index))
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
index 7575861c6..94debf74a 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"sort"
+ "strconv"
"strings"
"sync"
"testing"
@@ -25,7 +26,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
@@ -343,15 +343,6 @@ func TestTransactionManager(t *testing.T) {
IncludeObjects []git.ObjectID
}
- // AsyncDeletion can be used to commit a repository deletion asynchronously. This is necessary in tests
- // which test concurrent transactions with repository deletions as deletions are blocking.
- type AsyncDeletion struct {
- // TransactionID identifies the transaction to async commit a deletion.
- TransactionID int
- // ExpectedError is the error that is expected to be returned when committing the transaction.
- ExpectedError error
- }
-
// RecordInitialReferenceValues calls RecordInitialReferenceValues on a transaction.
type RecordInitialReferenceValues struct {
// TransactionID identifies the transaction to prepare the reference updates on.
@@ -1722,8 +1713,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
ExpectedSnapshot: Snapshot{
- ReadIndex: 1,
- CustomHookIndex: 1,
+ ReadIndex: 1,
},
},
Commit{
@@ -1736,17 +1726,8 @@ func TestTransactionManager(t *testing.T) {
string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(),
},
Directory: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
- "/hooks/1/pre-receive": {
- Mode: umask.Mask(fs.ModePerm),
- Content: []byte("hook content"),
- },
- "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
- "/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
},
Repositories: RepositoryStates{
relativePath: {
@@ -1806,16 +1787,8 @@ func TestTransactionManager(t *testing.T) {
string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(),
},
Directory: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
- "/hooks/1/pre-receive": {
- Mode: umask.Mask(fs.ModePerm),
- Content: []byte("hook content"),
- },
- "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
},
Repositories: RepositoryStates{
relativePath: {
@@ -1860,8 +1833,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
ExpectedSnapshot: Snapshot{
- ReadIndex: 1,
- CustomHookIndex: 1,
+ ReadIndex: 1,
},
},
Commit{
@@ -1871,8 +1843,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 3,
ExpectedSnapshot: Snapshot{
- ReadIndex: 2,
- CustomHookIndex: 2,
+ ReadIndex: 2,
},
},
CloseManager{},
@@ -1880,8 +1851,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 4,
ExpectedSnapshot: Snapshot{
- ReadIndex: 2,
- CustomHookIndex: 2,
+ ReadIndex: 2,
},
},
Rollback{
@@ -1893,17 +1863,8 @@ func TestTransactionManager(t *testing.T) {
string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(),
},
Directory: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
- "/hooks/1/pre-receive": {
- Mode: umask.Mask(fs.ModePerm),
- Content: []byte("hook content"),
- },
- "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
- "/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
},
Repositories: RepositoryStates{
relativePath: {
@@ -2638,16 +2599,13 @@ func TestTransactionManager(t *testing.T) {
CustomHooksTAR: validCustomHooks(t),
},
},
- // Transaction 2 is not isolated from the changes made by transaction 1. It sees the committed
- // changes immediately.
+ // Transaction 2 is isolated from the changes made by transaction 1. It does not see the
+ // committed changes.
RepositoryAssertion{
TransactionID: 2,
Repositories: RepositoryStates{
relativePath: {
DefaultBranch: "refs/heads/main",
- References: []git.Reference{
- {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
- },
Objects: []git.ObjectID{
setup.ObjectHash.EmptyTreeOID,
setup.Commits.First.OID,
@@ -2655,23 +2613,13 @@ func TestTransactionManager(t *testing.T) {
setup.Commits.Third.OID,
setup.Commits.Diverging.OID,
},
- CustomHooks: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/pre-receive": {
- Mode: umask.Mask(fs.ModePerm),
- Content: []byte("hook content"),
- },
- "/private-dir": {Mode: fs.ModeDir | perm.PrivateDir},
- "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
- },
},
},
},
Begin{
TransactionID: 3,
ExpectedSnapshot: Snapshot{
- ReadIndex: 1,
- CustomHookIndex: 1,
+ ReadIndex: 1,
},
},
// Transaction 3 is should see the new changes as it began after transaction 1 was committed.
@@ -2711,8 +2659,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 4,
ExpectedSnapshot: Snapshot{
- ReadIndex: 2,
- CustomHookIndex: 1,
+ ReadIndex: 2,
},
},
Rollback{
@@ -2721,8 +2668,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 5,
ExpectedSnapshot: Snapshot{
- ReadIndex: 2,
- CustomHookIndex: 1,
+ ReadIndex: 2,
},
},
Commit{
@@ -2735,8 +2681,7 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 6,
ExpectedSnapshot: Snapshot{
- ReadIndex: 3,
- CustomHookIndex: 3,
+ ReadIndex: 3,
},
},
Rollback{
@@ -2751,17 +2696,8 @@ func TestTransactionManager(t *testing.T) {
string(keyAppliedLogIndex(relativePath)): LogIndex(3).toProto(),
},
Directory: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
- "/hooks/1/pre-receive": {
- Mode: umask.Mask(fs.ModePerm),
- Content: []byte("hook content"),
- },
- "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
- "/hooks/3": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
},
Repositories: RepositoryStates{
relativePath: {
@@ -2811,7 +2747,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -2914,7 +2849,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -2988,7 +2922,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -3076,7 +3009,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -3122,9 +3054,8 @@ func TestTransactionManager(t *testing.T) {
string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(),
},
Directory: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
},
Repositories: RepositoryStates{
relativePath: {
@@ -3154,7 +3085,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -3200,7 +3130,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -3286,7 +3215,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -3366,7 +3294,6 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
"/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
@@ -3449,8 +3376,9 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
},
- AsyncDeletion{
- TransactionID: 1,
+ Commit{
+ TransactionID: 1,
+ DeleteRepository: true,
},
Commit{
TransactionID: 2,
@@ -3475,8 +3403,9 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
},
- AsyncDeletion{
- TransactionID: 1,
+ Commit{
+ TransactionID: 1,
+ DeleteRepository: true,
},
Commit{
TransactionID: 2,
@@ -3501,8 +3430,9 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
},
- AsyncDeletion{
- TransactionID: 1,
+ Commit{
+ TransactionID: 1,
+ DeleteRepository: true,
},
Commit{
TransactionID: 2,
@@ -3529,8 +3459,9 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
},
- AsyncDeletion{
- TransactionID: 1,
+ Commit{
+ TransactionID: 1,
+ DeleteRepository: true,
},
Commit{
TransactionID: 2,
@@ -3690,16 +3621,8 @@ func TestTransactionManager(t *testing.T) {
string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(),
},
Directory: testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
- "/hooks/1/pre-receive": {
- Mode: umask.Mask(fs.ModePerm),
- Content: []byte("hook content"),
- },
- "/hooks/1/private-dir": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
},
Repositories: RepositoryStates{},
},
@@ -3714,8 +3637,9 @@ func TestTransactionManager(t *testing.T) {
Begin{
TransactionID: 2,
},
- AsyncDeletion{
- TransactionID: 1,
+ Commit{
+ TransactionID: 1,
+ DeleteRepository: true,
},
// The concurrent transaction should be able to read the
// repository despite the committed deletion.
@@ -3919,6 +3843,217 @@ func TestTransactionManager(t *testing.T) {
},
},
},
+ {
+ desc: "transactions are snapshot isolated from concurrent updates",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Begin{
+ TransactionID: 2,
+ },
+ Commit{
+ TransactionID: 2,
+ DefaultBranchUpdate: &DefaultBranchUpdate{
+ Reference: "refs/heads/new-head",
+ },
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ CustomHooksUpdate: &CustomHooksUpdate{
+ CustomHooksTAR: validCustomHooks(t),
+ },
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ // This transaction was started before the commit, so it should see the original state.
+ RepositoryAssertion{
+ TransactionID: 1,
+ Repositories: RepositoryStates{
+ relativePath: {
+ DefaultBranch: "refs/heads/main",
+ },
+ },
+ },
+ // This transaction was started after the commit, so it should see the new state.
+ RepositoryAssertion{
+ TransactionID: 3,
+ Repositories: RepositoryStates{
+ relativePath: {
+ DefaultBranch: "refs/heads/new-head",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ CustomHooks: testhelper.DirectoryState{
+ "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/pre-receive": {
+ Mode: umask.Mask(fs.ModePerm),
+ Content: []byte("hook content"),
+ },
+ "/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ },
+ },
+ },
+ },
+ Rollback{
+ TransactionID: 1,
+ },
+ Rollback{
+ TransactionID: 3,
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
+ "/wal/1/objects.rev": reverseIndexFileDirectoryEntry(setup.Config),
+ "/wal/1/objects.pack": packFileDirectoryEntry(
+ setup.Config,
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Repositories: RepositoryStates{
+ relativePath: {
+ DefaultBranch: "refs/heads/new-head",
+ References: []git.Reference{{Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}},
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ CustomHooks: testhelper.DirectoryState{
+ "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/pre-receive": {
+ Mode: umask.Mask(fs.ModePerm),
+ Content: []byte("hook content"),
+ },
+ "/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "transactions are snapshot isolated from concurrent deletions",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ DefaultBranchUpdate: &DefaultBranchUpdate{
+ Reference: "refs/heads/new-head",
+ },
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ CustomHooksUpdate: &CustomHooksUpdate{
+ CustomHooksTAR: validCustomHooks(t),
+ },
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ DeleteRepository: true,
+ },
+ // This transaction was started before the deletion, so it should see the old state regardless
+ // of the repository being deleted.
+ RepositoryAssertion{
+ TransactionID: 3,
+ Repositories: RepositoryStates{
+ relativePath: {
+ DefaultBranch: "refs/heads/new-head",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ CustomHooks: testhelper.DirectoryState{
+ "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/pre-receive": {
+ Mode: umask.Mask(fs.ModePerm),
+ Content: []byte("hook content"),
+ },
+ "/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ },
+ },
+ },
+ },
+ Rollback{
+ TransactionID: 3,
+ },
+ Begin{
+ TransactionID: 4,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 2,
+ },
+ },
+ RepositoryAssertion{
+ TransactionID: 4,
+ Repositories: RepositoryStates{},
+ },
+ Rollback{
+ TransactionID: 4,
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal/1": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal/1/objects.idx": indexFileDirectoryEntry(setup.Config),
+ "/wal/1/objects.rev": reverseIndexFileDirectoryEntry(setup.Config),
+ "/wal/1/objects.pack": packFileDirectoryEntry(
+ setup.Config,
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Repositories: RepositoryStates{},
+ },
+ },
}
type invalidReferenceTestCase struct {
@@ -4017,13 +4152,14 @@ func TestTransactionManager(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, database)
- stateDir := filepath.Join(setup.Config.Storages[0].Path, "state")
+ txManager := transaction.NewManager(setup.Config, backchannel.NewRegistry())
+ housekeepingManager := housekeeping.NewManager(setup.Config.Prometheus, txManager)
- stagingDir := t.TempDir()
storagePath := setup.Config.Storages[0].Path
+ stateDir := filepath.Join(storagePath, "state")
- txManager := transaction.NewManager(setup.Config, backchannel.NewRegistry())
- housekeepingManager := housekeeping.NewManager(setup.Config.Prometheus, txManager)
+ stagingDir := filepath.Join(storagePath, "staging")
+ require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
var (
// managerRunning tracks whether the manager is running or closed.
@@ -4122,13 +4258,7 @@ func TestTransactionManager(t *testing.T) {
transaction, err := transactionManager.Begin(beginCtx, step.TransactionOptions)
require.Equal(t, step.ExpectedError, err)
if err == nil {
- expectedSnapshot := step.ExpectedSnapshot
- expectedSnapshot.CustomHookPath = filepath.Join(repoPath, repoutil.CustomHooksDir)
- if expectedSnapshot.CustomHookIndex > 0 {
- expectedSnapshot.CustomHookPath = customHookPathForLogIndex(repoPath, expectedSnapshot.CustomHookIndex)
- }
-
- require.Equal(t, expectedSnapshot, transaction.Snapshot())
+ require.Equal(t, step.ExpectedSnapshot, transaction.Snapshot())
}
if step.TransactionOptions.ReadOnly {
@@ -4175,6 +4305,7 @@ func TestTransactionManager(t *testing.T) {
rewrittenRepo := setup.RepositoryFactory.Build(
transaction.RewriteRepository(repo.Repository.(*gitalypb.Repository)),
)
+
for _, pack := range step.QuarantinedPacks {
require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack)))
}
@@ -4204,26 +4335,6 @@ func TestTransactionManager(t *testing.T) {
default:
t.Fatalf("unexpected error type: %T", expectedErr)
}
- case AsyncDeletion:
- require.Contains(t, openTransactions, step.TransactionID, "test error: transaction committed before beginning it")
-
- transaction := openTransactions[step.TransactionID]
- transaction.DeleteRepository()
-
- commitErr := make(chan error)
- go func() {
- commitErr <- transaction.Commit(ctx)
- }()
- defer func() {
- require.NoError(t, <-commitErr, "committing async deletion failed")
- }()
-
- // The transactions generally don't block each other due to MVCC. Repository deletions are not yet managed via MVCC
- // and thus block until all other transactions with an older snapshot are finished. In order to test transactions with
- // concurrent repository deletions, we have to commit the deletions asynchronously. We peek here at the internals to
- // determine that the deletion has actually been admitted, and is waiting for application to ensure the commit order is always
- // as expected by the test.
- <-transaction.admitted
case RecordInitialReferenceValues:
require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it")
@@ -4252,11 +4363,15 @@ func TestTransactionManager(t *testing.T) {
transaction := openTransactions[step.TransactionID]
RequireRepositories(t, ctx, setup.Config,
- storagePath,
+ // Assert the contents of the transaction's snapshot.
+ filepath.Join(setup.Config.Storages[0].Path, transaction.snapshotBaseRelativePath),
// Rewrite all of the repositories to point to their snapshots.
func(relativePath string) *localrepo.Repo {
return setup.RepositoryFactory.Build(
- transaction.RewriteRepository(repo.Repository.(*gitalypb.Repository)),
+ transaction.RewriteRepository(&gitalypb.Repository{
+ StorageName: setup.Config.Storages[0].Name,
+ RelativePath: relativePath,
+ }),
)
}, step.Repositories)
default:
@@ -4303,9 +4418,8 @@ func TestTransactionManager(t *testing.T) {
// Set the base state as the default so we don't have to repeat it in every test case but it
// gets asserted.
expectedDirectory = testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
- "/hooks": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
}
}
@@ -4478,7 +4592,15 @@ func BenchmarkTransactionManager(b *testing.B) {
commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents())
commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1))
- manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), b.TempDir(), cmdFactory, housekeepingManager, repositoryFactory)
+ storagePath := cfg.Storages[0].Path
+
+ stateDir := filepath.Join(storagePath, "state", strconv.Itoa(i))
+ require.NoError(b, os.MkdirAll(stateDir, perm.PrivateDir))
+
+ stagingDir := filepath.Join(storagePath, "staging", strconv.Itoa(i))
+ require.NoError(b, os.MkdirAll(stagingDir, perm.PrivateDir))
+
+ manager := NewTransactionManager(database, storagePath, repo.RelativePath, stateDir, stagingDir, cmdFactory, housekeepingManager, repositoryFactory)
managers = append(managers, manager)