diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-03-28 15:59:29 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-03-28 15:59:29 +0300 |
commit | 9d4b88ce194fa1b4d19f0e7c8c10780e1d350d0e (patch) | |
tree | c147c2a889b49caead18b76f8eef3daae9342b95 | |
parent | 5ddeeaeb941e70016d7cc970fd89b3fb2f1cfdb6 (diff) | |
parent | 07e0b741141ef5e4ac22581eb807abd428cfdfc1 (diff) |
Merge branch 'smh-open-transaction' into 'master'
Provide means to open a transaction through TransactionManager
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5503
Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r-- | internal/gitaly/transaction_manager.go | 127 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager_test.go | 705 |
2 files changed, 375 insertions, 457 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 067e3e6d5..efaf49599 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -106,18 +106,62 @@ type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate // Transaction is a unit-of-work that contains reference changes to perform on the repository. type Transaction struct { - // SkipVerificationFailures causes references updates that failed the verification step to be - // dropped from the transaction. By default, any verification failures abort the entire transaction. - // - // The default behavior maps to the `--atomic` flag of `git push`. When this is set, the behavior matches - // what happens git's behavior without the flag. - SkipVerificationFailures bool - // ReferenceUpdates contains the reference updates to be performed. - ReferenceUpdates - // DefaultBranchUpdate contains the information to update the default branch of the repo. - DefaultBranchUpdate *DefaultBranchUpdate - // CustomHooksUpdate contains the custom hooks to set in the repository. - CustomHooksUpdate *CustomHooksUpdate + // commit commits the Transaction through the TransactionManager. + commit func(context.Context, *Transaction) error + // result is where the outcome of the transaction is sent ot by TransactionManager once it + // has been determined. + result chan error + + skipVerificationFailures bool + referenceUpdates ReferenceUpdates + defaultBranchUpdate *DefaultBranchUpdate + customHooksUpdate *CustomHooksUpdate +} + +// Begin opens a new transaction. The caller must call either Commit or Rollback to release +// the resources tied to the transaction. The returned Transaction is not safe for concurrent use. +func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) { + return &Transaction{commit: mgr.commit}, nil +} + +// Commit performs the changes. If no error is returned, the transaction was successful and the changes +// have been performed. If an error was returned, the transaction may or may not be persisted. +func (txn *Transaction) Commit(ctx context.Context) error { + return txn.commit(ctx, txn) +} + +// Rollback releases resources associated with the transaction without performing any changes. +func (txn *Transaction) Rollback() {} + +// SkipVerificationFailures configures the transaction to skip reference updates that fail verification. +// If a reference update fails verification with this set, the update is dropped from the transaction but +// other successful reference updates will be made. By default, the entire transaction is aborted if a +// reference fails verification. +// +// The default behavior models `git push --atomic`. Toggling this option models the behavior without +// the `--atomic` flag. +func (txn *Transaction) SkipVerificationFailures() { + txn.skipVerificationFailures = true +} + +// UpdateReferences updates the given references as part of the transaction. If UpdateReferences is called +// multiple times, only the changes from the latest invocation take place. +func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) { + txn.referenceUpdates = updates +} + +// SetDefaultBranch sets the default branch as part of the transaction. If SetDefaultBranch is called +// multiple times, only the changes from the latest invocation take place. The reference is validated +// to exist. +func (txn *Transaction) SetDefaultBranch(new git.ReferenceName) { + txn.defaultBranchUpdate = &DefaultBranchUpdate{Reference: new} +} + +// SetCustomHooks sets the custom hooks as part of the transaction. If SetCustomHooks is called multiple +// times, only the changes from the latest invocation take place. The hooks are extracted as is and are +// not validated. Setting a nil hooksTAR removes the hooks from the repository. +func (txn *Transaction) SetCustomHooks(hooksTAR []byte) { + txn.customHooksUpdate = &CustomHooksUpdate{CustomHooksTAR: hooksTAR} } // TransactionManager is responsible for transaction management of a single repository. Each repository has @@ -179,7 +223,7 @@ type TransactionManager struct { db database // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. - admissionQueue chan transactionFuture + admissionQueue chan *Transaction // appendedLogIndex holds the index of the last log entry appended to the log. appendedLogIndex LogIndex @@ -205,7 +249,7 @@ func NewTransactionManager(db *badger.DB, repository *localrepo.Repo) *Transacti stop: cancel, repository: repository, db: newDatabaseAdapter(db), - admissionQueue: make(chan transactionFuture), + admissionQueue: make(chan *Transaction), } } @@ -213,31 +257,14 @@ func NewTransactionManager(db *badger.DB, repository *localrepo.Repo) *Transacti // outcome has been decided. type resultChannel chan error -// transactionFuture holds a transaction and the resultChannel the transaction queuer is waiting -// for the result on. -type transactionFuture struct { - transaction Transaction - result resultChannel -} - -// Propose queues a transaction for the TransactionManager to process. Propose returns once the transaction -// has been successfully applied to the repository. -// -// Transaction is not committed if any of the following errors is returned: -// - InvalidReferenceFormatError is returned when attempting to update a reference with an invalid name. -// - ReferenceVerificationError is returned when the reference does not point to the expected old tip. -// -// Transaction may or may not be committed if any of the following errors is returned: -// - ErrTransactionProcessing stopped is returned when the TransactionManager is closing and stops processing -// transactions. -// - Unexpected error occurs. -func (mgr *TransactionManager) Propose(ctx context.Context, transaction Transaction) error { - queuedTransaction := transactionFuture{transaction: transaction, result: make(resultChannel, 1)} +// commit queus the transaction for processing and returns once the result has been determined. +func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error { + transaction.result = make(resultChannel, 1) select { - case mgr.admissionQueue <- queuedTransaction: + case mgr.admissionQueue <- transaction: select { - case err := <-queuedTransaction.result: + case err := <-transaction.result: return unwrapExpectedError(err) case <-ctx.Done(): return ctx.Err() @@ -268,10 +295,10 @@ func unwrapExpectedError(err error) error { // references, logging the transaction and finally applying it to the repository. The transactions are acknowledged // once they've been applied to the repository. // -// Run keeps running until Stop is called or it encounters a fatal error. All active Propose calls return +// Run keeps running until Stop is called or it encounters a fatal error. All transactions will error with // ErrTransactionProcessingStopped when Run returns. func (mgr *TransactionManager) Run() error { - // Defer the Stop in order to release all on-going Propose calls in case of error. + // Defer the Stop in order to release all on-going Commit calls in case of error. defer close(mgr.runDone) defer mgr.Stop() @@ -302,7 +329,7 @@ func (mgr *TransactionManager) Run() error { continue } - var transaction transactionFuture + var transaction *Transaction select { case transaction = <-mgr.admissionQueue: case <-mgr.ctx.Done(): @@ -315,14 +342,14 @@ func (mgr *TransactionManager) Run() error { } if err := func() error { - logEntry, err := mgr.verifyReferences(mgr.ctx, transaction.transaction) + logEntry, err := mgr.verifyReferences(mgr.ctx, transaction) if err != nil { return fmt.Errorf("verify references: %w", err) } - if transaction.transaction.CustomHooksUpdate != nil { + if transaction.customHooksUpdate != nil { logEntry.CustomHooksUpdate = &gitalypb.LogEntry_CustomHooksUpdate{ - CustomHooksTar: transaction.transaction.CustomHooksUpdate.CustomHooksTAR, + CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR, } } @@ -377,9 +404,9 @@ func (mgr *TransactionManager) initialize() error { // verifyReferences verifies that the references in the transaction apply on top of the already accepted // reference changes. The old tips in the transaction are verified against the current actual tips. // It returns the write-ahead log entry for the transaction if it was successfully verified. -func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction Transaction) (*gitalypb.LogEntry, error) { +func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction *Transaction) (*gitalypb.LogEntry, error) { logEntry := &gitalypb.LogEntry{} - for referenceName, tips := range transaction.ReferenceUpdates { + for referenceName, tips := range transaction.referenceUpdates { // 'git update-ref' doesn't ensure the loose references end up in the // refs directory so we enforce that here. if !strings.HasPrefix(referenceName.String(), "refs/") { @@ -408,7 +435,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction } if tips.OldOID != actualOldTip { - if transaction.SkipVerificationFailures { + if transaction.skipVerificationFailures { continue } @@ -437,13 +464,13 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction return nil, fmt.Errorf("verify references with git: %w", err) } - if transaction.DefaultBranchUpdate != nil { + if transaction.defaultBranchUpdate != nil { if err := mgr.verifyDefaultBranchUpdate(ctx, transaction); err != nil { return nil, fmt.Errorf("verify default branch update: %w", err) } logEntry.DefaultBranchUpdate = &gitalypb.LogEntry_DefaultBranchUpdate{ - ReferenceName: []byte(transaction.DefaultBranchUpdate.Reference), + ReferenceName: []byte(transaction.defaultBranchUpdate.Reference), } } @@ -467,12 +494,12 @@ func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, refe // the references in the current transaction which is not scheduled to be deleted. If not, we check if its a valid reference // name in the repository. We don't do reference name validation because any reference going through the transaction manager // has name validation and we can rely on that. -func (mgr *TransactionManager) verifyDefaultBranchUpdate(ctx context.Context, transaction Transaction) error { - referenceName := transaction.DefaultBranchUpdate.Reference +func (mgr *TransactionManager) verifyDefaultBranchUpdate(ctx context.Context, transaction *Transaction) error { + referenceName := transaction.defaultBranchUpdate.Reference // Check the transaction reference updates, to see if the refname exists, if we find it here // we don't have to invoke git to do a refname check. - if refUpdate, ok := transaction.ReferenceUpdates[referenceName]; ok { + if refUpdate, ok := transaction.referenceUpdates[referenceName]; ok { objectHash, err := mgr.repository.ObjectHash(ctx) if err != nil { return fmt.Errorf("obtaining object hash: %w", err) diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 85f225ad1..9961fff83 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -134,17 +134,23 @@ func TestTransactionManager(t *testing.T) { RestartManager bool // SkipStartManager can be used to skip starting the manager at the start of the step. SkipStartManager bool - // Context is the context to use for the Propose call of the step. - Context context.Context - // Transaction is the transaction that is proposed in this step. - Transaction Transaction + // CommitContext is the context to use for the Commit call of the step. + CommitContext context.Context + // SkipVerificationFailures sets the verification failure handling for this step. + SkipVerificationFailures bool + // ReferenceUpdates are the reference updates to perform in this step. + ReferenceUpdates ReferenceUpdates + // DefaultBranchUpdate is the default branch update to perform in this step. + DefaultBranchUpdate *DefaultBranchUpdate + // CustomHooksUpdate is the custom hooks update to perform in this step. + CustomHooksUpdate *CustomHooksUpdate // Hooks contains the hook functions that are configured on the TransactionManager. These allow // for better synchronization. Hooks testHooks // ExpectedRunError is the expected error to be returned from Run from this step. ExpectedRunError bool - // ExpectedProposeError is the error that is expected to be returned when proposing the transaction in this step. - ExpectedProposeError error + // ExpectedCommitError is the error that is expected to be returned when committing the transaction in this step. + ExpectedCommitError error // ExpectedHooks is the expected state of the hooks at the end of this step. ExpectedHooks testhelper.DirectoryState // ExpectedReferences is the expected state of references at the end of this step. @@ -168,14 +174,12 @@ func TestTransactionManager(t *testing.T) { desc: "invalid reference aborts the entire transaction", steps: steps{ { - Transaction: Transaction{ - SkipVerificationFailures: true, - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: updateref.ErrInvalidReferenceFormat{ReferenceName: "refs/heads/../main"}, + ExpectedCommitError: updateref.ErrInvalidReferenceFormat{ReferenceName: "refs/heads/../main"}, ExpectedDefaultBranch: "", }, }, @@ -184,19 +188,15 @@ func TestTransactionManager(t *testing.T) { desc: "continues processing after aborting due to an invalid reference", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: updateref.ErrInvalidReferenceFormat{ReferenceName: "refs/heads/../main"}, + ExpectedCommitError: updateref.ErrInvalidReferenceFormat{ReferenceName: "refs/heads/../main"}, ExpectedDefaultBranch: "", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -225,10 +225,8 @@ func TestTransactionManager(t *testing.T) { desc: "create reference", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -257,10 +255,8 @@ func TestTransactionManager(t *testing.T) { desc: "create a file-directory reference conflict different transaction", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -284,12 +280,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/parent", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: updateref.ErrFileDirectoryConflict{ + ExpectedCommitError: updateref.ErrFileDirectoryConflict{ ExistingReferenceName: "refs/heads/parent", ConflictingReferenceName: "refs/heads/parent/child", }, @@ -305,13 +299,11 @@ func TestTransactionManager(t *testing.T) { desc: "create a file-directory reference conflict in same transaction", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: updateref.ErrInTransactionConflict{ + ExpectedCommitError: updateref.ErrInTransactionConflict{ FirstReferenceName: "refs/heads/parent", SecondReferenceName: "refs/heads/parent/child", }, @@ -323,15 +315,13 @@ func TestTransactionManager(t *testing.T) { desc: "file-directory conflict aborts the transaction with verification failures skipped", steps: steps{ { - Transaction: Transaction{ - SkipVerificationFailures: true, - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: updateref.ErrInTransactionConflict{ + ExpectedCommitError: updateref.ErrInTransactionConflict{ FirstReferenceName: "refs/heads/parent", SecondReferenceName: "refs/heads/parent/child", }, @@ -343,10 +333,8 @@ func TestTransactionManager(t *testing.T) { desc: "delete file-directory conflict in different transaction", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -370,12 +358,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/parent/child", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, }, - ExpectedProposeError: updateref.ErrFileDirectoryConflict{ + ExpectedCommitError: updateref.ErrFileDirectoryConflict{ ExistingReferenceName: "refs/heads/parent/child", ConflictingReferenceName: "refs/heads/parent", }, @@ -391,13 +377,11 @@ func TestTransactionManager(t *testing.T) { desc: "delete file-directory conflict in same transaction", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, }, - ExpectedProposeError: updateref.ErrInTransactionConflict{ + ExpectedCommitError: updateref.ErrInTransactionConflict{ FirstReferenceName: "refs/heads/parent", SecondReferenceName: "refs/heads/parent/child", }, @@ -409,15 +393,13 @@ func TestTransactionManager(t *testing.T) { desc: "create a branch to a non-commit object", steps: steps{ { - Transaction: Transaction{ - SkipVerificationFailures: true, - ReferenceUpdates: ReferenceUpdates{ - // The error should abort the entire transaction. - "refs/heads/branch-1": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/branch-2": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, - }, + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + // The error should abort the entire transaction. + "refs/heads/branch-1": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/branch-2": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, }, - ExpectedProposeError: updateref.NonCommitObjectError{ + ExpectedCommitError: updateref.NonCommitObjectError{ ReferenceName: "refs/heads/branch-2", ObjectID: objectHash.EmptyTreeOID.String(), }, @@ -429,10 +411,8 @@ func TestTransactionManager(t *testing.T) { desc: "create a tag to a non-commit object", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/tags/v1.0.0": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/tags/v1.0.0": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/tags/v1.0.0", Target: objectHash.EmptyTreeOID.String()}}, Hooks: testHooks{ @@ -461,12 +441,10 @@ func TestTransactionManager(t *testing.T) { desc: "create a reference to non-existent object", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: nonExistentOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: nonExistentOID}, }, - ExpectedProposeError: updateref.NonExistentObjectError{ + ExpectedCommitError: updateref.NonExistentObjectError{ ReferenceName: "refs/heads/main", ObjectID: nonExistentOID.String(), }, @@ -478,10 +456,8 @@ func TestTransactionManager(t *testing.T) { desc: "create reference ignoring verification failure", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -505,12 +481,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - SkipVerificationFailures: true, - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, - "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, - }, + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -542,10 +516,8 @@ func TestTransactionManager(t *testing.T) { desc: "create reference that already exists", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -569,13 +541,11 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, - "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: objectHash.ZeroOID, ActualOID: rootCommitOID, @@ -599,10 +569,8 @@ func TestTransactionManager(t *testing.T) { desc: "create reference no-op", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -626,12 +594,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: objectHash.ZeroOID, ActualOID: rootCommitOID, @@ -648,10 +614,8 @@ func TestTransactionManager(t *testing.T) { desc: "update reference", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -675,10 +639,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, Hooks: testHooks{ @@ -708,11 +670,9 @@ func TestTransactionManager(t *testing.T) { desc: "update reference ignoring verification failures", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -744,12 +704,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - SkipVerificationFailures: true, - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, - "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, - }, + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -781,11 +739,9 @@ func TestTransactionManager(t *testing.T) { desc: "update reference with incorrect old tip", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -816,13 +772,11 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, - "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: secondCommitOID, ActualOID: rootCommitOID, @@ -842,12 +796,10 @@ func TestTransactionManager(t *testing.T) { desc: "update non-existent reference", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: secondCommitOID, ActualOID: objectHash.ZeroOID, @@ -860,10 +812,8 @@ func TestTransactionManager(t *testing.T) { desc: "update reference no-op", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -887,10 +837,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -919,10 +867,8 @@ func TestTransactionManager(t *testing.T) { desc: "delete reference", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -946,10 +892,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, }, Hooks: testHooks{ BeforeDeleteLogEntry: func(hookCtx hookContext) { @@ -977,11 +921,9 @@ func TestTransactionManager(t *testing.T) { desc: "delete reference ignoring verification failures", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -1012,12 +954,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - SkipVerificationFailures: true, - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, - "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, - }, + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -1046,11 +986,9 @@ func TestTransactionManager(t *testing.T) { desc: "delete reference with incorrect old tip", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -1081,13 +1019,11 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, - "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: secondCommitOID, ActualOID: rootCommitOID, @@ -1107,12 +1043,10 @@ func TestTransactionManager(t *testing.T) { desc: "delete non-existent reference", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: rootCommitOID, ActualOID: objectHash.ZeroOID, @@ -1125,10 +1059,8 @@ func TestTransactionManager(t *testing.T) { desc: "delete reference no-op", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, }, Hooks: testHooks{ BeforeDeleteLogEntry: func(hookCtx hookContext) { @@ -1156,10 +1088,8 @@ func TestTransactionManager(t *testing.T) { desc: "set custom hooks successfully", steps: steps{ { - Transaction: Transaction{ - CustomHooksUpdate: &CustomHooksUpdate{ - CustomHooksTAR: validCustomHooks(t), - }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), }, Hooks: testHooks{ BeforeDeleteLogEntry: func(hookCtx hookContext) { @@ -1188,9 +1118,7 @@ func TestTransactionManager(t *testing.T) { }, }, { - Transaction: Transaction{ - CustomHooksUpdate: &CustomHooksUpdate{}, - }, + CustomHooksUpdate: &CustomHooksUpdate{}, Hooks: testHooks{ BeforeDeleteLogEntry: func(hookCtx hookContext) { RequireDatabase(hookCtx.tb, ctx, hookCtx.database, DatabaseState{ @@ -1222,10 +1150,8 @@ func TestTransactionManager(t *testing.T) { desc: "reapplying custom hooks works", steps: steps{ { - Transaction: Transaction{ - CustomHooksUpdate: &CustomHooksUpdate{ - CustomHooksTAR: validCustomHooks(t), - }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), }, Hooks: testHooks{ BeforeStoreAppliedLogIndex: func(hookContext) { @@ -1240,8 +1166,8 @@ func TestTransactionManager(t *testing.T) { }, }, }, - ExpectedRunError: true, - ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedCommitError: ErrTransactionProcessingStopped, ExpectedHooks: testhelper.DirectoryState{ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, @@ -1256,7 +1182,6 @@ func TestTransactionManager(t *testing.T) { { // Empty transaction used here to just check the log entry is applied and the // applied index incremented. - Transaction: Transaction{}, ExpectedDatabase: DatabaseState{ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), }, @@ -1277,12 +1202,10 @@ func TestTransactionManager(t *testing.T) { desc: "continues processing after reference verification failure", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: rootCommitOID, ActualOID: objectHash.ZeroOID, @@ -1290,10 +1213,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, Hooks: testHooks{ @@ -1322,10 +1243,8 @@ func TestTransactionManager(t *testing.T) { desc: "continues processing after a restart", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, Hooks: testHooks{ @@ -1351,10 +1270,8 @@ func TestTransactionManager(t *testing.T) { { StopManager: true, RestartManager: true, - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, Hooks: testHooks{ @@ -1383,12 +1300,10 @@ func TestTransactionManager(t *testing.T) { desc: "continues processing after restarting after a reference verification failure", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: rootCommitOID, ActualOID: objectHash.ZeroOID, @@ -1398,10 +1313,8 @@ func TestTransactionManager(t *testing.T) { { StopManager: true, RestartManager: true, - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, Hooks: testHooks{ @@ -1430,19 +1343,17 @@ func TestTransactionManager(t *testing.T) { desc: "continues processing after failing to store log index", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, Hooks: testHooks{ BeforeStoreAppliedLogIndex: func(hookCtx hookContext) { panic("node failure simulation") }, }, - ExpectedPanic: "node failure simulation", - ExpectedProposeError: ErrTransactionProcessingStopped, - ExpectedRunError: true, + ExpectedPanic: "node failure simulation", + ExpectedCommitError: ErrTransactionProcessingStopped, + ExpectedRunError: true, ExpectedDatabase: DatabaseState{ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ @@ -1457,10 +1368,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, ExpectedDatabase: DatabaseState{ @@ -1474,18 +1383,16 @@ func TestTransactionManager(t *testing.T) { desc: "recovers from the write-ahead log on start up", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, Hooks: testHooks{ BeforeApplyLogEntry: func(hookCtx hookContext) { hookCtx.stopManager() }, }, - ExpectedProposeError: ErrTransactionProcessingStopped, - ExpectedRunError: true, + ExpectedCommitError: ErrTransactionProcessingStopped, + ExpectedRunError: true, ExpectedDatabase: DatabaseState{ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ @@ -1499,10 +1406,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, // Notice that here we can't check state of database before deletion because @@ -1520,18 +1425,16 @@ func TestTransactionManager(t *testing.T) { desc: "reference verification fails after recovering logged writes", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, Hooks: testHooks{ BeforeApplyLogEntry: func(hookCtx hookContext) { hookCtx.stopManager() }, }, - ExpectedProposeError: ErrTransactionProcessingStopped, - ExpectedRunError: true, + ExpectedCommitError: ErrTransactionProcessingStopped, + ExpectedRunError: true, ExpectedDatabase: DatabaseState{ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ @@ -1545,12 +1448,10 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: secondCommitOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: ReferenceVerificationError{ + ExpectedCommitError: ReferenceVerificationError{ ReferenceName: "refs/heads/main", ExpectedOID: secondCommitOID, ActualOID: rootCommitOID, @@ -1579,36 +1480,32 @@ func TestTransactionManager(t *testing.T) { }, }, { - desc: "propose returns if context is canceled before admission", + desc: "commit returns if context is canceled before admission", steps: steps{ { - Context: func() context.Context { + CommitContext: func() context.Context { ctx, cancel := context.WithCancel(ctx) cancel() return ctx }(), SkipStartManager: true, - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: context.Canceled, + ExpectedCommitError: context.Canceled, ExpectedDefaultBranch: "", }, }, }, { - desc: "propose returns if transaction processing stops before admission", + desc: "commit returns if transaction processing stops before admission", steps: steps{ { StopManager: true, - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedCommitError: ErrTransactionProcessingStopped, ExpectedDefaultBranch: "", }, }, @@ -1616,18 +1513,16 @@ func TestTransactionManager(t *testing.T) { func() testCase { ctx, cancel := context.WithCancel(ctx) return testCase{ - desc: "propose returns if context is canceled after admission", + desc: "commit returns if context is canceled after admission", steps: steps{ { - Context: ctx, - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + CommitContext: ctx, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, Hooks: testHooks{ BeforeApplyLogEntry: func(hookCtx hookContext) { - // Cancel the context used in Propose + // Cancel the context used in Commit cancel() }, BeforeDeleteLogEntry: func(hookCtx hookContext) { @@ -1644,8 +1539,8 @@ func TestTransactionManager(t *testing.T) { }) }, }, - ExpectedProposeError: context.Canceled, - ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedCommitError: context.Canceled, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, ExpectedDatabase: DatabaseState{ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), }, @@ -1655,13 +1550,11 @@ func TestTransactionManager(t *testing.T) { } }(), { - desc: "propose returns if transaction processing stops before transaction acceptance", + desc: "commit returns if transaction processing stops before transaction acceptance", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, Hooks: testHooks{ BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.stopManager() }, @@ -1670,26 +1563,24 @@ func TestTransactionManager(t *testing.T) { // runDone is closed. WaitForTransactionsWhenStopping: true, }, - ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedCommitError: ErrTransactionProcessingStopped, }, }, }, { - desc: "propose returns if transaction processing stops after transaction acceptance", + desc: "commit returns if transaction processing stops after transaction acceptance", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, Hooks: testHooks{ BeforeApplyLogEntry: func(hookCtx hookContext) { hookCtx.stopManager() }, }, - ExpectedProposeError: ErrTransactionProcessingStopped, - ExpectedRunError: true, + ExpectedCommitError: ErrTransactionProcessingStopped, + ExpectedRunError: true, ExpectedDatabase: DatabaseState{ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ @@ -1707,11 +1598,9 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch with existing branch", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, @@ -1742,10 +1631,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/branch2", - }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch2", }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, @@ -1774,10 +1661,8 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch with new branch created in same transaction", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -1803,14 +1688,12 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/branch2", - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch2", }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, @@ -1849,18 +1732,16 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch with invalid reference name", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/../main", - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/../main", }, // For branch updates, we don't really verify the refname schematics, we take a shortcut // and rely on it being either a verified new reference name or a reference name which // exists on the repo already. - ExpectedProposeError: git.ErrReferenceNotFound, + ExpectedCommitError: git.ErrReferenceNotFound, ExpectedDefaultBranch: "", }, }, @@ -1869,18 +1750,16 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch to point to a non-existent reference name", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/yoda", - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/yoda", }, // For branch updates, we don't really verify the refname schematics, we take a shortcut // and rely on it being either a verified new reference name or a reference name which // exists on the repo already. - ExpectedProposeError: git.ErrReferenceNotFound, + ExpectedCommitError: git.ErrReferenceNotFound, ExpectedDefaultBranch: "", }, }, @@ -1889,11 +1768,9 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch to point to reference being deleted in the same transaction", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, @@ -1924,15 +1801,13 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/branch2": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, - }, - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/branch2", - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch2": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch2", }, - ExpectedProposeError: ReferenceToBeDeletedError{ReferenceName: "refs/heads/branch2"}, + ExpectedCommitError: ReferenceToBeDeletedError{ReferenceName: "refs/heads/branch2"}, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, {Name: "refs/heads/main", Target: rootCommitOID.String()}, @@ -1948,11 +1823,9 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch with existing branch and other modifications", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, @@ -1983,13 +1856,11 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/main", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/branch2", - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch2", }, ExpectedReferences: []git.Reference{ {Name: "refs/heads/branch2", Target: rootCommitOID.String()}, @@ -2024,23 +1895,21 @@ func TestTransactionManager(t *testing.T) { desc: "update default branch fails before storing log index", steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, - DefaultBranchUpdate: &DefaultBranchUpdate{ - Reference: "refs/heads/branch2", - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/branch2": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch2", }, Hooks: testHooks{ BeforeStoreAppliedLogIndex: func(hookCtx hookContext) { panic("node failure simulation") }, }, - ExpectedPanic: "node failure simulation", - ExpectedProposeError: ErrTransactionProcessingStopped, - ExpectedRunError: true, + ExpectedPanic: "node failure simulation", + ExpectedCommitError: ErrTransactionProcessingStopped, + ExpectedRunError: true, ExpectedDatabase: DatabaseState{ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ @@ -2065,10 +1934,8 @@ func TestTransactionManager(t *testing.T) { ExpectedDefaultBranch: "refs/heads/branch2", }, { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, }, ExpectedReferences: []git.Reference{ @@ -2107,12 +1974,10 @@ func TestTransactionManager(t *testing.T) { desc: fmt.Sprintf("invalid reference %s", tc.desc), steps: steps{ { - Transaction: Transaction{ - ReferenceUpdates: ReferenceUpdates{ - tc.referenceName: {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, - }, + ReferenceUpdates: ReferenceUpdates{ + tc.referenceName: {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, - ExpectedProposeError: err, + ExpectedCommitError: err, }, }, }) @@ -2220,8 +2085,8 @@ func TestTransactionManager(t *testing.T) { // managerErr is used for synchronizing manager stopping and returning // the error from Run. managerErr chan error - // inflightTransactions tracks the number of on going propose calls. It is used to synchronize - // the database hooks with propose calls. + // inflightTransactions tracks the number of on going transactions calls. It is used to synchronize + // the database hooks with transactions. inflightTransactions sync.WaitGroup ) @@ -2306,12 +2171,32 @@ func TestTransactionManager(t *testing.T) { inflightTransactions.Add(1) defer inflightTransactions.Done() - proposeCtx := ctx - if step.Context != nil { - proposeCtx = step.Context + transaction, err := transactionManager.Begin(ctx) + require.NoError(t, err) + defer transaction.Rollback() + + if step.SkipVerificationFailures { + transaction.SkipVerificationFailures() + } + + if step.ReferenceUpdates != nil { + transaction.UpdateReferences(step.ReferenceUpdates) + } + + if step.DefaultBranchUpdate != nil { + transaction.SetDefaultBranch(step.DefaultBranchUpdate.Reference) } - require.ErrorIs(t, transactionManager.Propose(proposeCtx, step.Transaction), step.ExpectedProposeError) + if step.CustomHooksUpdate != nil { + transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR) + } + + commitCtx := ctx + if step.CommitContext != nil { + commitCtx = step.CommitContext + } + + require.ErrorIs(t, transaction.Commit(commitCtx), step.ExpectedCommitError) }() if !step.SkipStartManager { @@ -2334,9 +2219,9 @@ func TestTransactionManager(t *testing.T) { func checkManagerError(t *testing.T, managerErrChannel chan error, mgr *TransactionManager) (bool, error) { t.Helper() - testTransaction := transactionFuture{ - transaction: Transaction{ReferenceUpdates: ReferenceUpdates{"sentinel": {}}}, - result: make(resultChannel, 1), + testTransaction := &Transaction{ + referenceUpdates: ReferenceUpdates{"sentinel": {}}, + result: make(chan error, 1), } var ( @@ -2353,7 +2238,7 @@ func checkManagerError(t *testing.T, managerErrChannel chan error, mgr *Transact case mgr.admissionQueue <- testTransaction: // If the error channel doesn't receive, we don't know whether it is because the manager is still running // or we are still waiting for it to return. We test whether the manager is running or not here by queueing a - // a proposal that will error. If the manager processes it, we know it is still running. + // a transaction that will error. If the manager processes it, we know it is still running. // // If the manager was stopped, it might manage to admit the testTransaction but not process it. To determine // whether that was the case, we also keep waiting on the managerErr channel. @@ -2381,10 +2266,10 @@ func BenchmarkTransactionManager(b *testing.B) { // setting this higher allows for testing parallel throughput of multiple repositories. This mostly serves // to determine the impact of the shared resources such as the database. numberOfRepositories int - // concurrentUpdaters sets the number of goroutines that are calling Propose for a repository. Each of the - // updaters work on their own references so they don't block each other. Setting this to 1 allows for testing - // sequential update throughput of a repository. Setting this higher allows for testing reference update - // throughput when multiple references are being updated concurrently. + // concurrentUpdaters sets the number of goroutines that are calling committing transactions for a repository. + // Each of the updaters work on their own references so they don't block each other. Setting this to 1 allows + // for testing sequential update throughput of a repository. Setting this higher allows for testing reference + // update throughput when multiple references are being updated concurrently. concurrentUpdaters int // transactionSize sets the number of references that are updated in each transaction. transactionSize int @@ -2492,14 +2377,15 @@ func BenchmarkTransactionManager(b *testing.B) { require.NoError(b, err) for j := 0; j < tc.concurrentUpdaters; j++ { - require.NoError(b, manager.Propose(ctx, Transaction{ - ReferenceUpdates: getReferenceUpdates(j, objectHash.ZeroOID, commit1), - })) + transaction, err := manager.Begin(ctx) + require.NoError(b, err) + transaction.UpdateReferences(getReferenceUpdates(j, objectHash.ZeroOID, commit1)) + require.NoError(b, transaction.Commit(ctx)) } } - // proposeWG tracks the number of on going Propose calls. - var proposeWG sync.WaitGroup + // transactionWG tracks the number of on going transaction. + var transactionWG sync.WaitGroup transactionChan := make(chan struct{}) for _, manager := range managers { @@ -2510,17 +2396,22 @@ func BenchmarkTransactionManager(b *testing.B) { currentReferences := getReferenceUpdates(i, commit1, commit2) nextReferences := getReferenceUpdates(i, commit2, commit1) + transaction, err := manager.Begin(ctx) + require.NoError(b, err) + transaction.UpdateReferences(currentReferences) + // Setup the starting state so the references start at the expected old tip. - require.NoError(b, manager.Propose(ctx, Transaction{ - ReferenceUpdates: currentReferences, - })) + require.NoError(b, transaction.Commit(ctx)) - proposeWG.Add(1) + transactionWG.Add(1) go func() { - defer proposeWG.Done() + defer transactionWG.Done() for range transactionChan { - assert.NoError(b, manager.Propose(ctx, Transaction{ReferenceUpdates: nextReferences})) + transaction, err := manager.Begin(ctx) + require.NoError(b, err) + transaction.UpdateReferences(nextReferences) + assert.NoError(b, transaction.Commit(ctx)) currentReferences, nextReferences = nextReferences, currentReferences } }() @@ -2536,7 +2427,7 @@ func BenchmarkTransactionManager(b *testing.B) { } close(transactionChan) - proposeWG.Wait() + transactionWG.Wait() b.StopTimer() b.ReportMetric(float64(b.N*tc.transactionSize)/time.Since(began).Seconds(), "reference_updates/s") |