Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-01-17 13:48:18 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-01-17 13:48:18 +0300
commit7e80ece2ebfd82776285c83362e6874dacb5b26a (patch)
tree9743724ec61b44c8e0c768eaeea90603f3ab4ae2 /internal
parent60e00c9b19ab28605c0b80931683feddcc62d971 (diff)
parent8a29ef51192cc57770dcd7966d9af8173eb7ff4d (diff)
Merge branch 'smh-log-worker' into 'master'
Implement basic transaction management with write-ahead logging See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5020 Merged-by: Sami Hiltunen <shiltunen@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Diffstat (limited to 'internal')
-rw-r--r--internal/gitaly/database.go59
-rw-r--r--internal/gitaly/testhelper_test.go72
-rw-r--r--internal/gitaly/transaction_manager.go539
-rw-r--r--internal/gitaly/transaction_manager_hook_test.go137
-rw-r--r--internal/gitaly/transaction_manager_test.go1586
-rw-r--r--internal/testhelper/leakage.go4
6 files changed, 2397 insertions, 0 deletions
diff --git a/internal/gitaly/database.go b/internal/gitaly/database.go
new file mode 100644
index 000000000..2015de710
--- /dev/null
+++ b/internal/gitaly/database.go
@@ -0,0 +1,59 @@
+package gitaly
+
+import (
+ "github.com/dgraph-io/badger/v3"
+)
+
+// OpenDatabase opens a new database handle to a database at the given path.
+func OpenDatabase(databasePath string) (*badger.DB, error) {
+ dbOptions := badger.DefaultOptions(databasePath)
+ // Enable SyncWrites to ensure all writes are persisted to disk before considering
+ // them committed.
+ dbOptions.SyncWrites = true
+ // Badger by default logs fairly verbose statistics when opening a database. Disable the
+ // logging for now.
+ dbOptions.Logger = nil
+
+ return badger.Open(dbOptions)
+}
+
+// databaseAdapter adapts a *badger.DB to the internal database interface used by the hooks in tests.
+type databaseAdapter struct{ *badger.DB }
+
+// newDatabaseAdapter adapts a *badger.DB to conform to the internal database interface used for
+// hooking into during testing.
+func newDatabaseAdapter(db *badger.DB) database {
+ return databaseAdapter{DB: db}
+}
+
+// NewWriteBatch calls badger.*DB.NewWriteBatch. Refer to Badger's documentation for details.
+func (db databaseAdapter) NewWriteBatch() writeBatch {
+ return db.DB.NewWriteBatch()
+}
+
+// NewWriteBatch calls badger.*DB.View. Refer to Badger's documentation for details.
+func (db databaseAdapter) View(handler func(databaseTransaction) error) error {
+ return db.DB.View(func(txn *badger.Txn) error { return handler(txn) })
+}
+
+// database is the Badger.DB interface used by TransactionManager. Refer to Badger's documentation
+// for details.
+type database interface {
+ NewWriteBatch() writeBatch
+ View(func(databaseTransaction) error) error
+}
+
+// writeBatch is the interface of Badger.WriteBatch used by TransactionManager. Refer to Badger's
+// documentation for details.
+type writeBatch interface {
+ Set([]byte, []byte) error
+ Flush() error
+ Cancel()
+}
+
+// databaseTransaction is the interface of *Badger.Txn used by TransactionManager. Refer to Badger's
+// documentation for details
+type databaseTransaction interface {
+ Get([]byte) (*badger.Item, error)
+ NewIterator(badger.IteratorOptions) *badger.Iterator
+}
diff --git a/internal/gitaly/testhelper_test.go b/internal/gitaly/testhelper_test.go
new file mode 100644
index 000000000..9f194318d
--- /dev/null
+++ b/internal/gitaly/testhelper_test.go
@@ -0,0 +1,72 @@
+package gitaly
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "testing"
+
+ "github.com/dgraph-io/badger/v3"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "google.golang.org/protobuf/proto"
+)
+
+func TestMain(m *testing.M) {
+ testhelper.Run(m)
+}
+
+// DatabaseState describes the expected state of the key-value store. The keys in the map are the expected keys
+// in the database and the values are the expected unmarshaled values.
+type DatabaseState map[string]proto.Message
+
+// RequireDatabase asserts the actual database state matches the expected database state. The actual values in the
+// database are unmarshaled to the same type the values have in the expected database state.
+func RequireDatabase(tb testing.TB, ctx context.Context, database *badger.DB, expectedState DatabaseState) {
+ tb.Helper()
+
+ if expectedState == nil {
+ expectedState = DatabaseState{}
+ }
+
+ actualState := DatabaseState{}
+ unexpectedKeys := []string{}
+ require.NoError(tb, database.View(func(txn *badger.Txn) error {
+ iterator := txn.NewIterator(badger.DefaultIteratorOptions)
+ defer iterator.Close()
+
+ for iterator.Rewind(); iterator.Valid(); iterator.Next() {
+ key := iterator.Item().Key()
+ expectedValue, ok := expectedState[string(key)]
+ if !ok {
+ // Print the keys out escaped as otherwise the non-printing characters are not visible in the assertion failure.
+ unexpectedKeys = append(unexpectedKeys, fmt.Sprintf("%q", key))
+ continue
+ }
+
+ require.NoError(tb, iterator.Item().Value(func(value []byte) error {
+ // Unmarshal the actual value to the same type as the expected value.
+ actualValue := reflect.New(reflect.TypeOf(expectedValue).Elem()).Interface().(proto.Message)
+ require.NoError(tb, proto.Unmarshal(value, actualValue))
+ actualState[string(key)] = actualValue
+ return nil
+ }))
+ }
+
+ return nil
+ }))
+
+ require.Empty(tb, unexpectedKeys, "database contains unexpected keys")
+ testhelper.ProtoEqual(tb, expectedState, actualState)
+}
+
+// RequireReferences asserts that the actual state of the references in the repository match the expected.
+func RequireReferences(tb testing.TB, ctx context.Context, repo *localrepo.Repo, expectedReferences []git.Reference) {
+ tb.Helper()
+
+ actualReferences, err := repo.GetReferences(ctx)
+ require.NoError(tb, err)
+ require.ElementsMatch(tb, expectedReferences, actualReferences)
+}
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
new file mode 100644
index 000000000..c9c89bf81
--- /dev/null
+++ b/internal/gitaly/transaction_manager.go
@@ -0,0 +1,539 @@
+package gitaly
+
+import (
+ "bytes"
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "sort"
+ "strings"
+
+ "github.com/dgraph-io/badger/v3"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "google.golang.org/protobuf/proto"
+)
+
+// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions.
+var ErrTransactionProcessingStopped = errors.New("transaction processing stopped")
+
+// InvalidReferenceFormatError is returned when a reference name was invalid.
+type InvalidReferenceFormatError struct {
+ // ReferenceName is the reference with invalid format.
+ ReferenceName git.ReferenceName
+}
+
+// Error returns the formatted error string.
+func (err InvalidReferenceFormatError) Error() string {
+ return fmt.Sprintf("invalid reference format: %q", err.ReferenceName)
+}
+
+// ReferenceVerificationError is returned when a reference's old OID did not match the expected.
+type ReferenceVerificationError struct {
+ // ReferenceName is the name of the reference that failed verification.
+ ReferenceName git.ReferenceName
+ // ExpectedOID is the OID the reference was expected to point to.
+ ExpectedOID git.ObjectID
+ // ActualOID is the OID the reference actually pointed to.
+ ActualOID git.ObjectID
+}
+
+// Error returns the formatted error string.
+func (err ReferenceVerificationError) Error() string {
+ return fmt.Sprintf("expected %q to point to %q but it pointed to %q", err.ReferenceName, err.ExpectedOID, err.ActualOID)
+}
+
+// LogIndex points to a specific position in a repository's write-ahead log.
+type LogIndex uint64
+
+// toProto returns the protobuf representation of LogIndex for serialization purposes.
+func (index LogIndex) toProto() *gitalypb.LogIndex {
+ return &gitalypb.LogIndex{LogIndex: uint64(index)}
+}
+
+// ReferenceUpdate describes the state of a reference's old and new tip in an update.
+type ReferenceUpdate struct {
+ // OldOID is the old OID the reference is expected to point to prior to updating it.
+ // If the reference does not point to the old value, the reference verification fails.
+ OldOID git.ObjectID
+ // NewOID is the new desired OID to point the reference to.
+ NewOID git.ObjectID
+}
+
+// ReferenceUpdates contains references to update. Reference name is used as the key and the value
+// is the expected old tip and the desired new tip.
+type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate
+
+// Transaction is a unit-of-work that contains reference changes to perform on the repository.
+type Transaction struct {
+ // SkipVerificationFailures causes references updates that failed the verification step to be
+ // dropped from the transaction. By default, any verification failures abort the entire transaction.
+ //
+ // The default behavior maps to the `--atomic` flag of `git push`. When this is set, the behavior matches
+ // what happens git's behavior without the flag.
+ SkipVerificationFailures bool
+ // ReferenceUpdates contains the reference updates to be performed.
+ ReferenceUpdates
+}
+
+// TransactionManager is responsible for transaction management of a single repository. Each repository has
+// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from
+// the admissionQueue. Each admitted write is processed in three steps:
+//
+// 1. The references being updated are verified by ensuring the expected old tips match what the references
+// actually point to prior to update. The entire transaction is by default aborted if a single reference
+// fails the verification step. The reference verification behavior can be controlled on a per-transaction
+// level by setting:
+// - The reference verification failures can be ignored instead of aborting the entire transaction.
+// If done, the references that failed verification are dropped from the transaction but the updates
+// that passed verification are still performed.
+// - The reference verification may also be skipped if the write is force updating references. If
+// done, the current state of the references is ignored and they are directly updated to point
+// to the new tips.
+// 2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively
+// committed and will be applied to the repository even after restarting.
+// 3. The transaction is applied from the write-ahead log to the repository by actually performing the reference
+// changes.
+//
+// The goroutine that issued the transaction is waiting for the result while these steps are being performed. As
+// there is no transaction control for readers yet, the issuer is only notified of a successful write after the
+// write has been applied to the repository.
+//
+// TransactionManager recovers transactions after interruptions by applying the write-ahead logged transactions to
+// the repository on start up.
+//
+// TransactionManager maintains the write-ahead log in a key-value store. It maintains the following key spaces:
+// - `repository/<repository_id:string>/log/index/applied`
+// - This key stores the index of the log entry that has been applied to the repository. This allows for
+// determining how far a repository is in processing the log and which log entries need to be applied
+// after starting up. Repository starts from log index 0 if there are no log entries recorded to have
+// been applied.
+//
+// - `repository/<repository_id:string>/log/entry/<log_index:uint64>`
+// - These keys hold the actual write-ahead log entries. A repository's first log entry starts at index 1
+// and the log index keeps monotonically increasing from there on without gaps. The write-ahead log
+// entries are processed in ascending order.
+//
+// The values in the database are marshaled protocol buffer messages. Numbers in the keys are encoded as big
+// endian to preserve the sort order of the numbers also in lexicographical order.
+type TransactionManager struct {
+ // ctx is the context used for all operations.
+ ctx context.Context
+ // stop cancels ctx and stops the transaction processing.
+ stop context.CancelFunc
+
+ // stopCalled is closed when Stop is called. It unblock transactions that are waiting to be admitted.
+ stopCalled <-chan struct{}
+ // runDone is closed when Run returns. It unblocks transactions that are waiting for a result after
+ // being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly
+ // releases awaiters when the transactions processing is stopped.
+ runDone chan struct{}
+
+ // repository is the repository this TransactionManager is acting on.
+ repository repository
+ // db is the handle to the key-value store used for storing the write-ahead log related state.
+ db database
+ // admissionQueue is where the incoming writes are waiting to be admitted to the transaction
+ // manager.
+ admissionQueue chan transactionFuture
+
+ // appendedLogIndex holds the index of the last log entry appended to the log.
+ appendedLogIndex LogIndex
+ // appliedLogIndex holds the index of the last log entry applied to the repository
+ appliedLogIndex LogIndex
+}
+
+// repository is the localrepo interface used by TransactionManager.
+type repository interface {
+ git.RepositoryExecutor
+ ResolveRevision(context.Context, git.Revision) (git.ObjectID, error)
+}
+
+// NewTransactionManager returns a new TransactionManager for the given repository.
+func NewTransactionManager(db *badger.DB, repository *localrepo.Repo) *TransactionManager {
+ ctx, cancel := context.WithCancel(context.Background())
+ return &TransactionManager{
+ ctx: ctx,
+ stopCalled: ctx.Done(),
+ runDone: make(chan struct{}),
+ stop: cancel,
+ repository: repository,
+ db: newDatabaseAdapter(db),
+ admissionQueue: make(chan transactionFuture),
+ }
+}
+
+// resultChannel represents a future that will yield the result of a transaction once its
+// outcome has been decided.
+type resultChannel chan error
+
+// transactionFuture holds a transaction and the resultChannel the transaction queuer is waiting
+// for the result on.
+type transactionFuture struct {
+ transaction Transaction
+ result resultChannel
+}
+
+// Propose queues a transaction for the TransactionManager to process. Propose returns once the transaction
+// has been successfully applied to the repository.
+//
+// Transaction is not committed if any of the following errors is returned:
+// - InvalidReferenceFormatError is returned when attempting to update a reference with an invalid name.
+// - ReferenceVerificationError is returned when the reference does not point to the expected old tip.
+//
+// Transaction may or may not be committed if any of the following errors is returned:
+// - ErrTransactionProcessing stopped is returned when the TransactionManager is closing and stops processing
+// transactions.
+// - Unexpected error occurs.
+func (mgr *TransactionManager) Propose(ctx context.Context, transaction Transaction) error {
+ queuedTransaction := transactionFuture{transaction: transaction, result: make(resultChannel, 1)}
+
+ select {
+ case mgr.admissionQueue <- queuedTransaction:
+ select {
+ case err := <-queuedTransaction.result:
+ return unwrapExpectedError(err)
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-mgr.runDone:
+ return ErrTransactionProcessingStopped
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-mgr.stopCalled:
+ return ErrTransactionProcessingStopped
+ }
+}
+
+// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
+func unwrapExpectedError(err error) error {
+ // The manager controls its own execution context and it is canceled only when Stop is called.
+ // Any context.Canceled errors returned are thus from shutting down so we report that here.
+ if errors.Is(err, context.Canceled) {
+ return ErrTransactionProcessingStopped
+ }
+
+ return err
+}
+
+// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied
+// log entries from the database. It will then apply any transactions that have been logged but not applied
+// to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the
+// references, logging the transaction and finally applying it to the repository. The transactions are acknowledged
+// once they've been applied to the repository.
+//
+// Run keeps running until Stop is called or it encounters a fatal error. All active Propose calls return
+// ErrTransactionProcessingStopped when Run returns.
+func (mgr *TransactionManager) Run() error {
+ // Defer the Stop in order to release all on-going Propose calls in case of error.
+ defer close(mgr.runDone)
+ defer mgr.Stop()
+
+ if err := mgr.initialize(); err != nil {
+ return fmt.Errorf("initialize: %w", err)
+ }
+
+ // awaitingTransactions contains transactions waiting for their log entry to be applied to
+ // the repository. It's keyed by the log index the transaction is waiting to be applied and the
+ // value is the resultChannel that is waiting the result.
+ awaitingTransactions := make(map[LogIndex]resultChannel)
+
+ for {
+ if mgr.appliedLogIndex < mgr.appendedLogIndex {
+ logIndex := mgr.appliedLogIndex + 1
+
+ if err := mgr.applyLogEntry(mgr.ctx, logIndex); err != nil {
+ return fmt.Errorf("apply log entry: %w", err)
+ }
+
+ // There is no awaiter for a transaction if the transaction manager is recovering
+ // transactions from the log after starting up.
+ if resultChan, ok := awaitingTransactions[logIndex]; ok {
+ resultChan <- nil
+ delete(awaitingTransactions, logIndex)
+ }
+
+ continue
+ }
+
+ var transaction transactionFuture
+ select {
+ case transaction = <-mgr.admissionQueue:
+ case <-mgr.ctx.Done():
+ }
+
+ // Return if the manager was stopped. The select is indeterministic so this guarantees
+ // the manager stops the processing even if there are transactions in the queue.
+ if mgr.ctx.Err() != nil {
+ return nil
+ }
+
+ if err := func() error {
+ logEntry, err := mgr.verifyReferences(mgr.ctx, transaction.transaction)
+ if err != nil {
+ return fmt.Errorf("verify references: %w", err)
+ }
+
+ return mgr.appendLogEntry(logEntry)
+ }(); err != nil {
+ transaction.result <- err
+ continue
+ }
+
+ awaitingTransactions[mgr.appendedLogIndex] = transaction.result
+ }
+}
+
+// Stop stops the transaction processing causing Run to return.
+func (mgr *TransactionManager) Stop() { mgr.stop() }
+
+// initialize initializes the TransactionManager by loading the applied log index and determining the
+// appended log index from the database.
+func (mgr *TransactionManager) initialize() error {
+ var appliedLogIndex gitalypb.LogIndex
+ if err := mgr.readKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
+ return fmt.Errorf("read applied log index: %w", err)
+ }
+
+ mgr.appliedLogIndex = LogIndex(appliedLogIndex.LogIndex)
+
+ // The index of the last appended log entry is determined from the indexes of the latest entry in the log and
+ // the latest applied log entry. If there is a log entry, it is the latest appended log entry. If there are no
+ // log entries, the latest log entry must have been applied to the repository and pruned away, meaning the index
+ // of the last appended log entry is the same as the index if the last applied log entry.
+ //
+ // As the log indexes in the keys are encoded in big endian, the latest log entry can be found by taking
+ // the first key when iterating the log entry key space in reverse.
+ return mgr.db.View(func(txn databaseTransaction) error {
+ logPrefix := keyPrefixLogEntries(getRepositoryID(mgr.repository))
+
+ iterator := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: logPrefix})
+ defer iterator.Close()
+
+ mgr.appendedLogIndex = mgr.appliedLogIndex
+
+ // The iterator seeks to a key that is greater than or equal than seeked key. Since we are doing a reverse
+ // seek, we need to add 0xff to the prefix so the first iterated key is the latest log entry.
+ if iterator.Seek(append(logPrefix, 0xff)); iterator.Valid() {
+ mgr.appendedLogIndex = LogIndex(binary.BigEndian.Uint64(bytes.TrimPrefix(iterator.Item().Key(), logPrefix)))
+ }
+
+ return nil
+ })
+}
+
+// verifyReferences verifies that the references in the transaction apply on top of the already accepted
+// reference changes. The old tips in the transaction are verified against the current actual tips.
+// It returns the write-ahead log entry for the transaction if it was successfully verified.
+func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction Transaction) (*gitalypb.LogEntry, error) {
+ logEntry := &gitalypb.LogEntry{}
+ for referenceName, tips := range transaction.ReferenceUpdates {
+ // 'git update-ref' doesn't ensure the loose references end up in the
+ // refs directory so we enforce that here.
+ if !strings.HasPrefix(referenceName.String(), "refs/") {
+ return nil, InvalidReferenceFormatError{ReferenceName: referenceName}
+ }
+
+ // We'll later implement reference format verification in Gitaly. update-ref reports errors with these characters
+ // in a difficult to parse manner. For now, let's check these two illegal characters separately so we can return a
+ // proper error.
+ for _, illegalCharacter := range []byte{0, '\n'} {
+ if bytes.Contains([]byte(referenceName), []byte{illegalCharacter}) {
+ return nil, InvalidReferenceFormatError{ReferenceName: referenceName}
+ }
+ }
+
+ actualOldTip, err := mgr.repository.ResolveRevision(ctx, referenceName.Revision())
+ if errors.Is(err, git.ErrReferenceNotFound) {
+ objectHash, err := mgr.repository.ObjectHash(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("object hash: %w", err)
+ }
+
+ actualOldTip = objectHash.ZeroOID
+ } else if err != nil {
+ return nil, fmt.Errorf("resolve revision: %w", err)
+ }
+
+ if tips.OldOID != actualOldTip {
+ if transaction.SkipVerificationFailures {
+ continue
+ }
+
+ return nil, ReferenceVerificationError{
+ ReferenceName: referenceName,
+ ExpectedOID: tips.OldOID,
+ ActualOID: actualOldTip,
+ }
+ }
+
+ logEntry.ReferenceUpdates = append(logEntry.ReferenceUpdates, &gitalypb.LogEntry_ReferenceUpdate{
+ ReferenceName: []byte(referenceName),
+ NewOid: []byte(tips.NewOID),
+ })
+ }
+
+ // Sort the reference updates so the reference changes are always logged in a deterministic order.
+ sort.Slice(logEntry.ReferenceUpdates, func(i, j int) bool {
+ return bytes.Compare(
+ logEntry.ReferenceUpdates[i].ReferenceName,
+ logEntry.ReferenceUpdates[j].ReferenceName,
+ ) == -1
+ })
+
+ if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil {
+ return nil, fmt.Errorf("verify references with git: %w", err)
+ }
+
+ return logEntry, nil
+}
+
+// vefifyReferencesWithGit verifies the reference updates with git by preparing reference transaction. This ensures
+// the updates will go through when they are being applied in the log. This also catches any invalid reference names
+// and file/directory conflicts with Git's loose reference storage which can occur with references like
+// 'refs/heads/parent' and 'refs/heads/parent/child'.
+func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error {
+ updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates)
+ if err != nil {
+ return fmt.Errorf("prepare reference transaction: %w", err)
+ }
+
+ return updater.Close()
+}
+
+// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing
+// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up
+// if an error is returned.
+func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) {
+ updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions())
+ if err != nil {
+ return nil, fmt.Errorf("new: %w", err)
+ }
+
+ if err := updater.Start(); err != nil {
+ return nil, fmt.Errorf("start: %w", err)
+ }
+
+ for _, referenceUpdate := range referenceUpdates {
+ if err := updater.Update(git.ReferenceName(referenceUpdate.ReferenceName), git.ObjectID(referenceUpdate.NewOid), ""); err != nil {
+ return nil, fmt.Errorf("update %q: %w", referenceUpdate.ReferenceName, err)
+ }
+ }
+
+ if err := updater.Prepare(); err != nil {
+ var errInvalidReferenceFormat updateref.ErrInvalidReferenceFormat
+ if errors.As(err, &errInvalidReferenceFormat) {
+ return nil, InvalidReferenceFormatError{ReferenceName: git.ReferenceName(errInvalidReferenceFormat.ReferenceName)}
+ }
+
+ return nil, fmt.Errorf("prepare: %w", err)
+ }
+
+ return updater, nil
+}
+
+// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not
+// logged nor applied later.
+func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error {
+ nextLogIndex := mgr.appendedLogIndex + 1
+
+ if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil {
+ return fmt.Errorf("set log entry: %w", err)
+ }
+
+ mgr.appendedLogIndex = nextLogIndex
+
+ return nil
+}
+
+// applyLogEntry reads a log entry at the given index and applies it to the repository.
+func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error {
+ var logEntry gitalypb.LogEntry
+ if err := mgr.readKey(keyLogEntry(getRepositoryID(mgr.repository), logIndex), &logEntry); err != nil {
+ return fmt.Errorf("read log entry: %w", err)
+ }
+
+ updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates)
+ if err != nil {
+ return fmt.Errorf("perpare reference transaction: %w", err)
+ }
+
+ if err := updater.Commit(); err != nil {
+ return fmt.Errorf("commit transaction: %w", err)
+ }
+
+ if err := mgr.storeAppliedLogIndex(logIndex); err != nil {
+ return fmt.Errorf("set applied log index: %w", err)
+ }
+
+ mgr.appliedLogIndex = logIndex
+
+ return nil
+}
+
+// storeLogEntry stores the log entry in the repository's write-ahead log at the given index.
+func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error {
+ return mgr.setKey(keyLogEntry(getRepositoryID(mgr.repository), index), entry)
+}
+
+// storeAppliedLogIndex stores the repository's applied log index in the database.
+func (mgr *TransactionManager) storeAppliedLogIndex(index LogIndex) error {
+ return mgr.setKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), index.toProto())
+}
+
+// setKey marshals and stores a given protocol buffer message into the database under the given key.
+func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error {
+ marshaledValue, err := proto.Marshal(value)
+ if err != nil {
+ return fmt.Errorf("marshal value: %w", err)
+ }
+
+ writeBatch := mgr.db.NewWriteBatch()
+ defer writeBatch.Cancel()
+
+ if err := writeBatch.Set(key, marshaledValue); err != nil {
+ return fmt.Errorf("set: %w", err)
+ }
+
+ return writeBatch.Flush()
+}
+
+// readKey reads a key from the database and unmarshals its value in to the destination protocol
+// buffer message.
+func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) error {
+ return mgr.db.View(func(txn databaseTransaction) error {
+ item, err := txn.Get(key)
+ if err != nil {
+ return fmt.Errorf("get: %w", err)
+ }
+
+ return item.Value(func(value []byte) error { return proto.Unmarshal(value, destination) })
+ })
+}
+
+// getRepositoryID returns a repository's ID. The ID should never change as it is used in the database
+// keys. Gitaly does not have a permanent ID to use yet so the repository's storage name and relative
+// path are used as a composite key.
+func getRepositoryID(repository repository) string {
+ return repository.GetStorageName() + ":" + repository.GetRelativePath()
+}
+
+// keyAppliedLogIndex returns the database key storing a repository's last applied log entry's index.
+func keyAppliedLogIndex(repositoryID string) []byte {
+ return []byte(fmt.Sprintf("repository/%s/log/index/applied", repositoryID))
+}
+
+// keyLogEntry returns the database key storing a repository's log entry at a given index.
+func keyLogEntry(repositoryID string, index LogIndex) []byte {
+ marshaledIndex := make([]byte, binary.Size(index))
+ binary.BigEndian.PutUint64(marshaledIndex, uint64(index))
+ return []byte(fmt.Sprintf("%s%s", keyPrefixLogEntries(repositoryID), marshaledIndex))
+}
+
+// keyPrefixLogEntries returns the key prefix holding repository's write-ahead log entries.
+func keyPrefixLogEntries(repositoryID string) []byte {
+ return []byte(fmt.Sprintf("repository/%s/log/entry/", repositoryID))
+}
diff --git a/internal/gitaly/transaction_manager_hook_test.go b/internal/gitaly/transaction_manager_hook_test.go
new file mode 100644
index 000000000..618dbffb9
--- /dev/null
+++ b/internal/gitaly/transaction_manager_hook_test.go
@@ -0,0 +1,137 @@
+package gitaly
+
+import (
+ "context"
+ "regexp"
+ "runtime"
+ "strings"
+ "testing"
+
+ "github.com/dgraph-io/badger/v3"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
+)
+
+// hookFunc is a function that is executed at a specific point. It gets a hookContext that allows it to
+// influence the execution of the test.
+type hookFunc func(hookContext)
+
+// hookContext are the control toggels available in a hook.
+type hookContext struct {
+ // stopManager calls the calls stops the TransactionManager.
+ stopManager func()
+}
+
+// hooks are functions that get invoked at specific points of the TransactionManager Run method. They allow
+// for hooking into the Run method at specific poins which would otherwise to do assertions that would otherwise
+// not be possible.
+type hooks struct {
+ // beforeReadLogEntry is invoked before a log entry is read from the database.
+ beforeReadLogEntry hookFunc
+ // beforeResolveRevision is invoked before ResolveRevision is invoked.
+ beforeResolveRevision hookFunc
+ // beforeDeferredStop is invoked before the deferred Stop is invoked in Run.
+ beforeDeferredStop hookFunc
+}
+
+// installHooks installs the configured hooks into the transactionManager.
+func installHooks(tb testing.TB, transactionManager *TransactionManager, database *badger.DB, repository *localrepo.Repo, hooks hooks) {
+ hookContext := hookContext{stopManager: transactionManager.stop}
+
+ transactionManager.stop = func() {
+ programCounter, _, _, ok := runtime.Caller(2)
+ require.True(tb, ok)
+
+ isDeferredStopInRun := strings.HasSuffix(
+ runtime.FuncForPC(programCounter).Name(),
+ "gitaly.(*TransactionManager).Run",
+ )
+
+ if isDeferredStopInRun && hooks.beforeDeferredStop != nil {
+ hooks.beforeDeferredStop(hookContext)
+ }
+
+ hookContext.stopManager()
+ }
+
+ transactionManager.db = databaseHook{
+ database: newDatabaseAdapter(database),
+ hooks: hooks,
+ hookContext: hookContext,
+ }
+
+ transactionManager.repository = repositoryHook{
+ repository: repository,
+ hookContext: hookContext,
+ hooks: hooks,
+ }
+}
+
+type repositoryHook struct {
+ repository
+ hookContext
+ hooks
+}
+
+func (hook repositoryHook) ResolveRevision(ctx context.Context, revision git.Revision) (git.ObjectID, error) {
+ if hook.beforeResolveRevision != nil {
+ hook.hooks.beforeResolveRevision(hook.hookContext)
+ }
+
+ return hook.repository.ResolveRevision(ctx, revision)
+}
+
+type databaseHook struct {
+ database
+ hookContext
+ hooks
+}
+
+func (hook databaseHook) View(handler func(databaseTransaction) error) error {
+ return hook.database.View(func(transaction databaseTransaction) error {
+ return handler(databaseTransactionHook{
+ databaseTransaction: transaction,
+ hookContext: hook.hookContext,
+ hooks: hook.hooks,
+ })
+ })
+}
+
+func (hook databaseHook) NewWriteBatch() writeBatch {
+ return writeBatchHook{writeBatch: hook.database.NewWriteBatch()}
+}
+
+type databaseTransactionHook struct {
+ databaseTransaction
+ hookContext
+ hooks
+}
+
+var regexLogEntry = regexp.MustCompile("repository/.+/log/entry/")
+
+func (hook databaseTransactionHook) Get(key []byte) (*badger.Item, error) {
+ if regexLogEntry.Match(key) {
+ if hook.hooks.beforeReadLogEntry != nil {
+ hook.hooks.beforeReadLogEntry(hook.hookContext)
+ }
+ }
+
+ return hook.databaseTransaction.Get(key)
+}
+
+func (hook databaseTransactionHook) NewIterator(options badger.IteratorOptions) *badger.Iterator {
+ return hook.databaseTransaction.NewIterator(options)
+}
+
+type writeBatchHook struct {
+ writeBatch
+}
+
+func (hook writeBatchHook) Set(key []byte, value []byte) error {
+ return hook.writeBatch.Set(key, value)
+}
+
+func (hook writeBatchHook) Flush() error { return hook.writeBatch.Flush() }
+
+func (hook writeBatchHook) Cancel() { hook.writeBatch.Cancel() }
diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go
new file mode 100644
index 000000000..808fa172d
--- /dev/null
+++ b/internal/gitaly/transaction_manager_test.go
@@ -0,0 +1,1586 @@
+package gitaly
+
+import (
+ "context"
+ "encoding/hex"
+ "fmt"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+)
+
+func TestTransactionManager(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ // A clean repository is setup for each test. We build a repository ahead of the tests here once to
+ // get deterministic commit IDs, relative path and object hash we can use to build the declarative
+ // test cases.
+ relativePath := gittest.NewRepositoryName(t)
+ setupRepository := func(t *testing.T) (*localrepo.Repo, git.ObjectID, git.ObjectID, git.ObjectID) {
+ t.Helper()
+
+ cfg := testcfg.Build(t)
+
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: relativePath,
+ })
+
+ rootCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents())
+ secondCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(rootCommitOID))
+ thirdCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(secondCommitOID))
+
+ cmdFactory, clean, err := git.NewExecCommandFactory(cfg)
+ require.NoError(t, err)
+ t.Cleanup(clean)
+
+ catfileCache := catfile.NewCache(cfg)
+ t.Cleanup(catfileCache.Stop)
+
+ localRepo := localrepo.New(
+ config.NewLocator(cfg),
+ cmdFactory,
+ catfileCache,
+ repo,
+ )
+
+ return localRepo, rootCommitOID, secondCommitOID, thirdCommitOID
+ }
+
+ // Collect commit OIDs and the object has so we can define the test cases with them.
+ repo, rootCommitOID, secondCommitOID, thirdCommitOID := setupRepository(t)
+ objectHash, err := repo.ObjectHash(ctx)
+ require.NoError(t, err)
+
+ hasher := objectHash.Hash()
+ _, err = hasher.Write([]byte("content does not matter"))
+ require.NoError(t, err)
+ nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil)))
+ require.NoError(t, err)
+
+ type testHooks struct {
+ // BeforeApplyLogEntry is called before a log entry is applied to the repository.
+ BeforeApplyLogEntry hookFunc
+ // BeforeAppendLogEntry is called before a log entry is appended to the log.
+ BeforeAppendLogEntry hookFunc
+ // WaitForTransactionsWhenStopping waits for a in-flight to finish before returning
+ // from Run.
+ WaitForTransactionsWhenStopping bool
+ }
+
+ // Step defines a single execution step in a test. Each test case can define multiple steps to setup exercise
+ // more complex behavior and to assert the state after each step.
+ type steps []struct {
+ // StopManager stops the manager in the beginning of the step.
+ StopManager bool
+ // StartManager can be used to start the manager again after stopping it.
+ StartManager bool
+ // Context is the context to use for the Propose call of the step.
+ Context context.Context
+ // Transaction is the transaction that is proposed in this step.
+ Transaction Transaction
+ // Hooks contains the hook functions that are configured on the TransactionManager. These allow
+ // for better synchronization.
+ Hooks testHooks
+ // ExpectedRunError is the expected error to be returned from Run from this step.
+ ExpectedRunError bool
+ // ExpectedProposeError is the error that is expected to be returned when proposing the transaction in this step.
+ ExpectedProposeError error
+ // ExpectedReferences is the expected state of references at the end of this step.
+ ExpectedReferences []git.Reference
+ // ExpectedDatabase is the expected state of the database at the end of this step.
+ ExpectedDatabase DatabaseState
+ }
+
+ type testCase struct {
+ desc string
+ steps steps
+ }
+
+ testCases := []testCase{
+ {
+ desc: "invalid reference aborts the entire transaction",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ SkipVerificationFailures: true,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: "refs/heads/../main"},
+ },
+ },
+ },
+ {
+ desc: "continues processing after aborting due to an invalid reference",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: "refs/heads/../main"},
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "create reference",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "create a file-directory reference conflict different transaction",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/parent"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: updateref.ErrFileDirectoryConflict{
+ ExistingReferenceName: "refs/heads/parent",
+ ConflictingReferenceName: "refs/heads/parent/child",
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/parent"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "create a file-directory reference conflict in same transaction",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: updateref.ErrInTransactionConflict{
+ FirstReferenceName: "refs/heads/parent",
+ SecondReferenceName: "refs/heads/parent/child",
+ },
+ },
+ },
+ },
+ {
+ desc: "file-directory conflict aborts the transaction with verification failures skipped",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ SkipVerificationFailures: true,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: updateref.ErrInTransactionConflict{
+ FirstReferenceName: "refs/heads/parent",
+ SecondReferenceName: "refs/heads/parent/child",
+ },
+ },
+ },
+ },
+ {
+ desc: "delete file-directory conflict in different transaction",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/parent/child"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedProposeError: updateref.ErrFileDirectoryConflict{
+ ExistingReferenceName: "refs/heads/parent/child",
+ ConflictingReferenceName: "refs/heads/parent",
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/parent/child"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "delete file-directory conflict in same transaction",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedProposeError: updateref.ErrInTransactionConflict{
+ FirstReferenceName: "refs/heads/parent",
+ SecondReferenceName: "refs/heads/parent/child",
+ },
+ },
+ },
+ },
+ {
+ desc: "create a branch to a non-commit object",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ SkipVerificationFailures: true,
+ ReferenceUpdates: ReferenceUpdates{
+ // The error should abort the entire transaction.
+ "refs/heads/branch-1": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/branch-2": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID},
+ },
+ },
+ ExpectedProposeError: updateref.NonCommitObjectError{
+ ReferenceName: "refs/heads/branch-2",
+ ObjectID: objectHash.EmptyTreeOID.String(),
+ },
+ },
+ },
+ },
+ {
+ desc: "create a tag to a non-commit object",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/tags/v1.0.0": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/tags/v1.0.0", Target: objectHash.EmptyTreeOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/tags/v1.0.0"),
+ NewOid: []byte(objectHash.EmptyTreeOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "create a reference to non-existent object",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: nonExistentOID},
+ },
+ },
+ ExpectedProposeError: updateref.NonExistentObjectError{
+ ReferenceName: "refs/heads/main",
+ ObjectID: nonExistentOID.String(),
+ },
+ },
+ },
+ },
+ {
+ desc: "create reference ignoring verification failure",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ SkipVerificationFailures: true,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID},
+ "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: secondCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(secondCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "create reference that already exists",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID},
+ "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: objectHash.ZeroOID,
+ ActualOID: rootCommitOID,
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "create reference no-op",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: objectHash.ZeroOID,
+ ActualOID: rootCommitOID,
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "update reference",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(secondCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "update reference ignoring verification failures",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ SkipVerificationFailures: true,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID},
+ "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: thirdCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(thirdCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "update reference with incorrect old tip",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID},
+ "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: secondCommitOID,
+ ActualOID: rootCommitOID,
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "update non-existent reference",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: secondCommitOID,
+ ActualOID: objectHash.ZeroOID,
+ },
+ },
+ },
+ },
+ {
+ desc: "update reference no-op",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "delete reference",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(objectHash.ZeroOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "delete reference ignoring verification failures",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ SkipVerificationFailures: true,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID},
+ "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(objectHash.ZeroOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "delete reference with incorrect old tip",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID},
+ "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: secondCommitOID,
+ ActualOID: rootCommitOID,
+ },
+ ExpectedReferences: []git.Reference{
+ {Name: "refs/heads/main", Target: rootCommitOID.String()},
+ {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()},
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ {
+ ReferenceName: []byte("refs/heads/non-conflicting"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "delete non-existent reference",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: rootCommitOID,
+ ActualOID: objectHash.ZeroOID,
+ },
+ },
+ },
+ },
+ {
+ desc: "delete reference no-op",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID},
+ },
+ },
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(objectHash.ZeroOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "continues processing after reference verification failure",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: rootCommitOID,
+ ActualOID: objectHash.ZeroOID,
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(secondCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "continues processing after a restart",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ StopManager: true,
+ StartManager: true,
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(secondCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "continues processing after restarting after a reference verification failure",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: rootCommitOID,
+ ActualOID: objectHash.ZeroOID,
+ },
+ },
+ {
+ StopManager: true,
+ StartManager: true,
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(secondCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "recovers from the write-ahead log on start up",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookCtx hookContext) {
+ hookCtx.stopManager()
+ },
+ },
+ ExpectedProposeError: ErrTransactionProcessingStopped,
+ ExpectedRunError: true,
+ ExpectedDatabase: DatabaseState{
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID},
+ },
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(secondCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "reference verification fails after recovering logged writes",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookCtx hookContext) {
+ hookCtx.stopManager()
+ },
+ },
+ ExpectedProposeError: ErrTransactionProcessingStopped,
+ ExpectedRunError: true,
+ ExpectedDatabase: DatabaseState{
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: secondCommitOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: ReferenceVerificationError{
+ ReferenceName: "refs/heads/main",
+ ExpectedOID: secondCommitOID,
+ ActualOID: rootCommitOID,
+ },
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "propose returns if context is canceled before admission",
+ steps: steps{
+ {
+ Context: func() context.Context {
+ ctx, cancel := context.WithCancel(ctx)
+ cancel()
+ return ctx
+ }(),
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: context.Canceled,
+ },
+ },
+ },
+ {
+ desc: "propose returns if transaction processing stops before admission",
+ steps: steps{
+ {
+ StopManager: true,
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: ErrTransactionProcessingStopped,
+ },
+ },
+ },
+ func() testCase {
+ ctx, cancel := context.WithCancel(ctx)
+ return testCase{
+ desc: "propose returns if context is canceled after admission",
+ steps: steps{
+ {
+ Context: ctx,
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookCtx hookContext) {
+ // Cancel the context used in Propose
+ cancel()
+ },
+ },
+ ExpectedProposeError: context.Canceled,
+ ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}},
+ ExpectedDatabase: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(),
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ }(),
+ {
+ desc: "propose returns if transaction processing stops before transaction acceptance",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ Hooks: testHooks{
+ BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.stopManager() },
+ // This ensures we are testing the context cancellation errors being unwrapped properly
+ // to an ErrTransactionProcessingStopped instead of hitting the general case when
+ // runDone is closed.
+ WaitForTransactionsWhenStopping: true,
+ },
+ ExpectedProposeError: ErrTransactionProcessingStopped,
+ },
+ },
+ },
+ {
+ desc: "propose returns if transaction processing stops after transaction acceptance",
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookCtx hookContext) {
+ hookCtx.stopManager()
+ },
+ },
+ ExpectedProposeError: ErrTransactionProcessingStopped,
+ ExpectedRunError: true,
+ ExpectedDatabase: DatabaseState{
+ string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{
+ ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{
+ {
+ ReferenceName: []byte("refs/heads/main"),
+ NewOid: []byte(rootCommitOID),
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+
+ type invalidReferenceTestCase struct {
+ desc string
+ referenceName git.ReferenceName
+ invalidReference git.ReferenceName
+ }
+
+ appendInvalidReferenceTestCase := func(tc invalidReferenceTestCase) {
+ invalidReference := tc.invalidReference
+ if invalidReference == "" {
+ invalidReference = tc.referenceName
+ }
+
+ testCases = append(testCases, testCase{
+ desc: fmt.Sprintf("invalid reference %s", tc.desc),
+ steps: steps{
+ {
+ Transaction: Transaction{
+ ReferenceUpdates: ReferenceUpdates{
+ tc.referenceName: {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
+ },
+ },
+ ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: invalidReference},
+ },
+ },
+ })
+ }
+
+ // Generate test cases for the reference format rules according to https://git-scm.com/docs/git-check-ref-format.
+ // This is to ensure the references are correctly validated prior to logging so they are guaranteed to apply later.
+ for _, tc := range []invalidReferenceTestCase{
+ // 1. They can include slash / for hierarchical (directory) grouping, but no slash-separated
+ // component can begin with a dot . or end with the sequence .lock.
+ {"starting with a period", ".refs/heads/main", ""},
+ {"subcomponent starting with a period", "refs/heads/.main", ""},
+ {"ending in .lock", "refs/heads/main.lock", ""},
+ {"subcomponent ending in .lock", "refs/heads/main.lock/main", ""},
+ // 2. They must contain at least one /. This enforces the presence of a category like heads/,
+ // tags/ etc. but the actual names are not restricted.
+ {"without a /", "one-level", ""},
+ {"with refs without a /", "refs", ""},
+ // We restrict this further by requiring a 'refs/' prefix to ensure loose references only end up
+ // in the 'refs/' folder.
+ {"without refs/ prefix ", "nonrefs/main", ""},
+ // 3. They cannot have two consecutive dots .. anywhere.
+ {"containing two consecutive dots", "refs/heads/../main", ""},
+ // 4. They cannot have ASCII control characters ... (\177 DEL), space, tilde ~, caret ^, or colon : anywhere.
+ //
+ // Tests for control characters < \040 generated further down.
+ {"containing DEL", "refs/heads/ma\177in", "refs/heads/ma?in"},
+ {"containing space", "refs/heads/ma in", ""},
+ {"containing ~", "refs/heads/ma~in", ""},
+ {"containing ^", "refs/heads/ma^in", ""},
+ {"containing :", "refs/heads/ma:in", ""},
+ // 5. They cannot have question-mark ?, asterisk *, or open bracket [ anywhere.
+ {"containing ?", "refs/heads/ma?in", ""},
+ {"containing *", "refs/heads/ma*in", ""},
+ {"containing [", "refs/heads/ma[in", ""},
+ // 6. They cannot begin or end with a slash / or contain multiple consecutive slashes
+ {"begins with /", "/refs/heads/main", ""},
+ {"ends with /", "refs/heads/main/", ""},
+ {"contains consecutive /", "refs/heads//main", ""},
+ // 7. They cannot end with a dot.
+ {"ending in a dot", "refs/heads/main.", ""},
+ // 8. They cannot contain a sequence @{.
+ {"invalid reference contains @{", "refs/heads/m@{n", ""},
+ // 9. They cannot be the single character @.
+ {"is a single character @", "@", ""},
+ // 10. They cannot contain a \.
+ {`containing \`, `refs/heads\main`, `refs/heads\main`},
+ } {
+ appendInvalidReferenceTestCase(tc)
+ }
+
+ // Rule 4. They cannot have ASCII control characters i.e. bytes whose values are lower than \040,
+ for i := byte(0); i < '\040'; i++ {
+ invalidReference := fmt.Sprintf("refs/heads/ma%sin", []byte{i})
+
+ // For now, the reference format is checked by 'git update-ref'. Most of the control characters
+ // are substituted by a '?' by Git when reporting the error.
+ //
+ // '\t' is reported without substitution. '\n' and '\000' are checked by the TransactionManager
+ // separately so it can report a better error than what Git produces.
+ //
+ // This is temporarily more complicated than it needs to be. Later iteration will move reference
+ // format checking into Gitaly at which point we can return proper errors without substituting
+ // the problematic characters.
+ var expectedSubstitute string
+ switch i {
+ case '\000', '\n', '\t':
+ expectedSubstitute = string(i)
+ default:
+ expectedSubstitute = "?"
+ }
+
+ appendInvalidReferenceTestCase(invalidReferenceTestCase{
+ desc: fmt.Sprintf(`containing ASCII control character %d`, i),
+ referenceName: git.ReferenceName(invalidReference),
+ invalidReference: git.ReferenceName(fmt.Sprintf("refs/heads/ma%sin", expectedSubstitute)),
+ })
+ }
+
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ // Setup the repository with the exact same state as what was used to build the test cases.
+ repository, _, _, _ := setupRepository(t)
+
+ database, err := OpenDatabase(t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, database)
+
+ var (
+ // managerRunning tracks whether the manager is running or stopped.
+ managerRunning bool
+ // transactionManager is the current TransactionManager instance.
+ transactionManager *TransactionManager
+ // managerErr is used for synchronizing manager stopping and returning
+ // the error from Run.
+ managerErr chan error
+ // inflightTransactions tracks the number of on going propose calls. It is used to synchronize
+ // the database hooks with propose calls.
+ inflightTransactions sync.WaitGroup
+ )
+
+ // stopManager stops the manager. It waits until the manager's Run method has exited.
+ stopManager := func() {
+ t.Helper()
+
+ transactionManager.Stop()
+ managerRunning, err = checkManagerError(t, managerErr, transactionManager)
+ require.NoError(t, err)
+ }
+
+ // startManager starts fresh manager and applies hooks into it.
+ startManager := func(testHooks testHooks) {
+ t.Helper()
+
+ require.False(t, managerRunning, "manager started while it was already running")
+ managerRunning = true
+ managerErr = make(chan error)
+
+ transactionManager = NewTransactionManager(database, repository)
+ installHooks(t, transactionManager, database, repository, hooks{
+ beforeResolveRevision: testHooks.BeforeAppendLogEntry,
+ beforeReadLogEntry: testHooks.BeforeApplyLogEntry,
+ beforeDeferredStop: func(hookContext) {
+ if testHooks.WaitForTransactionsWhenStopping {
+ inflightTransactions.Wait()
+ }
+ },
+ })
+
+ go func() { managerErr <- transactionManager.Run() }()
+ }
+
+ // Stop the manager if it is running at the end of the test.
+ defer stopManager()
+ for _, step := range tc.steps {
+ // Ensure every step starts with the manager running.
+ if !managerRunning {
+ startManager(step.Hooks)
+ }
+
+ if step.StopManager {
+ require.True(t, managerRunning, "manager stopped while it was already stopped")
+ stopManager()
+ }
+
+ if step.StartManager {
+ startManager(step.Hooks)
+ }
+
+ func() {
+ inflightTransactions.Add(1)
+ defer inflightTransactions.Done()
+
+ proposeCtx := ctx
+ if step.Context != nil {
+ proposeCtx = step.Context
+ }
+
+ require.ErrorIs(t, transactionManager.Propose(proposeCtx, step.Transaction), step.ExpectedProposeError)
+ }()
+
+ if managerRunning, err = checkManagerError(t, managerErr, transactionManager); step.ExpectedRunError {
+ require.Error(t, err)
+ } else {
+ require.NoError(t, err)
+ }
+
+ RequireReferences(t, ctx, repository, step.ExpectedReferences)
+ RequireDatabase(t, ctx, database, step.ExpectedDatabase)
+ }
+ })
+ }
+}
+
+func checkManagerError(t *testing.T, managerErr chan error, mgr *TransactionManager) (bool, error) {
+ t.Helper()
+
+ select {
+ case err, ok := <-managerErr:
+ if ok {
+ close(managerErr)
+ }
+
+ // managerErr returns the possible error if manager has already stopped.
+ return false, err
+ case mgr.admissionQueue <- transactionFuture{
+ transaction: Transaction{ReferenceUpdates: ReferenceUpdates{"sentinel": {}}},
+ result: make(resultChannel, 1),
+ }:
+ // If the error channel doesn't receive, we don't know whether it is because the manager is still running
+ // or we are still waiting for it to return. We test whether the manager is running or not here by queueing a
+ // a proposal that will error. If the manager processes it, we know it is still running.
+ return true, nil
+ }
+}
diff --git a/internal/testhelper/leakage.go b/internal/testhelper/leakage.go
index ffec6c7ae..5dc9cae20 100644
--- a/internal/testhelper/leakage.go
+++ b/internal/testhelper/leakage.go
@@ -16,6 +16,10 @@ import (
// mustHaveNoGoroutines panics if it finds any Goroutines running.
func mustHaveNoGoroutines() {
if err := goleak.Find(
+ // BadgerDB depends on Ristretto which uses glog for logging. glog initializes
+ // on import a log flushing goroutine that keeps running in the background.
+ // Ignore this goroutine as there is no way to stop it.
+ goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"),
// opencensus has a "defaultWorker" which is started by the package's
// `init()` function. There is no way to stop this worker, so it will leak
// whenever we import the package.