Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-12-05 06:54:42 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-12-06 08:49:09 +0300
commitfc304279221060c327e8b87465b70ec67c988d0a (patch)
treeb1c9736cfa71cb8a937b1bac871ff503dbf8d16d
parent14d9f1ae93d635a8b5759e915d0e3c2915e7dd7f (diff)
Extract helpers and executor out of TestTransactionManager
Recently, the test TestTransactionManager has been quite long with nearly ~6000 LOCs. All test cases are packed into one big test table. That makes it hard to navigate through the file. The size of the file even breaks GitLab.com's review feature. This commit restructures the test file so that we can split it into smaller ones. The foundational changes consist of the following: - Move common test structs to test helper file. - Generalize and extract test setup. - Extract scenario execution.
-rw-r--r--internal/gitaly/storage/storagemgr/testhelper_test.go603
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_test.go664
2 files changed, 643 insertions, 624 deletions
diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go
index 8197d4d51..28e8aee9d 100644
--- a/internal/gitaly/storage/storagemgr/testhelper_test.go
+++ b/internal/gitaly/storage/storagemgr/testhelper_test.go
@@ -1,6 +1,7 @@
package storagemgr
import (
+ "bytes"
"context"
"fmt"
"io/fs"
@@ -8,18 +9,26 @@ import (
"path/filepath"
"reflect"
"sort"
+ "sync"
"testing"
"github.com/dgraph-io/badger/v4"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/protobuf/proto"
)
@@ -195,3 +204,597 @@ func RequireDatabase(tb testing.TB, ctx context.Context, database Database, expe
require.Empty(tb, unexpectedKeys, "database contains unexpected keys")
testhelper.ProtoEqual(tb, expectedState, actualState)
}
+
+type testTransactionCommit struct {
+ OID git.ObjectID
+ Pack []byte
+}
+
+type testTransactionCommits struct {
+ First testTransactionCommit
+ Second testTransactionCommit
+ Third testTransactionCommit
+ Diverging testTransactionCommit
+}
+
+type testTransactionSetup struct {
+ PartitionID partitionID
+ RelativePath string
+ RepositoryPath string
+ Repo *localrepo.Repo
+ Config config.Cfg
+ CommandFactory git.CommandFactory
+ RepositoryFactory localrepo.Factory
+ ObjectHash git.ObjectHash
+ NonExistentOID git.ObjectID
+ Commits testTransactionCommits
+}
+
+type testTransactionHooks struct {
+ // BeforeApplyLogEntry is called before a log entry is applied to the repository.
+ BeforeApplyLogEntry hookFunc
+ // BeforeAppendLogEntry is called before a log entry is appended to the log.
+ BeforeAppendLogEntry hookFunc
+ // BeforeDeleteLogEntry is called before a log entry is deleted.
+ BeforeDeleteLogEntry hookFunc
+ // BeforeReadAppliedLSN is invoked before the applied LSN is read.
+ BeforeReadAppliedLSN hookFunc
+ // BeforeStoreAppliedLSN is invoked before the applied LSN is stored.
+ BeforeStoreAppliedLSN hookFunc
+ // WaitForTransactionsWhenClosing waits for a in-flight to finish before returning
+ // from Run.
+ WaitForTransactionsWhenClosing bool
+}
+
+// StartManager starts a TransactionManager.
+type StartManager struct {
+ // Hooks contains the hook functions that are configured on the TransactionManager. These allow
+ // for better synchronization.
+ Hooks testTransactionHooks
+ // ExpectedError is the expected error to be raised from the manager's Run. Panics are converted
+ // to errors and asserted to match this as well.
+ ExpectedError error
+ // ModifyStorage allows modifying the storage prior to the manager starting. This
+ // may be necessary to test some states that can be reached from hard crashes
+ // but not during the tests.
+ ModifyStorage func(tb testing.TB, cfg config.Cfg, storagePath string)
+}
+
+// CloseManager closes a TransactionManager.
+type CloseManager struct{}
+
+// AssertManager asserts whether the manager has closed and Run returned. If it has, it asserts the
+// error matched the expected. If the manager has exited with an error, AssertManager must be called
+// or the test case fails.
+type AssertManager struct {
+ // ExpectedError is the error TransactionManager's Run method is expected to return.
+ ExpectedError error
+}
+
+// Begin calls Begin on the TransactionManager to start a new transaction.
+type Begin struct {
+ // TransactionID is the identifier given to the transaction created. This is used to identify
+ // the transaction in later steps.
+ TransactionID int
+ // RelativePath is the relative path of the repository this transaction is operating on.
+ RelativePath string
+ // SnapshottedRelativePaths are the relative paths of the repositories to include in the snapshot
+ // in addition to the target repository.
+ SnapshottedRelativePaths []string
+ // ReadOnly indicates whether this is a read-only transaction.
+ ReadOnly bool
+ // Context is the context to use for the Begin call.
+ Context context.Context
+ // ExpectedSnapshot is the expected LSN this transaction should read the repsoitory's state at.
+ ExpectedSnapshotLSN LSN
+ // ExpectedError is the error expected to be returned from the Begin call.
+ ExpectedError error
+}
+
+// CreateRepository creates the transaction's repository..
+type CreateRepository struct {
+ // TransactionID is the transaction for which to create the repository.
+ TransactionID int
+ // DefaultBranch is the default branch to set in the repository.
+ DefaultBranch git.ReferenceName
+ // References are the references to create in the repository.
+ References map[git.ReferenceName]git.ObjectID
+ // Packs are the objects that are written into the repository.
+ Packs [][]byte
+ // CustomHooks are the custom hooks to write into the repository.
+ CustomHooks []byte
+ // Alternate links the given relative path as the repository's alternate.
+ Alternate string
+}
+
+// Commit calls Commit on a transaction.
+type Commit struct {
+ // TransactionID identifies the transaction to commit.
+ TransactionID int
+ // Context is the context to use for the Commit call.
+ Context context.Context
+ // ExpectedError is the error that is expected to be returned when committing the transaction.
+ // If ExpectedError is a function with signature func(tb testing.TB, actualErr error), it is
+ // run instead of asserting the error.
+ ExpectedError any
+
+ // SkipVerificationFailures sets the verification failure handling for this commit.
+ SkipVerificationFailures bool
+ // ReferenceUpdates are the reference updates to commit.
+ ReferenceUpdates ReferenceUpdates
+ // QuarantinedPacks are the packs to include in the quarantine directory of the transaction.
+ QuarantinedPacks [][]byte
+ // DefaultBranchUpdate is the default branch update to commit.
+ DefaultBranchUpdate *DefaultBranchUpdate
+ // CustomHooksUpdate is the custom hooks update to commit.
+ CustomHooksUpdate *CustomHooksUpdate
+ // CreateRepository creates the repository on commit.
+ CreateRepository bool
+ // DeleteRepository deletes the repository on commit.
+ DeleteRepository bool
+ // IncludeObjects includes objects in the transaction's logged pack.
+ IncludeObjects []git.ObjectID
+ // UpdateAlternate updates the repository's alternate when set.
+ UpdateAlternate *alternateUpdate
+}
+
+// RecordInitialReferenceValues calls RecordInitialReferenceValues on a transaction.
+type RecordInitialReferenceValues struct {
+ // TransactionID identifies the transaction to prepare the reference updates on.
+ TransactionID int
+ // InitialValues are the initial values to record.
+ InitialValues map[git.ReferenceName]git.ObjectID
+}
+
+// UpdateReferences calls UpdateReferences on a transaction.
+type UpdateReferences struct {
+ // TransactionID identifies the transaction to update references on.
+ TransactionID int
+ // ReferenceUpdates are the reference updates to make.
+ ReferenceUpdates ReferenceUpdates
+}
+
+// Rollback calls Rollback on a transaction.
+type Rollback struct {
+ // TransactionID identifies the transaction to rollback.
+ TransactionID int
+ // ExpectedError is the error that is expected to be returned when rolling back the transaction.
+ ExpectedError error
+}
+
+// Prune prunes all unreferenced objects from the repository.
+type Prune struct {
+ // ExpectedObjects are the object expected to exist in the repository after pruning.
+ ExpectedObjects []git.ObjectID
+}
+
+// RemoveRepository removes the repository from the disk. It must be run with the TransactionManager
+// closed.
+type RemoveRepository struct{}
+
+// RepositoryAssertion asserts a given transaction's view of repositories matches the expected.
+type RepositoryAssertion struct {
+ // TransactionID identifies the transaction whose snapshot to assert.
+ TransactionID int
+ // Repositories is the expected state of the repositories the transaction sees. The
+ // key is the repository's relative path and the value describes its expected state.
+ Repositories RepositoryStates
+}
+
+// StateAssertions models an assertion of the entire state managed by the TransactionManager.
+type StateAssertion struct {
+ // Database is the expected state of the database.
+ Database DatabaseState
+ // Directory is the expected state of the manager's state directory in the repository.
+ Directory testhelper.DirectoryState
+ // Repositories is the expected state of the repositories in the storage. The key is
+ // the repository's relative path and the value describes its expected state.
+ Repositories RepositoryStates
+}
+
+// steps defines execution steps in a test. Each test case can define multiple steps to exercise
+// more complex behavior.
+type steps []any
+
+type transactionTestCase struct {
+ desc string
+ steps steps
+ expectedState StateAssertion
+}
+
+func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCase, setup testTransactionSetup) {
+ logger := testhelper.NewLogger(t)
+ umask := testhelper.Umask()
+
+ storageScopedFactory, err := setup.RepositoryFactory.ScopeByStorage(setup.Config.Storages[0].Name)
+ require.NoError(t, err)
+ repo := storageScopedFactory.Build(setup.RelativePath)
+
+ repoPath, err := repo.Path()
+ require.NoError(t, err)
+
+ database, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, database)
+
+ txManager := transaction.NewManager(setup.Config, logger, backchannel.NewRegistry())
+ housekeepingManager := housekeeping.NewManager(setup.Config.Prometheus, logger, txManager)
+
+ storagePath := setup.Config.Storages[0].Path
+ stateDir := filepath.Join(storagePath, "state")
+
+ stagingDir := filepath.Join(storagePath, "staging")
+ require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
+
+ var (
+ // managerRunning tracks whether the manager is running or closed.
+ managerRunning bool
+ // transactionManager is the current TransactionManager instance.
+ transactionManager = NewTransactionManager(setup.PartitionID, logger, database, storagePath, stateDir, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
+ // managerErr is used for synchronizing manager closing and returning
+ // the error from Run.
+ managerErr chan error
+ // inflightTransactions tracks the number of on going transactions calls. It is used to synchronize
+ // the database hooks with transactions.
+ inflightTransactions sync.WaitGroup
+ )
+
+ // closeManager closes the manager. It waits until the manager's Run method has exited.
+ closeManager := func() {
+ t.Helper()
+
+ transactionManager.Close()
+ managerRunning, err = checkManagerError(t, ctx, managerErr, transactionManager)
+ require.NoError(t, err)
+ require.False(t, managerRunning)
+ }
+
+ // openTransactions holds references to all of the transactions that have been
+ // began in a test case.
+ openTransactions := map[int]*Transaction{}
+
+ // Close the manager if it is running at the end of the test.
+ defer func() {
+ if managerRunning {
+ closeManager()
+ }
+ }()
+ for _, step := range tc.steps {
+ switch step := step.(type) {
+ case StartManager:
+ require.False(t, managerRunning, "test error: manager started while it was already running")
+
+ if step.ModifyStorage != nil {
+ step.ModifyStorage(t, setup.Config, storagePath)
+ }
+
+ managerRunning = true
+ managerErr = make(chan error)
+
+ // The PartitionManager deletes and recreates the staging directory prior to starting a TransactionManager
+ // to clean up any stale state leftover by crashes. Do that here as well so the tests don't fail if we don't
+ // finish transactions after crash simulations.
+ require.NoError(t, os.RemoveAll(stagingDir))
+ require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
+
+ transactionManager = NewTransactionManager(setup.PartitionID, logger, database, storagePath, stateDir, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
+ installHooks(t, transactionManager, database, hooks{
+ beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry,
+ beforeStoreLogEntry: step.Hooks.BeforeAppendLogEntry,
+ beforeDeferredClose: func(hookContext) {
+ if step.Hooks.WaitForTransactionsWhenClosing {
+ inflightTransactions.Wait()
+ }
+ },
+ beforeDeleteLogEntry: step.Hooks.BeforeDeleteLogEntry,
+ beforeReadAppliedLSN: step.Hooks.BeforeReadAppliedLSN,
+ beforeStoreAppliedLSN: step.Hooks.BeforeStoreAppliedLSN,
+ })
+
+ go func() {
+ defer func() {
+ if r := recover(); r != nil {
+ err, ok := r.(error)
+ if !ok {
+ panic(r)
+ }
+ assert.ErrorIs(t, err, step.ExpectedError)
+ managerErr <- err
+ }
+ }()
+
+ managerErr <- transactionManager.Run()
+ }()
+ case CloseManager:
+ require.True(t, managerRunning, "test error: manager closed while it was already closed")
+ closeManager()
+ case AssertManager:
+ require.True(t, managerRunning, "test error: manager must be running for syncing")
+ managerRunning, err = checkManagerError(t, ctx, managerErr, transactionManager)
+ require.ErrorIs(t, err, step.ExpectedError)
+ case Begin:
+ require.NotContains(t, openTransactions, step.TransactionID, "test error: transaction id reused in begin")
+
+ beginCtx := ctx
+ if step.Context != nil {
+ beginCtx = step.Context
+ }
+
+ transaction, err := transactionManager.Begin(beginCtx, step.RelativePath, step.SnapshottedRelativePaths, step.ReadOnly)
+ require.ErrorIs(t, err, step.ExpectedError)
+ if err == nil {
+ require.Equal(t, step.ExpectedSnapshotLSN, transaction.SnapshotLSN())
+ }
+
+ if step.ReadOnly {
+ require.Empty(t,
+ transaction.quarantineDirectory,
+ "read-only transaction should not have a quarantine directory",
+ )
+ }
+
+ openTransactions[step.TransactionID] = transaction
+ case Commit:
+ require.Contains(t, openTransactions, step.TransactionID, "test error: transaction committed before beginning it")
+
+ transaction := openTransactions[step.TransactionID]
+ if step.SkipVerificationFailures {
+ transaction.SkipVerificationFailures()
+ }
+
+ if step.UpdateAlternate != nil {
+ transaction.UpdateAlternate(step.UpdateAlternate.relativePath)
+ }
+
+ if step.ReferenceUpdates != nil {
+ transaction.UpdateReferences(step.ReferenceUpdates)
+ }
+
+ if step.DefaultBranchUpdate != nil {
+ transaction.SetDefaultBranch(step.DefaultBranchUpdate.Reference)
+ }
+
+ if step.CustomHooksUpdate != nil {
+ transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR)
+ }
+
+ if step.QuarantinedPacks != nil {
+ for _, dir := range []string{
+ transaction.stagingDirectory,
+ transaction.quarantineDirectory,
+ } {
+ const expectedPerm = perm.PrivateDir
+ stat, err := os.Stat(dir)
+ require.NoError(t, err)
+ require.Equal(t, stat.Mode().Perm(), umask.Mask(expectedPerm),
+ "%q had %q permission but expected %q", dir, stat.Mode().Perm().String(), expectedPerm,
+ )
+ }
+
+ rewrittenRepo := setup.RepositoryFactory.Build(
+ transaction.RewriteRepository(&gitalypb.Repository{
+ StorageName: setup.Config.Storages[0].Name,
+ RelativePath: transaction.relativePath,
+ }),
+ )
+
+ for _, pack := range step.QuarantinedPacks {
+ require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack)))
+ }
+ }
+
+ if step.DeleteRepository {
+ transaction.DeleteRepository()
+ }
+
+ for _, objectID := range step.IncludeObjects {
+ transaction.IncludeObject(objectID)
+ }
+
+ commitCtx := ctx
+ if step.Context != nil {
+ commitCtx = step.Context
+ }
+
+ commitErr := transaction.Commit(commitCtx)
+ switch expectedErr := step.ExpectedError.(type) {
+ case func(testing.TB, error):
+ expectedErr(t, commitErr)
+ case error:
+ require.ErrorIs(t, commitErr, expectedErr)
+ case nil:
+ require.NoError(t, commitErr)
+ default:
+ t.Fatalf("unexpected error type: %T", expectedErr)
+ }
+ case RecordInitialReferenceValues:
+ require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it")
+
+ transaction := openTransactions[step.TransactionID]
+ require.NoError(t, transaction.RecordInitialReferenceValues(ctx, step.InitialValues))
+ case UpdateReferences:
+ require.Contains(t, openTransactions, step.TransactionID, "test error: reference updates aborted on committed before beginning it")
+
+ transaction := openTransactions[step.TransactionID]
+ transaction.UpdateReferences(step.ReferenceUpdates)
+ case Rollback:
+ require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it")
+ require.Equal(t, step.ExpectedError, openTransactions[step.TransactionID].Rollback())
+ case Prune:
+ // Repack all objects into a single pack and remove all other packs to remove all
+ // unreachable objects from the packs.
+ gittest.Exec(t, setup.Config, "-C", repoPath, "repack", "-ad")
+ // Prune all unreachable loose objects in the repository.
+ gittest.Exec(t, setup.Config, "-C", repoPath, "prune")
+
+ require.ElementsMatch(t, step.ExpectedObjects, gittest.ListObjects(t, setup.Config, repoPath))
+ case RemoveRepository:
+ require.NoError(t, os.RemoveAll(repoPath))
+ case CreateRepository:
+ require.Contains(t, openTransactions, step.TransactionID, "test error: repository created in transaction before beginning it")
+
+ transaction := openTransactions[step.TransactionID]
+ require.NoError(t, repoutil.Create(
+ ctx,
+ logger,
+ config.NewLocator(setup.Config),
+ setup.CommandFactory,
+ nil,
+ counter.NewRepositoryCounter(setup.Config.Storages),
+ transaction.RewriteRepository(&gitalypb.Repository{
+ StorageName: setup.Config.Storages[0].Name,
+ RelativePath: transaction.relativePath,
+ }),
+ func(repoProto *gitalypb.Repository) error {
+ repo := setup.RepositoryFactory.Build(repoProto)
+
+ if step.DefaultBranch != "" {
+ require.NoError(t, repo.SetDefaultBranch(ctx, nil, step.DefaultBranch))
+ }
+
+ for _, pack := range step.Packs {
+ require.NoError(t, repo.UnpackObjects(ctx, bytes.NewReader(pack)))
+ }
+
+ for name, oid := range step.References {
+ require.NoError(t, repo.UpdateRef(ctx, name, oid, setup.ObjectHash.ZeroOID))
+ }
+
+ if step.CustomHooks != nil {
+ require.NoError(t,
+ repoutil.SetCustomHooks(ctx, logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo),
+ )
+ }
+
+ if step.Alternate != "" {
+ repoPath, err := repo.Path()
+ require.NoError(t, err)
+
+ require.NoError(t, os.WriteFile(stats.AlternatesFilePath(repoPath), []byte(step.Alternate), fs.ModePerm))
+ }
+
+ return nil
+ },
+ repoutil.WithObjectHash(setup.ObjectHash),
+ ))
+ case RepositoryAssertion:
+ require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it")
+ transaction := openTransactions[step.TransactionID]
+
+ RequireRepositories(t, ctx, setup.Config,
+ // Assert the contents of the transaction's snapshot.
+ filepath.Join(setup.Config.Storages[0].Path, transaction.snapshot.prefix),
+ // Rewrite all of the repositories to point to their snapshots.
+ func(relativePath string) *localrepo.Repo {
+ return setup.RepositoryFactory.Build(
+ transaction.RewriteRepository(&gitalypb.Repository{
+ StorageName: setup.Config.Storages[0].Name,
+ RelativePath: relativePath,
+ }),
+ )
+ }, step.Repositories)
+ default:
+ t.Fatalf("unhandled step type: %T", step)
+ }
+ }
+
+ if managerRunning {
+ managerRunning, err = checkManagerError(t, ctx, managerErr, transactionManager)
+ require.NoError(t, err)
+ }
+
+ RequireDatabase(t, ctx, database, tc.expectedState.Database)
+
+ expectedRepositories := tc.expectedState.Repositories
+ if expectedRepositories == nil {
+ expectedRepositories = RepositoryStates{
+ setup.RelativePath: {},
+ }
+ }
+
+ for relativePath, state := range expectedRepositories {
+ if state.Objects == nil {
+ state.Objects = []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ setup.Commits.Diverging.OID,
+ }
+ }
+
+ if state.DefaultBranch == "" {
+ state.DefaultBranch = git.DefaultRef
+ }
+
+ expectedRepositories[relativePath] = state
+ }
+
+ RequireRepositories(t, ctx, setup.Config, setup.Config.Storages[0].Path, storageScopedFactory.Build, expectedRepositories)
+
+ expectedDirectory := tc.expectedState.Directory
+ if expectedDirectory == nil {
+ // Set the base state as the default so we don't have to repeat it in every test case but it
+ // gets asserted.
+ expectedDirectory = testhelper.DirectoryState{
+ "/": {Mode: fs.ModeDir | perm.PrivateDir},
+ "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
+ }
+ }
+
+ testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory)
+
+ entries, err := os.ReadDir(stagingDir)
+ require.NoError(t, err)
+ require.Empty(t, entries, "staging directory was not cleaned up")
+}
+
+func checkManagerError(t *testing.T, ctx context.Context, managerErrChannel chan error, mgr *TransactionManager) (bool, error) {
+ t.Helper()
+
+ testTransaction := &Transaction{
+ referenceUpdates: []ReferenceUpdates{{"sentinel": {}}},
+ result: make(chan error, 1),
+ finish: func() error { return nil },
+ }
+
+ var (
+ // managerErr is the error returned from the TransactionManager's Run method.
+ managerErr error
+ // closeChannel determines whether the channel was still open. If so, we need to close it
+ // so further calls of checkManagerError do not block as they won't manage to receive an err
+ // as it was already received and won't be able to send as the manager is no longer running.
+ closeChannel bool
+ )
+
+ select {
+ case managerErr, closeChannel = <-managerErrChannel:
+ case mgr.admissionQueue <- testTransaction:
+ // If the error channel doesn't receive, we don't know whether it is because the manager is still running
+ // or we are still waiting for it to return. We test whether the manager is running or not here by queueing a
+ // a transaction that will error. If the manager processes it, we know it is still running.
+ //
+ // If the manager was closed, it might manage to admit the testTransaction but not process it. To determine
+ // whether that was the case, we also keep waiting on the managerErr channel.
+ select {
+ case err := <-testTransaction.result:
+ require.Error(t, err, "test transaction is expected to error out")
+
+ // Begin a transaction to wait until the manager has applied all log entries currently
+ // committed. This ensures the disk state assertions run with all log entries fully applied
+ // to the repository.
+ tx, err := mgr.Begin(ctx, "non-existent", nil, false)
+ require.NoError(t, err)
+ require.NoError(t, tx.Rollback())
+
+ return true, nil
+ case managerErr, closeChannel = <-managerErrChannel:
+ }
+ }
+
+ if closeChannel {
+ close(managerErrChannel)
+ }
+
+ return false, managerErr
+}
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
index b4188bfe1..733165fa8 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
@@ -24,12 +24,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
- "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
@@ -38,6 +35,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
+// errSimulatedCrash is used in the tests to simulate a crash at a certain point during
+// TransactionManager.Run execution.
+var errSimulatedCrash = errors.New("simulated crash")
+
func validCustomHooks(tb testing.TB) []byte {
tb.Helper()
@@ -159,28 +160,10 @@ func TestTransactionManager(t *testing.T) {
umask := testhelper.Umask()
ctx := testhelper.Context(t)
- type testCommit struct {
- OID git.ObjectID
- Pack []byte
- }
-
- type testCommits struct {
- First testCommit
- Second testCommit
- Third testCommit
- Diverging testCommit
- }
-
- type testSetup struct {
- Config config.Cfg
- CommandFactory git.CommandFactory
- RepositoryFactory localrepo.Factory
- ObjectHash git.ObjectHash
- NonExistentOID git.ObjectID
- Commits testCommits
- }
+ // partitionID is the partition ID used in the tests for the TransactionManager.
+ const partitionID partitionID = 1
- setupTest := func(t *testing.T, relativePath string) testSetup {
+ setupTest := func(t *testing.T, relativePath string) testTransactionSetup {
t.Helper()
cfg := testcfg.Build(t)
@@ -229,26 +212,30 @@ func TestTransactionManager(t *testing.T) {
return pack.Bytes()
}
- return testSetup{
+ return testTransactionSetup{
+ PartitionID: partitionID,
+ RelativePath: relativePath,
+ RepositoryPath: repoPath,
+ Repo: localRepo,
Config: cfg,
ObjectHash: objectHash,
CommandFactory: cmdFactory,
RepositoryFactory: localrepo.NewFactory(logger, locator, cmdFactory, catfileCache),
NonExistentOID: nonExistentOID,
- Commits: testCommits{
- First: testCommit{
+ Commits: testTransactionCommits{
+ First: testTransactionCommit{
OID: firstCommitOID,
Pack: packCommit(firstCommitOID),
},
- Second: testCommit{
+ Second: testTransactionCommit{
OID: secondCommitOID,
Pack: packCommit(secondCommitOID),
},
- Third: testCommit{
+ Third: testTransactionCommit{
OID: thirdCommitOID,
Pack: packCommit(thirdCommitOID),
},
- Diverging: testCommit{
+ Diverging: testTransactionCommit{
OID: divergingCommitOID,
Pack: packCommit(divergingCommitOID),
},
@@ -262,186 +249,7 @@ func TestTransactionManager(t *testing.T) {
relativePath := gittest.NewRepositoryName(t)
setup := setupTest(t, relativePath)
- // errSimulatedCrash is used in the tests to simulate a crash at a certain point during
- // TransactionManager.Run execution.
- errSimulatedCrash := errors.New("simulated crash")
-
- // partitionID is the partition ID used in the tests for the TransactionManager.
- const partitionID partitionID = 1
-
- type testHooks struct {
- // BeforeApplyLogEntry is called before a log entry is applied to the repository.
- BeforeApplyLogEntry hookFunc
- // BeforeAppendLogEntry is called before a log entry is appended to the log.
- BeforeAppendLogEntry hookFunc
- // BeforeDeleteLogEntry is called before a log entry is deleted.
- BeforeDeleteLogEntry hookFunc
- // BeforeReadAppliedLSN is invoked before the applied LSN is read.
- BeforeReadAppliedLSN hookFunc
- // BeforeStoreAppliedLSN is invoked before the applied LSN is stored.
- BeforeStoreAppliedLSN hookFunc
- // WaitForTransactionsWhenClosing waits for a in-flight to finish before returning
- // from Run.
- WaitForTransactionsWhenClosing bool
- }
-
- // StartManager starts a TransactionManager.
- type StartManager struct {
- // Hooks contains the hook functions that are configured on the TransactionManager. These allow
- // for better synchronization.
- Hooks testHooks
- // ExpectedError is the expected error to be raised from the manager's Run. Panics are converted
- // to errors and asserted to match this as well.
- ExpectedError error
- // ModifyStorage allows modifying the storage prior to the manager starting. This
- // may be necessary to test some states that can be reached from hard crashes
- // but not during the tests.
- ModifyStorage func(tb testing.TB, cfg config.Cfg, storagePath string)
- }
-
- // CloseManager closes a TransactionManager.
- type CloseManager struct{}
-
- // AssertManager asserts whether the manager has closed and Run returned. If it has, it asserts the
- // error matched the expected. If the manager has exited with an error, AssertManager must be called
- // or the test case fails.
- type AssertManager struct {
- // ExpectedError is the error TransactionManager's Run method is expected to return.
- ExpectedError error
- }
-
- // Begin calls Begin on the TransactionManager to start a new transaction.
- type Begin struct {
- // TransactionID is the identifier given to the transaction created. This is used to identify
- // the transaction in later steps.
- TransactionID int
- // RelativePath is the relative path of the repository this transaction is operating on.
- RelativePath string
- // SnapshottedRelativePaths are the relative paths of the repositories to include in the snapshot
- // in addition to the target repository.
- SnapshottedRelativePaths []string
- // ReadOnly indicates whether this is a read-only transaction.
- ReadOnly bool
- // Context is the context to use for the Begin call.
- Context context.Context
- // ExpectedSnapshot is the expected LSN this transaction should read the repsoitory's state at.
- ExpectedSnapshotLSN LSN
- // ExpectedError is the error expected to be returned from the Begin call.
- ExpectedError error
- }
-
- // CreateRepository creates the transaction's repository..
- type CreateRepository struct {
- // TransactionID is the transaction for which to create the repository.
- TransactionID int
- // DefaultBranch is the default branch to set in the repository.
- DefaultBranch git.ReferenceName
- // References are the references to create in the repository.
- References map[git.ReferenceName]git.ObjectID
- // Packs are the objects that are written into the repository.
- Packs [][]byte
- // CustomHooks are the custom hooks to write into the repository.
- CustomHooks []byte
- // Alternate links the given relative path as the repository's alternate.
- Alternate string
- }
-
- // Commit calls Commit on a transaction.
- type Commit struct {
- // TransactionID identifies the transaction to commit.
- TransactionID int
- // Context is the context to use for the Commit call.
- Context context.Context
- // ExpectedError is the error that is expected to be returned when committing the transaction.
- // If ExpectedError is a function with signature func(tb testing.TB, actualErr error), it is
- // ran instead to asser the error.
- ExpectedError any
-
- // SkipVerificationFailures sets the verification failure handling for this commit.
- SkipVerificationFailures bool
- // ReferenceUpdates are the reference updates to commit.
- ReferenceUpdates ReferenceUpdates
- // QuarantinedPacks are the packs to include in the quarantine directory of the transaction.
- QuarantinedPacks [][]byte
- // DefaultBranchUpdate is the default branch update to commit.
- DefaultBranchUpdate *DefaultBranchUpdate
- // CustomHooksUpdate is the custom hooks update to commit.
- CustomHooksUpdate *CustomHooksUpdate
- // CreateRepository creates the repository on commit.
- CreateRepository bool
- // DeleteRepository deletes the repository on commit.
- DeleteRepository bool
- // IncludeObjects includes objects in the transaction's logged pack.
- IncludeObjects []git.ObjectID
- // UpdateAlternate updates the repository's alternate when set.
- UpdateAlternate *alternateUpdate
- }
-
- // RecordInitialReferenceValues calls RecordInitialReferenceValues on a transaction.
- type RecordInitialReferenceValues struct {
- // TransactionID identifies the transaction to prepare the reference updates on.
- TransactionID int
- // InitialValues are the initial values to record.
- InitialValues map[git.ReferenceName]git.ObjectID
- }
-
- // UpdateReferences calls UpdateReferences on a transaction.
- type UpdateReferences struct {
- // TransactionID identifies the transaction to update references on.
- TransactionID int
- // ReferenceUpdates are the reference updates to make.
- ReferenceUpdates ReferenceUpdates
- }
-
- // Rollback calls Rollback on a transaction.
- type Rollback struct {
- // TransactionID identifies the transaction to rollback.
- TransactionID int
- // ExpectedError is the error that is expected to be returned when rolling back the transaction.
- ExpectedError error
- }
-
- // Prune prunes all unreferenced objects from the repository.
- type Prune struct {
- // ExpectedObjects are the object expected to exist in the repository after pruning.
- ExpectedObjects []git.ObjectID
- }
-
- // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager
- // closed.
- type RemoveRepository struct{}
-
- // RepositoryAssertion asserts a given transaction's view of repositories matches the expected.
- type RepositoryAssertion struct {
- // TransactionID identifies the transaction whose snapshot to assert.
- TransactionID int
- // Repositories is the expected state of the repositories the transaction sees. The
- // key is the repository's relative path and the value describes its expected state.
- Repositories RepositoryStates
- }
-
- // StateAssertions models an assertion of the entire state managed by the TransactionManager.
- type StateAssertion struct {
- // Database is the expected state of the database.
- Database DatabaseState
- // Directory is the expected state of the manager's state directory in the repository.
- Directory testhelper.DirectoryState
- // Repositories is the expected state of the repositories in the storage. The key is
- // the repository's relative path and the value describes its expected state.
- Repositories RepositoryStates
- }
-
- // steps defines execution steps in a test. Each test case can define multiple steps to exercise
- // more complex behavior.
- type steps []any
-
- type testCase struct {
- desc string
- steps steps
- expectedState StateAssertion
- }
-
- testCases := []testCase{
+ testCases := []transactionTestCase{
{
desc: "invalid reference aborts the entire transaction",
steps: steps{
@@ -1841,7 +1649,7 @@ func TestTransactionManager(t *testing.T) {
desc: "reapplying custom hooks works",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -1891,7 +1699,7 @@ func TestTransactionManager(t *testing.T) {
desc: "hook index is correctly determined from log and disk",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -2086,7 +1894,7 @@ func TestTransactionManager(t *testing.T) {
desc: "continues processing after failing to store log index",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookCtx hookContext) {
panic(errSimulatedCrash)
},
@@ -2136,7 +1944,7 @@ func TestTransactionManager(t *testing.T) {
desc: "recovers from the write-ahead log on start up",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookCtx hookContext) {
hookCtx.closeManager()
},
@@ -2183,7 +1991,7 @@ func TestTransactionManager(t *testing.T) {
desc: "reference verification fails after recovering logged writes",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookCtx hookContext) {
hookCtx.closeManager()
},
@@ -2262,13 +2070,13 @@ func TestTransactionManager(t *testing.T) {
},
},
},
- func() testCase {
+ func() transactionTestCase {
ctx, cancel := context.WithCancel(ctx)
- return testCase{
+ return transactionTestCase{
desc: "commit returns if context is canceled after admission",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeAppendLogEntry: func(hookCtx hookContext) {
// Cancel the context used in Commit
cancel()
@@ -2303,7 +2111,7 @@ func TestTransactionManager(t *testing.T) {
desc: "commit returns if transaction processing stops before transaction acceptance",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.closeManager() },
// This ensures we are testing the context cancellation errors being unwrapped properly
// to an ErrTransactionProcessingStopped instead of hitting the general case when
@@ -2324,7 +2132,7 @@ func TestTransactionManager(t *testing.T) {
desc: "commit returns if transaction processing stops after transaction acceptance",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookCtx hookContext) {
hookCtx.closeManager()
},
@@ -2601,7 +2409,7 @@ func TestTransactionManager(t *testing.T) {
desc: "update default branch fails before storing log index",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookCtx hookContext) {
panic(errSimulatedCrash)
},
@@ -2913,7 +2721,7 @@ func TestTransactionManager(t *testing.T) {
StartManager{
// Crash the manager before the third transaction is applied. This allows us to
// prune before it is applied to ensure the pack file contains all necessary commits.
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -2996,7 +2804,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
Prune{},
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -3601,7 +3409,7 @@ func TestTransactionManager(t *testing.T) {
desc: "logged repository deletions are considered after restart",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -3645,7 +3453,7 @@ func TestTransactionManager(t *testing.T) {
desc: "reapplying repository deletion works",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -3796,7 +3604,7 @@ func TestTransactionManager(t *testing.T) {
desc: "failing initialization prevents transaction beginning",
steps: steps{
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeReadAppliedLSN: func(hookContext) {
// Raise a panic when the manager is about to read the applied log
// index when initializing. In reality this would crash the server but
@@ -4507,7 +4315,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
RemoveRepository{},
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeApplyLogEntry: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -4551,7 +4359,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
RemoveRepository{},
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -5338,7 +5146,7 @@ func TestTransactionManager(t *testing.T) {
},
CloseManager{},
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -5412,7 +5220,7 @@ func TestTransactionManager(t *testing.T) {
},
CloseManager{},
StartManager{
- Hooks: testHooks{
+ Hooks: testTransactionHooks{
BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
@@ -6229,7 +6037,7 @@ func TestTransactionManager(t *testing.T) {
}
appendInvalidReferenceTestCase := func(tc invalidReferenceTestCase) {
- testCases = append(testCases, testCase{
+ testCases = append(testCases, transactionTestCase{
desc: fmt.Sprintf("invalid reference %s", tc.desc),
steps: steps{
StartManager{},
@@ -6309,403 +6117,11 @@ func TestTransactionManager(t *testing.T) {
// Setup the repository with the exact same state as what was used to build the test cases.
setup := setupTest(t, relativePath)
- logger := testhelper.NewLogger(t)
-
- storageScopedFactory, err := setup.RepositoryFactory.ScopeByStorage(setup.Config.Storages[0].Name)
- require.NoError(t, err)
- repo := storageScopedFactory.Build(relativePath)
-
- repoPath, err := repo.Path()
- require.NoError(t, err)
-
- database, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
- require.NoError(t, err)
- defer testhelper.MustClose(t, database)
-
- txManager := transaction.NewManager(setup.Config, logger, backchannel.NewRegistry())
- housekeepingManager := housekeeping.NewManager(setup.Config.Prometheus, logger, txManager)
-
- storagePath := setup.Config.Storages[0].Path
- stateDir := filepath.Join(storagePath, "state")
-
- stagingDir := filepath.Join(storagePath, "staging")
- require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
-
- var (
- // managerRunning tracks whether the manager is running or closed.
- managerRunning bool
- // transactionManager is the current TransactionManager instance.
- transactionManager = NewTransactionManager(partitionID, logger, database, storagePath, stateDir, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
- // managerErr is used for synchronizing manager closing and returning
- // the error from Run.
- managerErr chan error
- // inflightTransactions tracks the number of on going transactions calls. It is used to synchronize
- // the database hooks with transactions.
- inflightTransactions sync.WaitGroup
- )
-
- // closeManager closes the manager. It waits until the manager's Run method has exited.
- closeManager := func() {
- t.Helper()
-
- transactionManager.Close()
- managerRunning, err = checkManagerError(t, ctx, managerErr, transactionManager)
- require.NoError(t, err)
- require.False(t, managerRunning)
- }
-
- // openTransactions holds references to all of the transactions that have been
- // began in a test case.
- openTransactions := map[int]*Transaction{}
-
- // Close the manager if it is running at the end of the test.
- defer func() {
- if managerRunning {
- closeManager()
- }
- }()
- for _, step := range tc.steps {
- switch step := step.(type) {
- case StartManager:
- require.False(t, managerRunning, "test error: manager started while it was already running")
-
- if step.ModifyStorage != nil {
- step.ModifyStorage(t, setup.Config, storagePath)
- }
-
- managerRunning = true
- managerErr = make(chan error)
-
- // The PartitionManager deletes and recreates the staging directory prior to starting a TransactionManager
- // to clean up any stale state leftover by crashes. Do that here as well so the tests don't fail if we don't
- // finish transactions after crash simulations.
- require.NoError(t, os.RemoveAll(stagingDir))
- require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
-
- transactionManager = NewTransactionManager(partitionID, logger, database, storagePath, stateDir, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
- installHooks(t, transactionManager, database, hooks{
- beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry,
- beforeStoreLogEntry: step.Hooks.BeforeAppendLogEntry,
- beforeDeferredClose: func(hookContext) {
- if step.Hooks.WaitForTransactionsWhenClosing {
- inflightTransactions.Wait()
- }
- },
- beforeDeleteLogEntry: step.Hooks.BeforeDeleteLogEntry,
- beforeReadAppliedLSN: step.Hooks.BeforeReadAppliedLSN,
- beforeStoreAppliedLSN: step.Hooks.BeforeStoreAppliedLSN,
- })
-
- go func() {
- defer func() {
- if r := recover(); r != nil {
- err, ok := r.(error)
- if !ok {
- panic(r)
- }
- assert.ErrorIs(t, err, step.ExpectedError)
- managerErr <- err
- }
- }()
-
- managerErr <- transactionManager.Run()
- }()
- case CloseManager:
- require.True(t, managerRunning, "test error: manager closed while it was already closed")
- closeManager()
- case AssertManager:
- require.True(t, managerRunning, "test error: manager must be running for syncing")
- managerRunning, err = checkManagerError(t, ctx, managerErr, transactionManager)
- require.ErrorIs(t, err, step.ExpectedError)
- case Begin:
- require.NotContains(t, openTransactions, step.TransactionID, "test error: transaction id reused in begin")
-
- beginCtx := ctx
- if step.Context != nil {
- beginCtx = step.Context
- }
-
- transaction, err := transactionManager.Begin(beginCtx, step.RelativePath, step.SnapshottedRelativePaths, step.ReadOnly)
- require.ErrorIs(t, err, step.ExpectedError)
- if err == nil {
- require.Equal(t, step.ExpectedSnapshotLSN, transaction.SnapshotLSN())
- }
-
- if step.ReadOnly {
- require.Empty(t,
- transaction.quarantineDirectory,
- "read-only transaction should not have a quarantine directory",
- )
- }
-
- openTransactions[step.TransactionID] = transaction
- case Commit:
- require.Contains(t, openTransactions, step.TransactionID, "test error: transaction committed before beginning it")
-
- transaction := openTransactions[step.TransactionID]
- if step.SkipVerificationFailures {
- transaction.SkipVerificationFailures()
- }
-
- if step.UpdateAlternate != nil {
- transaction.UpdateAlternate(step.UpdateAlternate.relativePath)
- }
-
- if step.ReferenceUpdates != nil {
- transaction.UpdateReferences(step.ReferenceUpdates)
- }
-
- if step.DefaultBranchUpdate != nil {
- transaction.SetDefaultBranch(step.DefaultBranchUpdate.Reference)
- }
-
- if step.CustomHooksUpdate != nil {
- transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR)
- }
-
- if step.QuarantinedPacks != nil {
- for _, dir := range []string{
- transaction.stagingDirectory,
- transaction.quarantineDirectory,
- } {
- const expectedPerm = perm.PrivateDir
- stat, err := os.Stat(dir)
- require.NoError(t, err)
- require.Equal(t, stat.Mode().Perm(), umask.Mask(expectedPerm),
- "%q had %q permission but expected %q", dir, stat.Mode().Perm().String(), expectedPerm,
- )
- }
-
- rewrittenRepo := setup.RepositoryFactory.Build(
- transaction.RewriteRepository(&gitalypb.Repository{
- StorageName: setup.Config.Storages[0].Name,
- RelativePath: transaction.relativePath,
- }),
- )
-
- for _, pack := range step.QuarantinedPacks {
- require.NoError(t, rewrittenRepo.UnpackObjects(ctx, bytes.NewReader(pack)))
- }
- }
-
- if step.DeleteRepository {
- transaction.DeleteRepository()
- }
-
- for _, objectID := range step.IncludeObjects {
- transaction.IncludeObject(objectID)
- }
-
- commitCtx := ctx
- if step.Context != nil {
- commitCtx = step.Context
- }
-
- commitErr := transaction.Commit(commitCtx)
- switch expectedErr := step.ExpectedError.(type) {
- case func(testing.TB, error):
- expectedErr(t, commitErr)
- case error:
- require.ErrorIs(t, commitErr, expectedErr)
- case nil:
- require.NoError(t, commitErr)
- default:
- t.Fatalf("unexpected error type: %T", expectedErr)
- }
- case RecordInitialReferenceValues:
- require.Contains(t, openTransactions, step.TransactionID, "test error: record initial reference value on transaction before beginning it")
-
- transaction := openTransactions[step.TransactionID]
- require.NoError(t, transaction.RecordInitialReferenceValues(ctx, step.InitialValues))
- case UpdateReferences:
- require.Contains(t, openTransactions, step.TransactionID, "test error: reference updates aborted on committed before beginning it")
-
- transaction := openTransactions[step.TransactionID]
- transaction.UpdateReferences(step.ReferenceUpdates)
- case Rollback:
- require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it")
- require.Equal(t, step.ExpectedError, openTransactions[step.TransactionID].Rollback())
- case Prune:
- // Repack all objects into a single pack and remove all other packs to remove all
- // unreachable objects from the packs.
- gittest.Exec(t, setup.Config, "-C", repoPath, "repack", "-ad")
- // Prune all unreachable loose objects in the repository.
- gittest.Exec(t, setup.Config, "-C", repoPath, "prune")
-
- require.ElementsMatch(t, step.ExpectedObjects, gittest.ListObjects(t, setup.Config, repoPath))
- case RemoveRepository:
- require.NoError(t, os.RemoveAll(repoPath))
- case CreateRepository:
- require.Contains(t, openTransactions, step.TransactionID, "test error: repository created in transaction before beginning it")
-
- transaction := openTransactions[step.TransactionID]
- require.NoError(t, repoutil.Create(
- ctx,
- logger,
- config.NewLocator(setup.Config),
- setup.CommandFactory,
- nil,
- counter.NewRepositoryCounter(setup.Config.Storages),
- transaction.RewriteRepository(&gitalypb.Repository{
- StorageName: setup.Config.Storages[0].Name,
- RelativePath: transaction.relativePath,
- }),
- func(repoProto *gitalypb.Repository) error {
- repo := setup.RepositoryFactory.Build(repoProto)
-
- if step.DefaultBranch != "" {
- require.NoError(t, repo.SetDefaultBranch(ctx, nil, step.DefaultBranch))
- }
-
- for _, pack := range step.Packs {
- require.NoError(t, repo.UnpackObjects(ctx, bytes.NewReader(pack)))
- }
-
- for name, oid := range step.References {
- require.NoError(t, repo.UpdateRef(ctx, name, oid, setup.ObjectHash.ZeroOID))
- }
-
- if step.CustomHooks != nil {
- require.NoError(t,
- repoutil.SetCustomHooks(ctx, logger, config.NewLocator(setup.Config), nil, bytes.NewReader(step.CustomHooks), repo),
- )
- }
-
- if step.Alternate != "" {
- repoPath, err := repo.Path()
- require.NoError(t, err)
-
- require.NoError(t, os.WriteFile(stats.AlternatesFilePath(repoPath), []byte(step.Alternate), fs.ModePerm))
- }
-
- return nil
- },
- repoutil.WithObjectHash(setup.ObjectHash),
- ))
- case RepositoryAssertion:
- require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it")
- transaction := openTransactions[step.TransactionID]
-
- RequireRepositories(t, ctx, setup.Config,
- // Assert the contents of the transaction's snapshot.
- filepath.Join(setup.Config.Storages[0].Path, transaction.snapshot.prefix),
- // Rewrite all of the repositories to point to their snapshots.
- func(relativePath string) *localrepo.Repo {
- return setup.RepositoryFactory.Build(
- transaction.RewriteRepository(&gitalypb.Repository{
- StorageName: setup.Config.Storages[0].Name,
- RelativePath: relativePath,
- }),
- )
- }, step.Repositories)
- default:
- t.Fatalf("unhandled step type: %T", step)
- }
- }
-
- if managerRunning {
- managerRunning, err = checkManagerError(t, ctx, managerErr, transactionManager)
- require.NoError(t, err)
- }
-
- RequireDatabase(t, ctx, database, tc.expectedState.Database)
-
- expectedRepositories := tc.expectedState.Repositories
- if expectedRepositories == nil {
- expectedRepositories = RepositoryStates{
- relativePath: {},
- }
- }
-
- for relativePath, state := range expectedRepositories {
- if state.Objects == nil {
- state.Objects = []git.ObjectID{
- setup.ObjectHash.EmptyTreeOID,
- setup.Commits.First.OID,
- setup.Commits.Second.OID,
- setup.Commits.Third.OID,
- setup.Commits.Diverging.OID,
- }
- }
-
- if state.DefaultBranch == "" {
- state.DefaultBranch = git.DefaultRef
- }
-
- expectedRepositories[relativePath] = state
- }
-
- RequireRepositories(t, ctx, setup.Config, setup.Config.Storages[0].Path, storageScopedFactory.Build, expectedRepositories)
-
- expectedDirectory := tc.expectedState.Directory
- if expectedDirectory == nil {
- // Set the base state as the default so we don't have to repeat it in every test case but it
- // gets asserted.
- expectedDirectory = testhelper.DirectoryState{
- "/": {Mode: fs.ModeDir | perm.PrivateDir},
- "/wal": {Mode: fs.ModeDir | perm.PrivateDir},
- }
- }
-
- testhelper.RequireDirectoryState(t, stateDir, "", expectedDirectory)
-
- entries, err := os.ReadDir(stagingDir)
- require.NoError(t, err)
- require.Empty(t, entries, "staging directory was not cleaned up")
+ runTransactionTest(t, ctx, tc, setup)
})
}
}
-func checkManagerError(t *testing.T, ctx context.Context, managerErrChannel chan error, mgr *TransactionManager) (bool, error) {
- t.Helper()
-
- testTransaction := &Transaction{
- referenceUpdates: []ReferenceUpdates{{"sentinel": {}}},
- result: make(chan error, 1),
- finish: func() error { return nil },
- }
-
- var (
- // managerErr is the error returned from the TransactionManager's Run method.
- managerErr error
- // closeChannel determines whether the channel was still open. If so, we need to close it
- // so further calls of checkManagerError do not block as they won't manage to receive an err
- // as it was already received and won't be able to send as the manager is no longer running.
- closeChannel bool
- )
-
- select {
- case managerErr, closeChannel = <-managerErrChannel:
- case mgr.admissionQueue <- testTransaction:
- // If the error channel doesn't receive, we don't know whether it is because the manager is still running
- // or we are still waiting for it to return. We test whether the manager is running or not here by queueing a
- // a transaction that will error. If the manager processes it, we know it is still running.
- //
- // If the manager was closed, it might manage to admit the testTransaction but not process it. To determine
- // whether that was the case, we also keep waiting on the managerErr channel.
- select {
- case err := <-testTransaction.result:
- require.Error(t, err, "test transaction is expected to error out")
-
- // Begin a transaction to wait until the manager has applied all log entries currently
- // committed. This ensures the disk state assertions run with all log entries fully applied
- // to the repository.
- tx, err := mgr.Begin(ctx, "non-existent", nil, false)
- require.NoError(t, err)
- require.NoError(t, tx.Rollback())
-
- return true, nil
- case managerErr, closeChannel = <-managerErrChannel:
- }
- }
-
- if closeChannel {
- close(managerErrChannel)
- }
-
- return false, managerErr
-}
-
// BenchmarkTransactionManager benchmarks the transaction throughput of the TransactionManager at various levels
// of concurrency and transaction sizes.
func BenchmarkTransactionManager(b *testing.B) {