From 8a29ef51192cc57770dcd7966d9af8173eb7ff4d Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Fri, 14 Oct 2022 15:29:16 +0300 Subject: Implement basic transaction processing with write-ahead logging Gitaly is currently lacking in transaction control. Each write coming in launches its own Git commands which operate on the repository concurrently. This makes transaction management difficult. It's difficult to optimize the writes as they are being done from multiple locations without synchronization. The concurrent writers may step on each other's toes and surface lock conflicts to the users. Recovering from crashes is also difficult as Gitaly is not logging the modifications it is about to perform and thus loses the transaction state on crashes. There's also no clear notion of ordering which further complicates replication-related matters. It's not easy to say which writes a repository is missing and which it is not. We've recently designed a new replication architecture for Gitaly. The new architecture relies on a replicated write-ahead log. The write-ahead log defines a clear order of writes and aids in crash recovery. A single writer will be operating on a repository which makes further optimizations such as write batching easier. This commit implements the first steps towards the new architecture by implementing the TransactionManager. The TransactionManager will be responsible for transaction management of a single repository. It will be the single goroutine that writes into a repository and is invoked by all other locations in the code that wish to write. It will also be responsible for synchronizing reads by ensuring they see the changes they are supposed to see. The TransactionManager implementation introduced here does not contain the full implementation but aims to provide a basis for future iteration. For now, it implements basic write processing with a write-ahead log.
It processes writes one-by-one by verifying references, logging the changes and finally applying the changes to the repository. It also supports recovering from the write-ahead log should the log processing be interrupted. The reference verification behavior can be tuned on a per-transaction level to match the behavior of Git's `--atomic` or `--force` push flags. The TransactionManager stores the state related to the write-ahead log in BadgerDB, which is a key-value store that will be local to each Gitaly storage. The values are marshaled protocol buffer messages. This iteration is mostly concerned with the reference updating logic. Pack files are not handled yet, nor are the internal references they need. Symbolic references, namely for updating the default branch, are not handled yet either. The writes are processed one by one and are acknowledged after applying them to the repository. Given that, there's no separate logic needed for read synchronization yet either. The goal here is to set the initial interface and log processing, and to lock down the reference updating logic with tests so we can later on safely start iterating on the internals of the TransactionManager and start adding support for the missing functionality.
--- internal/gitaly/database.go | 59 + internal/gitaly/testhelper_test.go | 72 + internal/gitaly/transaction_manager.go | 539 ++++++++ internal/gitaly/transaction_manager_hook_test.go | 137 ++ internal/gitaly/transaction_manager_test.go | 1586 ++++++++++++++++++++++ internal/testhelper/leakage.go | 4 + 6 files changed, 2397 insertions(+) create mode 100644 internal/gitaly/database.go create mode 100644 internal/gitaly/testhelper_test.go create mode 100644 internal/gitaly/transaction_manager.go create mode 100644 internal/gitaly/transaction_manager_hook_test.go create mode 100644 internal/gitaly/transaction_manager_test.go (limited to 'internal') diff --git a/internal/gitaly/database.go b/internal/gitaly/database.go new file mode 100644 index 000000000..2015de710 --- /dev/null +++ b/internal/gitaly/database.go @@ -0,0 +1,59 @@ +package gitaly + +import ( + "github.com/dgraph-io/badger/v3" +) + +// OpenDatabase opens a new database handle to a database at the given path, with synchronous writes enabled and Badger's logger disabled. +func OpenDatabase(databasePath string) (*badger.DB, error) { + dbOptions := badger.DefaultOptions(databasePath) + // Enable SyncWrites to ensure all writes are persisted to disk before considering + // them committed. + dbOptions.SyncWrites = true + // Badger by default logs fairly verbose statistics when opening a database. Disable the + // logging for now. + dbOptions.Logger = nil + + return badger.Open(dbOptions) +} + +// databaseAdapter adapts a *badger.DB to the internal database interface used by the hooks in tests. +type databaseAdapter struct{ *badger.DB } + +// newDatabaseAdapter adapts a *badger.DB to conform to the internal database interface used for +// hooking into during testing. +func newDatabaseAdapter(db *badger.DB) database { + return databaseAdapter{DB: db} +} + +// NewWriteBatch calls badger.*DB.NewWriteBatch. Refer to Badger's documentation for details.
+func (db databaseAdapter) NewWriteBatch() writeBatch { + return db.DB.NewWriteBatch() +} + +// View calls badger.*DB.View. Refer to Badger's documentation for details. +func (db databaseAdapter) View(handler func(databaseTransaction) error) error { + return db.DB.View(func(txn *badger.Txn) error { return handler(txn) }) +} + +// database is the *badger.DB interface used by TransactionManager. Refer to Badger's documentation +// for details. +type database interface { + NewWriteBatch() writeBatch + View(func(databaseTransaction) error) error +} + +// writeBatch is the interface of *badger.WriteBatch used by TransactionManager. Refer to Badger's +// documentation for details. +type writeBatch interface { + Set([]byte, []byte) error + Flush() error + Cancel() +} + +// databaseTransaction is the interface of *badger.Txn used by TransactionManager. Refer to Badger's +// documentation for details. +type databaseTransaction interface { + Get([]byte) (*badger.Item, error) + NewIterator(badger.IteratorOptions) *badger.Iterator +} diff --git a/internal/gitaly/testhelper_test.go b/internal/gitaly/testhelper_test.go new file mode 100644 index 000000000..9f194318d --- /dev/null +++ b/internal/gitaly/testhelper_test.go @@ -0,0 +1,72 @@ +package gitaly + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "google.golang.org/protobuf/proto" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} + +// DatabaseState describes the expected state of the key-value store. The keys in the map are the expected keys +// in the database and the values are the expected unmarshaled values. +type DatabaseState map[string]proto.Message + +// RequireDatabase asserts the actual database state matches the expected database state.
The actual values in the +// database are unmarshaled to the same type the values have in the expected database state. +func RequireDatabase(tb testing.TB, ctx context.Context, database *badger.DB, expectedState DatabaseState) { + tb.Helper() + + if expectedState == nil { + expectedState = DatabaseState{} + } + + actualState := DatabaseState{} + unexpectedKeys := []string{} + require.NoError(tb, database.View(func(txn *badger.Txn) error { + iterator := txn.NewIterator(badger.DefaultIteratorOptions) + defer iterator.Close() + + for iterator.Rewind(); iterator.Valid(); iterator.Next() { + key := iterator.Item().Key() + expectedValue, ok := expectedState[string(key)] + if !ok { + // Print the keys out escaped as otherwise the non-printing characters are not visible in the assertion failure. + unexpectedKeys = append(unexpectedKeys, fmt.Sprintf("%q", key)) + continue + } + + require.NoError(tb, iterator.Item().Value(func(value []byte) error { + // Unmarshal the actual value to the same type as the expected value. + actualValue := reflect.New(reflect.TypeOf(expectedValue).Elem()).Interface().(proto.Message) + require.NoError(tb, proto.Unmarshal(value, actualValue)) + actualState[string(key)] = actualValue + return nil + })) + } + + return nil + })) + + require.Empty(tb, unexpectedKeys, "database contains unexpected keys") + testhelper.ProtoEqual(tb, expectedState, actualState) +} + +// RequireReferences asserts that the actual references in the repository match the expected references.
+func RequireReferences(tb testing.TB, ctx context.Context, repo *localrepo.Repo, expectedReferences []git.Reference) { + tb.Helper() + + actualReferences, err := repo.GetReferences(ctx) + require.NoError(tb, err) + require.ElementsMatch(tb, expectedReferences, actualReferences) +} diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go new file mode 100644 index 000000000..c9c89bf81 --- /dev/null +++ b/internal/gitaly/transaction_manager.go @@ -0,0 +1,539 @@ +package gitaly + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "sort" + "strings" + + "github.com/dgraph-io/badger/v3" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "google.golang.org/protobuf/proto" +) + +// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. +var ErrTransactionProcessingStopped = errors.New("transaction processing stopped") + +// InvalidReferenceFormatError is returned when a reference name was invalid. +type InvalidReferenceFormatError struct { + // ReferenceName is the reference with invalid format. + ReferenceName git.ReferenceName +} + +// Error returns the formatted error string. +func (err InvalidReferenceFormatError) Error() string { + return fmt.Sprintf("invalid reference format: %q", err.ReferenceName) +} + +// ReferenceVerificationError is returned when a reference's old OID did not match the expected OID. +type ReferenceVerificationError struct { + // ReferenceName is the name of the reference that failed verification. + ReferenceName git.ReferenceName + // ExpectedOID is the OID the reference was expected to point to. + ExpectedOID git.ObjectID + // ActualOID is the OID the reference actually pointed to. + ActualOID git.ObjectID +} + +// Error returns the formatted error string.
+func (err ReferenceVerificationError) Error() string { + return fmt.Sprintf("expected %q to point to %q but it pointed to %q", err.ReferenceName, err.ExpectedOID, err.ActualOID) +} + +// LogIndex points to a specific position in a repository's write-ahead log. +type LogIndex uint64 + +// toProto returns the protobuf representation of LogIndex for serialization purposes. +func (index LogIndex) toProto() *gitalypb.LogIndex { + return &gitalypb.LogIndex{LogIndex: uint64(index)} +} + +// ReferenceUpdate describes the state of a reference's old and new tip in an update. +type ReferenceUpdate struct { + // OldOID is the old OID the reference is expected to point to prior to updating it. + // If the reference does not point to the old value, the reference verification fails. + OldOID git.ObjectID + // NewOID is the new desired OID to point the reference to. + NewOID git.ObjectID +} + +// ReferenceUpdates contains references to update. Reference name is used as the key and the value +// is the expected old tip and the desired new tip. +type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate + +// Transaction is a unit-of-work that contains reference changes to perform on the repository. +type Transaction struct { + // SkipVerificationFailures causes reference updates that failed the verification step to be + // dropped from the transaction. By default, any verification failures abort the entire transaction. + // + // The default behavior maps to the `--atomic` flag of `git push`. When this is set, the behavior matches + // Git's behavior without the flag. + SkipVerificationFailures bool + // ReferenceUpdates contains the reference updates to be performed. + ReferenceUpdates +} + +// TransactionManager is responsible for transaction management of a single repository. Each repository has +// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from +// the admissionQueue.
Each admitted write is processed in three steps: +// +// 1. The references being updated are verified by ensuring the expected old tips match what the references +// actually point to prior to update. The entire transaction is by default aborted if a single reference +// fails the verification step. The reference verification behavior can be controlled on a per-transaction +// level by setting: +// - The reference verification failures can be ignored instead of aborting the entire transaction. +// If done, the references that failed verification are dropped from the transaction but the updates +// that passed verification are still performed. +// - The reference verification may also be skipped if the write is force updating references. If +// done, the current state of the references is ignored and they are directly updated to point +// to the new tips. +// 2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively +// committed and will be applied to the repository even after restarting. +// 3. The transaction is applied from the write-ahead log to the repository by actually performing the reference +// changes. +// +// The goroutine that issued the transaction is waiting for the result while these steps are being performed. As +// there is no transaction control for readers yet, the issuer is only notified of a successful write after the +// write has been applied to the repository. +// +// TransactionManager recovers transactions after interruptions by applying the write-ahead logged transactions to +// the repository on start up. +// +// TransactionManager maintains the write-ahead log in a key-value store. It maintains the following key spaces: +// - `repository/<repository_id>/log/index/applied` +// - This key stores the index of the log entry that has been applied to the repository. This allows for +// determining how far a repository is in processing the log and which log entries need to be applied +// after starting up.
Repository starts from log index 0 if there are no log entries recorded to have +// been applied. +// +// - `repository/<repository_id>/log/entry/<log_index>` +// - These keys hold the actual write-ahead log entries. A repository's first log entry starts at index 1 +// and the log index keeps monotonically increasing from there on without gaps. The write-ahead log +// entries are processed in ascending order. +// +// The values in the database are marshaled protocol buffer messages. Numbers in the keys are encoded as big +// endian to preserve the sort order of the numbers also in lexicographical order. +type TransactionManager struct { + // ctx is the context used for all operations. + ctx context.Context + // stop cancels ctx and stops the transaction processing. + stop context.CancelFunc + + // stopCalled is closed when Stop is called. It unblocks transactions that are waiting to be admitted. + stopCalled <-chan struct{} + // runDone is closed when Run returns. It unblocks transactions that are waiting for a result after + // being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly + // releases awaiters when the transaction processing is stopped. + runDone chan struct{} + + // repository is the repository this TransactionManager is acting on. + repository repository + // db is the handle to the key-value store used for storing the write-ahead log related state. + db database + // admissionQueue is where the incoming writes are waiting to be admitted to the transaction + // manager. + admissionQueue chan transactionFuture + + // appendedLogIndex holds the index of the last log entry appended to the log. + appendedLogIndex LogIndex + // appliedLogIndex holds the index of the last log entry applied to the repository. + appliedLogIndex LogIndex +} + +// repository is the localrepo interface used by TransactionManager.
+type repository interface { + git.RepositoryExecutor + ResolveRevision(context.Context, git.Revision) (git.ObjectID, error) +} + +// NewTransactionManager returns a new TransactionManager for the given repository. +func NewTransactionManager(db *badger.DB, repository *localrepo.Repo) *TransactionManager { + ctx, cancel := context.WithCancel(context.Background()) + return &TransactionManager{ + ctx: ctx, + stopCalled: ctx.Done(), + runDone: make(chan struct{}), + stop: cancel, + repository: repository, + db: newDatabaseAdapter(db), + admissionQueue: make(chan transactionFuture), + } +} + +// resultChannel represents a future that will yield the result of a transaction once its +// outcome has been decided. +type resultChannel chan error + +// transactionFuture holds a transaction and the resultChannel the transaction queuer is waiting +// for the result on. +type transactionFuture struct { + transaction Transaction + result resultChannel +} + +// Propose queues a transaction for the TransactionManager to process. Propose returns once the transaction +// has been successfully applied to the repository. +// +// Transaction is not committed if any of the following errors is returned: +// - InvalidReferenceFormatError is returned when attempting to update a reference with an invalid name. +// - ReferenceVerificationError is returned when the reference does not point to the expected old tip. +// +// Transaction may or may not be committed if any of the following errors is returned: +// - ErrTransactionProcessingStopped is returned when the TransactionManager is closing and stops processing +// transactions. +// - The context passed to Propose is done, or an unexpected error occurs.
+func (mgr *TransactionManager) Propose(ctx context.Context, transaction Transaction) error { + queuedTransaction := transactionFuture{transaction: transaction, result: make(resultChannel, 1)} + + select { + case mgr.admissionQueue <- queuedTransaction: + select { + case err := <-queuedTransaction.result: + return unwrapExpectedError(err) + case <-ctx.Done(): + return ctx.Err() + case <-mgr.runDone: + return ErrTransactionProcessingStopped + } + case <-ctx.Done(): + return ctx.Err() + case <-mgr.stopCalled: + return ErrTransactionProcessingStopped + } +} + +// unwrapExpectedError reports errors caused by the manager shutting down as ErrTransactionProcessingStopped and returns all other errors unchanged. +func unwrapExpectedError(err error) error { + // The manager controls its own execution context and it is canceled only when Stop is called. + // Any context.Canceled errors returned are thus from shutting down so we report that here. + if errors.Is(err, context.Canceled) { + return ErrTransactionProcessingStopped + } + + return err +} + +// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied +// log entries from the database. It will then apply any transactions that have been logged but not applied +// to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the +// references, logging the transaction and finally applying it to the repository. The transactions are acknowledged +// once they've been applied to the repository. +// +// Run keeps running until Stop is called or it encounters a fatal error. All active Propose calls return +// ErrTransactionProcessingStopped when Run returns. +func (mgr *TransactionManager) Run() error { + // Defer the Stop in order to release all on-going Propose calls in case of error.
+ defer close(mgr.runDone) + defer mgr.Stop() + + if err := mgr.initialize(); err != nil { + return fmt.Errorf("initialize: %w", err) + } + + // awaitingTransactions contains transactions waiting for their log entry to be applied to + // the repository. It's keyed by the log index the transaction is waiting to be applied and the + // value is the resultChannel that is waiting for the result. + awaitingTransactions := make(map[LogIndex]resultChannel) + + for { + if mgr.appliedLogIndex < mgr.appendedLogIndex { + logIndex := mgr.appliedLogIndex + 1 + + if err := mgr.applyLogEntry(mgr.ctx, logIndex); err != nil { + return fmt.Errorf("apply log entry: %w", err) + } + + // There is no awaiter for a transaction if the transaction manager is recovering + // transactions from the log after starting up. + if resultChan, ok := awaitingTransactions[logIndex]; ok { + resultChan <- nil + delete(awaitingTransactions, logIndex) + } + + continue + } + + var transaction transactionFuture + select { + case transaction = <-mgr.admissionQueue: + case <-mgr.ctx.Done(): + } + + // Return if the manager was stopped. The select is nondeterministic so this guarantees + // the manager stops the processing even if there are transactions in the queue. + if mgr.ctx.Err() != nil { + return nil + } + + if err := func() error { + logEntry, err := mgr.verifyReferences(mgr.ctx, transaction.transaction) + if err != nil { + return fmt.Errorf("verify references: %w", err) + } + + return mgr.appendLogEntry(logEntry) + }(); err != nil { + transaction.result <- err + continue + } + + awaitingTransactions[mgr.appendedLogIndex] = transaction.result + } +} + +// Stop stops the transaction processing causing Run to return. +func (mgr *TransactionManager) Stop() { mgr.stop() } + +// initialize initializes the TransactionManager by loading the applied log index and determining the +// appended log index from the database.
+func (mgr *TransactionManager) initialize() error { + var appliedLogIndex gitalypb.LogIndex + if err := mgr.readKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("read applied log index: %w", err) + } + + mgr.appliedLogIndex = LogIndex(appliedLogIndex.LogIndex) + + // The index of the last appended log entry is determined from the indexes of the latest entry in the log and + // the latest applied log entry. If there is a log entry, it is the latest appended log entry. If there are no + // log entries, the latest log entry must have been applied to the repository and pruned away, meaning the index + // of the last appended log entry is the same as the index of the last applied log entry. + // + // As the log indexes in the keys are encoded in big endian, the latest log entry can be found by taking + // the first key when iterating the log entry key space in reverse. + return mgr.db.View(func(txn databaseTransaction) error { + logPrefix := keyPrefixLogEntries(getRepositoryID(mgr.repository)) + + iterator := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: logPrefix}) + defer iterator.Close() + + mgr.appendedLogIndex = mgr.appliedLogIndex + + // The iterator seeks to a key that is greater than or equal to the seek key. Since we are doing a reverse + // seek, we need to add 0xff to the prefix so the first iterated key is the latest log entry. + if iterator.Seek(append(logPrefix, 0xff)); iterator.Valid() { + mgr.appendedLogIndex = LogIndex(binary.BigEndian.Uint64(bytes.TrimPrefix(iterator.Item().Key(), logPrefix))) + } + + return nil + }) +} + +// verifyReferences verifies that the references in the transaction apply on top of the already accepted +// reference changes. The old tips in the transaction are verified against the current actual tips. +// It returns the write-ahead log entry for the transaction if it was successfully verified.
+func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction Transaction) (*gitalypb.LogEntry, error) { + logEntry := &gitalypb.LogEntry{} + for referenceName, tips := range transaction.ReferenceUpdates { + // 'git update-ref' doesn't ensure the loose references end up in the + // refs directory so we enforce that here. + if !strings.HasPrefix(referenceName.String(), "refs/") { + return nil, InvalidReferenceFormatError{ReferenceName: referenceName} + } + + // We'll later implement reference format verification in Gitaly. update-ref reports errors with these characters + // in a difficult-to-parse manner. For now, let's check these two illegal characters separately so we can return a + // proper error. + for _, illegalCharacter := range []byte{0, '\n'} { + if bytes.Contains([]byte(referenceName), []byte{illegalCharacter}) { + return nil, InvalidReferenceFormatError{ReferenceName: referenceName} + } + } + + actualOldTip, err := mgr.repository.ResolveRevision(ctx, referenceName.Revision()) + if errors.Is(err, git.ErrReferenceNotFound) { + objectHash, err := mgr.repository.ObjectHash(ctx) + if err != nil { + return nil, fmt.Errorf("object hash: %w", err) + } + + actualOldTip = objectHash.ZeroOID + } else if err != nil { + return nil, fmt.Errorf("resolve revision: %w", err) + } + + if tips.OldOID != actualOldTip { + if transaction.SkipVerificationFailures { + continue + } + + return nil, ReferenceVerificationError{ + ReferenceName: referenceName, + ExpectedOID: tips.OldOID, + ActualOID: actualOldTip, + } + } + + logEntry.ReferenceUpdates = append(logEntry.ReferenceUpdates, &gitalypb.LogEntry_ReferenceUpdate{ + ReferenceName: []byte(referenceName), + NewOid: []byte(tips.NewOID), + }) + } + + // Sort the reference updates so the reference changes are always logged in a deterministic order.
+ sort.Slice(logEntry.ReferenceUpdates, func(i, j int) bool { + return bytes.Compare( + logEntry.ReferenceUpdates[i].ReferenceName, + logEntry.ReferenceUpdates[j].ReferenceName, + ) == -1 + }) + + if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil { + return nil, fmt.Errorf("verify references with git: %w", err) + } + + return logEntry, nil +} + +// verifyReferencesWithGit verifies the reference updates with git by preparing a reference transaction. This ensures +// the updates will go through when they are being applied from the log. This also catches any invalid reference names +// and file/directory conflicts with Git's loose reference storage which can occur with references like +// 'refs/heads/parent' and 'refs/heads/parent/child'. +func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error { + updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates) + if err != nil { + return fmt.Errorf("prepare reference transaction: %w", err) + } + + return updater.Close() +} + +// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing +// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up +// if an error is returned.
+func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) { + updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions()) + if err != nil { + return nil, fmt.Errorf("new: %w", err) + } + + if err := updater.Start(); err != nil { + return nil, fmt.Errorf("start: %w", err) + } + + for _, referenceUpdate := range referenceUpdates { + if err := updater.Update(git.ReferenceName(referenceUpdate.ReferenceName), git.ObjectID(referenceUpdate.NewOid), ""); err != nil { + return nil, fmt.Errorf("update %q: %w", referenceUpdate.ReferenceName, err) + } + } + + if err := updater.Prepare(); err != nil { + var errInvalidReferenceFormat updateref.ErrInvalidReferenceFormat + if errors.As(err, &errInvalidReferenceFormat) { + return nil, InvalidReferenceFormatError{ReferenceName: git.ReferenceName(errInvalidReferenceFormat.ReferenceName)} + } + + return nil, fmt.Errorf("prepare: %w", err) + } + + return updater, nil +} + +// appendLogEntry appends the transaction to the write-ahead log at the index following the last appended entry and records the new appended log index on success. +// References that failed verification are not part of the log entry and are thus neither logged nor applied later. +func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error { + nextLogIndex := mgr.appendedLogIndex + 1 + + if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil { + return fmt.Errorf("set log entry: %w", err) + } + + mgr.appendedLogIndex = nextLogIndex + + return nil +} + +// applyLogEntry reads a log entry at the given index and applies it to the repository. Once applied, the index is stored as the applied log index.
+func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error { + var logEntry gitalypb.LogEntry + if err := mgr.readKey(keyLogEntry(getRepositoryID(mgr.repository), logIndex), &logEntry); err != nil { + return fmt.Errorf("read log entry: %w", err) + } + + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates) + if err != nil { + return fmt.Errorf("prepare reference transaction: %w", err) + } + + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + if err := mgr.storeAppliedLogIndex(logIndex); err != nil { + return fmt.Errorf("set applied log index: %w", err) + } + + mgr.appliedLogIndex = logIndex + + return nil +} + +// storeLogEntry stores the log entry in the repository's write-ahead log at the given index. +func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error { + return mgr.setKey(keyLogEntry(getRepositoryID(mgr.repository), index), entry) +} + +// storeAppliedLogIndex stores the repository's applied log index in the database. +func (mgr *TransactionManager) storeAppliedLogIndex(index LogIndex) error { + return mgr.setKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), index.toProto()) +} + +// setKey marshals and stores a given protocol buffer message into the database under the given key. The write is flushed before setKey returns. +func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error { + marshaledValue, err := proto.Marshal(value) + if err != nil { + return fmt.Errorf("marshal value: %w", err) + } + + writeBatch := mgr.db.NewWriteBatch() + defer writeBatch.Cancel() + + if err := writeBatch.Set(key, marshaledValue); err != nil { + return fmt.Errorf("set: %w", err) + } + + return writeBatch.Flush() +} + +// readKey reads a key from the database and unmarshals its value into the destination protocol +// buffer message.
+func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) error { + return mgr.db.View(func(txn databaseTransaction) error { + item, err := txn.Get(key) + if err != nil { + return fmt.Errorf("get: %w", err) + } + + return item.Value(func(value []byte) error { return proto.Unmarshal(value, destination) }) + }) +} + +// getRepositoryID returns a repository's ID. The ID should never change as it is used in the database +// keys. Gitaly does not have a permanent ID to use yet so the repository's storage name and relative +// path, joined with a ':', are used as a composite key. +func getRepositoryID(repository repository) string { + return repository.GetStorageName() + ":" + repository.GetRelativePath() +} + +// keyAppliedLogIndex returns the database key storing a repository's last applied log entry's index. +func keyAppliedLogIndex(repositoryID string) []byte { + return []byte(fmt.Sprintf("repository/%s/log/index/applied", repositoryID)) +} + +// keyLogEntry returns the database key storing a repository's log entry at a given index. The index is encoded in big endian and appended to the log entry key prefix. +func keyLogEntry(repositoryID string, index LogIndex) []byte { + marshaledIndex := make([]byte, binary.Size(index)) + binary.BigEndian.PutUint64(marshaledIndex, uint64(index)) + return []byte(fmt.Sprintf("%s%s", keyPrefixLogEntries(repositoryID), marshaledIndex)) +} + +// keyPrefixLogEntries returns the key prefix holding a repository's write-ahead log entries.
+func keyPrefixLogEntries(repositoryID string) []byte { + return []byte(fmt.Sprintf("repository/%s/log/entry/", repositoryID)) +} diff --git a/internal/gitaly/transaction_manager_hook_test.go b/internal/gitaly/transaction_manager_hook_test.go new file mode 100644 index 000000000..618dbffb9 --- /dev/null +++ b/internal/gitaly/transaction_manager_hook_test.go @@ -0,0 +1,137 @@ +package gitaly + +import ( + "context" + "regexp" + "runtime" + "strings" + "testing" + + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" +) + +// hookFunc is a function that is executed at a specific point. It gets a hookContext that allows it to +// influence the execution of the test. +type hookFunc func(hookContext) + +// hookContext contains the control toggles available in a hook. +type hookContext struct { + // stopManager stops the TransactionManager. + stopManager func() +} + +// hooks are functions that get invoked at specific points of the TransactionManager Run method. They allow +// for hooking into the Run method at specific points to do assertions that would otherwise +// not be possible. +type hooks struct { + // beforeReadLogEntry is invoked before a log entry is read from the database. + beforeReadLogEntry hookFunc + // beforeResolveRevision is invoked before ResolveRevision is invoked. + beforeResolveRevision hookFunc + // beforeDeferredStop is invoked before the deferred Stop is invoked in Run. + beforeDeferredStop hookFunc +} + +// installHooks installs the configured hooks into the transactionManager.
+func installHooks(tb testing.TB, transactionManager *TransactionManager, database *badger.DB, repository *localrepo.Repo, hooks hooks) {
+	// Capture the manager's original stop function so that the hooks, and the wrapper installed
+	// below, can still stop the manager.
+	hookContext := hookContext{stopManager: transactionManager.stop}
+
+	// Replace stop with a wrapper that runs beforeDeferredStop ahead of the original stop function,
+	// but only when the call is attributed to Run.
+	transactionManager.stop = func() {
+		// Caller(2) resolves the function two frames above this closure, that is the caller of
+		// whatever function invoked stop.
+		// NOTE(review): this presumably relies on stop being reached through exactly one
+		// intermediate function called from Run; confirm if the call chain changes.
+		programCounter, _, _, ok := runtime.Caller(2)
+		require.True(tb, ok)
+
+		isDeferredStopInRun := strings.HasSuffix(
+			runtime.FuncForPC(programCounter).Name(),
+			"gitaly.(*TransactionManager).Run",
+		)
+
+		if isDeferredStopInRun && hooks.beforeDeferredStop != nil {
+			hooks.beforeDeferredStop(hookContext)
+		}
+
+		hookContext.stopManager()
+	}
+
+	// Wrap the database and the repository so that calls going through them can trigger hooks.
+	transactionManager.db = databaseHook{
+		database:    newDatabaseAdapter(database),
+		hooks:       hooks,
+		hookContext: hookContext,
+	}
+
+	transactionManager.repository = repositoryHook{
+		repository:  repository,
+		hookContext: hookContext,
+		hooks:       hooks,
+	}
+}
+
+// repositoryHook wraps a repository and invokes the configured hooks before selected calls.
+type repositoryHook struct {
+	repository
+	hookContext
+	hooks
+}
+
+// ResolveRevision invokes beforeResolveRevision, if configured, and then delegates to the wrapped
+// repository.
+func (hook repositoryHook) ResolveRevision(ctx context.Context, revision git.Revision) (git.ObjectID, error) {
+	if hook.beforeResolveRevision != nil {
+		hook.hooks.beforeResolveRevision(hook.hookContext)
+	}
+
+	return hook.repository.ResolveRevision(ctx, revision)
+}
+
+// databaseHook wraps a database so that the transactions and write batches it hands out are
+// wrapped in their hook types.
+type databaseHook struct {
+	database
+	hookContext
+	hooks
+}
+
+// View delegates to the wrapped database's View, passing handler the transaction wrapped in a
+// databaseTransactionHook.
+func (hook databaseHook) View(handler func(databaseTransaction) error) error {
+	return hook.database.View(func(transaction databaseTransaction) error {
+		return handler(databaseTransactionHook{
+			databaseTransaction: transaction,
+			hookContext:         hook.hookContext,
+			hooks:               hook.hooks,
+		})
+	})
+}
+
+// NewWriteBatch returns the wrapped database's write batch wrapped in a writeBatchHook.
+func (hook databaseHook) NewWriteBatch() writeBatch {
+	return writeBatchHook{writeBatch: hook.database.NewWriteBatch()}
+}
+
+// databaseTransactionHook wraps a databaseTransaction and invokes the configured hooks on reads.
+type databaseTransactionHook struct {
+	databaseTransaction
+	hookContext
+	hooks
+}
+
+// regexLogEntry matches database keys that contain a repository's log entry key prefix. The pattern
+// is not anchored, so a match anywhere in the key counts.
+var regexLogEntry = regexp.MustCompile("repository/.+/log/entry/")
+
+// Get invokes beforeReadLogEntry, if configured, when the key matches regexLogEntry and then
+// delegates to the wrapped transaction.
+func (hook databaseTransactionHook) Get(key []byte) (*badger.Item, error) {
+	if regexLogEntry.Match(key) {
+		if hook.hooks.beforeReadLogEntry != nil {
+			hook.hooks.beforeReadLogEntry(hook.hookContext)
+		}
+	}
+
+	return hook.databaseTransaction.Get(key)
+}
+
+// NewIterator delegates to the wrapped transaction without invoking any hooks.
+func (hook databaseTransactionHook) NewIterator(options badger.IteratorOptions) *badger.Iterator {
+	return hook.databaseTransaction.NewIterator(options)
+}
+
+// writeBatchHook wraps a writeBatch. None of its methods invoke hooks; each one delegates to the
+// wrapped batch.
+type writeBatchHook struct {
+	writeBatch
+}
+
+// Set delegates to the wrapped write batch.
+func (hook writeBatchHook) Set(key []byte, value []byte) error {
+	return hook.writeBatch.Set(key, value)
+}
+
+// Flush delegates to the wrapped write batch.
+func (hook writeBatchHook) Flush() error { return hook.writeBatch.Flush() }
+
+// Cancel delegates to the wrapped write batch.
+func (hook writeBatchHook) Cancel() { hook.writeBatch.Cancel() }
diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go
new file mode 100644
index 000000000..808fa172d
--- /dev/null
+++ b/internal/gitaly/transaction_manager_test.go
@@ -0,0 +1,1586 @@
+package gitaly
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+)
+
+func TestTransactionManager(t *testing.T) {
+	t.Parallel()
+
+	ctx := testhelper.Context(t)
+
+	// A clean repository is setup for each test. We build a repository ahead of the tests here once to
+	// get deterministic commit IDs, relative path and object hash we can use to build the declarative
+	// test cases.
+ relativePath := gittest.NewRepositoryName(t) + setupRepository := func(t *testing.T) (*localrepo.Repo, git.ObjectID, git.ObjectID, git.ObjectID) { + t.Helper() + + cfg := testcfg.Build(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: relativePath, + }) + + rootCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) + secondCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(rootCommitOID)) + thirdCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(secondCommitOID)) + + cmdFactory, clean, err := git.NewExecCommandFactory(cfg) + require.NoError(t, err) + t.Cleanup(clean) + + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + localRepo := localrepo.New( + config.NewLocator(cfg), + cmdFactory, + catfileCache, + repo, + ) + + return localRepo, rootCommitOID, secondCommitOID, thirdCommitOID + } + + // Collect commit OIDs and the object has so we can define the test cases with them. + repo, rootCommitOID, secondCommitOID, thirdCommitOID := setupRepository(t) + objectHash, err := repo.ObjectHash(ctx) + require.NoError(t, err) + + hasher := objectHash.Hash() + _, err = hasher.Write([]byte("content does not matter")) + require.NoError(t, err) + nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil))) + require.NoError(t, err) + + type testHooks struct { + // BeforeApplyLogEntry is called before a log entry is applied to the repository. + BeforeApplyLogEntry hookFunc + // BeforeAppendLogEntry is called before a log entry is appended to the log. + BeforeAppendLogEntry hookFunc + // WaitForTransactionsWhenStopping waits for a in-flight to finish before returning + // from Run. + WaitForTransactionsWhenStopping bool + } + + // Step defines a single execution step in a test. 
Each test case can define multiple steps to setup exercise + // more complex behavior and to assert the state after each step. + type steps []struct { + // StopManager stops the manager in the beginning of the step. + StopManager bool + // StartManager can be used to start the manager again after stopping it. + StartManager bool + // Context is the context to use for the Propose call of the step. + Context context.Context + // Transaction is the transaction that is proposed in this step. + Transaction Transaction + // Hooks contains the hook functions that are configured on the TransactionManager. These allow + // for better synchronization. + Hooks testHooks + // ExpectedRunError is the expected error to be returned from Run from this step. + ExpectedRunError bool + // ExpectedProposeError is the error that is expected to be returned when proposing the transaction in this step. + ExpectedProposeError error + // ExpectedReferences is the expected state of references at the end of this step. + ExpectedReferences []git.Reference + // ExpectedDatabase is the expected state of the database at the end of this step. 
+ ExpectedDatabase DatabaseState + } + + type testCase struct { + desc string + steps steps + } + + testCases := []testCase{ + { + desc: "invalid reference aborts the entire transaction", + steps: steps{ + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: "refs/heads/../main"}, + }, + }, + }, + { + desc: "continues processing after aborting due to an invalid reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: "refs/heads/../main"}, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): 
&gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create a file-directory reference conflict different transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: updateref.ErrFileDirectoryConflict{ + ExistingReferenceName: "refs/heads/parent", + ConflictingReferenceName: "refs/heads/parent/child", + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create a file-directory reference conflict in same transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + 
}, + }, + ExpectedProposeError: updateref.ErrInTransactionConflict{ + FirstReferenceName: "refs/heads/parent", + SecondReferenceName: "refs/heads/parent/child", + }, + }, + }, + }, + { + desc: "file-directory conflict aborts the transaction with verification failures skipped", + steps: steps{ + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: updateref.ErrInTransactionConflict{ + FirstReferenceName: "refs/heads/parent", + SecondReferenceName: "refs/heads/parent/child", + }, + }, + }, + }, + { + desc: "delete file-directory conflict in different transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent/child"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: updateref.ErrFileDirectoryConflict{ + ExistingReferenceName: "refs/heads/parent/child", + ConflictingReferenceName: "refs/heads/parent", + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + 
string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent/child"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete file-directory conflict in same transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: updateref.ErrInTransactionConflict{ + FirstReferenceName: "refs/heads/parent", + SecondReferenceName: "refs/heads/parent/child", + }, + }, + }, + }, + { + desc: "create a branch to a non-commit object", + steps: steps{ + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + // The error should abort the entire transaction. 
+ "refs/heads/branch-1": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/branch-2": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, + }, + }, + ExpectedProposeError: updateref.NonCommitObjectError{ + ReferenceName: "refs/heads/branch-2", + ObjectID: objectHash.EmptyTreeOID.String(), + }, + }, + }, + }, + { + desc: "create a tag to a non-commit object", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/tags/v1.0.0": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/tags/v1.0.0", Target: objectHash.EmptyTreeOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/tags/v1.0.0"), + NewOid: []byte(objectHash.EmptyTreeOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create a reference to non-existent object", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: nonExistentOID}, + }, + }, + ExpectedProposeError: updateref.NonExistentObjectError{ + ReferenceName: "refs/heads/main", + ObjectID: nonExistentOID.String(), + }, + }, + }, + }, + { + desc: "create reference ignoring verification failure", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + 
ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: secondCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create reference that already exists", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + "refs/heads/non-conflicting": {OldOID: 
objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: objectHash.ZeroOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create reference no-op", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: objectHash.ZeroOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: 
[]byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update reference ignoring verification failures", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: 
"refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: thirdCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(thirdCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update reference with incorrect old tip", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: 
"refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update non-existent reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + }, + }, + { + desc: "update reference 
no-op", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + 
ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(objectHash.ZeroOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete reference ignoring verification failures", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, + "refs/heads/non-conflicting": {OldOID: 
rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(objectHash.ZeroOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete reference with incorrect old tip", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: 
objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete non-existent reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: rootCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + }, + }, + { + desc: "delete reference no-op", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(objectHash.ZeroOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "continues processing after reference verification failure", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, 
+ }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: rootCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "continues processing after a restart", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + StopManager: true, + StartManager: true, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: 
[]byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "continues processing after restarting after a reference verification failure", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: rootCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + { + StopManager: true, + StartManager: true, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "recovers from the write-ahead log on start up", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + hookCtx.stopManager() + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedDatabase: DatabaseState{ + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: 
[]byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "reference verification fails after recovering logged writes", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + hookCtx.stopManager() + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedDatabase: DatabaseState{ + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: 
rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "propose returns if context is canceled before admission", + steps: steps{ + { + Context: func() context.Context { + ctx, cancel := context.WithCancel(ctx) + cancel() + return ctx + }(), + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: context.Canceled, + }, + }, + }, + { + desc: "propose returns if transaction processing stops before admission", + steps: steps{ + { + StopManager: true, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + }, + }, + }, + func() testCase { + ctx, cancel := context.WithCancel(ctx) + return testCase{ + desc: "propose returns if context is canceled after admission", + steps: steps{ + { + Context: ctx, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + // Cancel the context used in Propose + cancel() + }, + }, + ExpectedProposeError: context.Canceled, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: 
[]byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + } + }(), + { + desc: "propose returns if transaction processing stops before transaction acceptance", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.stopManager() }, + // This ensures we are testing the context cancellation errors being unwrapped properly + // to an ErrTransactionProcessingStopped instead of hitting the general case when + // runDone is closed. + WaitForTransactionsWhenStopping: true, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + }, + }, + }, + { + desc: "propose returns if transaction processing stops after transaction acceptance", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + hookCtx.stopManager() + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedDatabase: DatabaseState{ + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + } + + type invalidReferenceTestCase struct { + desc string + referenceName git.ReferenceName + invalidReference git.ReferenceName + } + + appendInvalidReferenceTestCase := func(tc invalidReferenceTestCase) { + invalidReference := tc.invalidReference + if invalidReference == "" { + invalidReference = tc.referenceName + } + + testCases = append(testCases, testCase{ + desc: fmt.Sprintf("invalid reference %s", tc.desc), + steps: steps{ + { + Transaction: Transaction{ + 
ReferenceUpdates: ReferenceUpdates{ + tc.referenceName: {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: invalidReference}, + }, + }, + }) + } + + // Generate test cases for the reference format rules according to https://git-scm.com/docs/git-check-ref-format. + // This is to ensure the references are correctly validated prior to logging so they are guaranteed to apply later. + for _, tc := range []invalidReferenceTestCase{ + // 1. They can include slash / for hierarchical (directory) grouping, but no slash-separated + // component can begin with a dot . or end with the sequence .lock. + {"starting with a period", ".refs/heads/main", ""}, + {"subcomponent starting with a period", "refs/heads/.main", ""}, + {"ending in .lock", "refs/heads/main.lock", ""}, + {"subcomponent ending in .lock", "refs/heads/main.lock/main", ""}, + // 2. They must contain at least one /. This enforces the presence of a category like heads/, + // tags/ etc. but the actual names are not restricted. + {"without a /", "one-level", ""}, + {"with refs without a /", "refs", ""}, + // We restrict this further by requiring a 'refs/' prefix to ensure loose references only end up + // in the 'refs/' folder. + {"without refs/ prefix ", "nonrefs/main", ""}, + // 3. They cannot have two consecutive dots .. anywhere. + {"containing two consecutive dots", "refs/heads/../main", ""}, + // 4. They cannot have ASCII control characters ... (\177 DEL), space, tilde ~, caret ^, or colon : anywhere. + // + // Tests for control characters < \040 generated further down. + {"containing DEL", "refs/heads/ma\177in", "refs/heads/ma?in"}, + {"containing space", "refs/heads/ma in", ""}, + {"containing ~", "refs/heads/ma~in", ""}, + {"containing ^", "refs/heads/ma^in", ""}, + {"containing :", "refs/heads/ma:in", ""}, + // 5. They cannot have question-mark ?, asterisk *, or open bracket [ anywhere. 
+ {"containing ?", "refs/heads/ma?in", ""}, + {"containing *", "refs/heads/ma*in", ""}, + {"containing [", "refs/heads/ma[in", ""}, + // 6. They cannot begin or end with a slash / or contain multiple consecutive slashes + {"begins with /", "/refs/heads/main", ""}, + {"ends with /", "refs/heads/main/", ""}, + {"contains consecutive /", "refs/heads//main", ""}, + // 7. They cannot end with a dot. + {"ending in a dot", "refs/heads/main.", ""}, + // 8. They cannot contain a sequence @{. + {"invalid reference contains @{", "refs/heads/m@{n", ""}, + // 9. They cannot be the single character @. + {"is a single character @", "@", ""}, + // 10. They cannot contain a \. + {`containing \`, `refs/heads\main`, `refs/heads\main`}, + } { + appendInvalidReferenceTestCase(tc) + } + + // Rule 4. They cannot have ASCII control characters i.e. bytes whose values are lower than \040, + for i := byte(0); i < '\040'; i++ { + invalidReference := fmt.Sprintf("refs/heads/ma%sin", []byte{i}) + + // For now, the reference format is checked by 'git update-ref'. Most of the control characters + // are substituted by a '?' by Git when reporting the error. + // + // '\t' is reported without substitution. '\n' and '\000' are checked by the TransactionManager + // separately so it can report a better error than what Git produces. + // + // This is temporarily more complicated than it needs to be. Later iteration will move reference + // format checking into Gitaly at which point we can return proper errors without substituting + // the problematic characters. + var expectedSubstitute string + switch i { + case '\000', '\n', '\t': + expectedSubstitute = string(i) + default: + expectedSubstitute = "?" 
+ } + + appendInvalidReferenceTestCase(invalidReferenceTestCase{ + desc: fmt.Sprintf(`containing ASCII control character %d`, i), + referenceName: git.ReferenceName(invalidReference), + invalidReference: git.ReferenceName(fmt.Sprintf("refs/heads/ma%sin", expectedSubstitute)), + }) + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + // Setup the repository with the exact same state as what was used to build the test cases. + repository, _, _, _ := setupRepository(t) + + database, err := OpenDatabase(t.TempDir()) + require.NoError(t, err) + defer testhelper.MustClose(t, database) + + var ( + // managerRunning tracks whether the manager is running or stopped. + managerRunning bool + // transactionManager is the current TransactionManager instance. + transactionManager *TransactionManager + // managerErr is used for synchronizing manager stopping and returning + // the error from Run. + managerErr chan error + // inflightTransactions tracks the number of on going propose calls. It is used to synchronize + // the database hooks with propose calls. + inflightTransactions sync.WaitGroup + ) + + // stopManager stops the manager. It waits until the manager's Run method has exited. + stopManager := func() { + t.Helper() + + transactionManager.Stop() + managerRunning, err = checkManagerError(t, managerErr, transactionManager) + require.NoError(t, err) + } + + // startManager starts fresh manager and applies hooks into it. 
+ startManager := func(testHooks testHooks) { + t.Helper() + + require.False(t, managerRunning, "manager started while it was already running") + managerRunning = true + managerErr = make(chan error) + + transactionManager = NewTransactionManager(database, repository) + installHooks(t, transactionManager, database, repository, hooks{ + beforeResolveRevision: testHooks.BeforeAppendLogEntry, + beforeReadLogEntry: testHooks.BeforeApplyLogEntry, + beforeDeferredStop: func(hookContext) { + if testHooks.WaitForTransactionsWhenStopping { + inflightTransactions.Wait() + } + }, + }) + + go func() { managerErr <- transactionManager.Run() }() + } + + // Stop the manager if it is running at the end of the test. + defer stopManager() + for _, step := range tc.steps { + // Ensure every step starts with the manager running. + if !managerRunning { + startManager(step.Hooks) + } + + if step.StopManager { + require.True(t, managerRunning, "manager stopped while it was already stopped") + stopManager() + } + + if step.StartManager { + startManager(step.Hooks) + } + + func() { + inflightTransactions.Add(1) + defer inflightTransactions.Done() + + proposeCtx := ctx + if step.Context != nil { + proposeCtx = step.Context + } + + require.ErrorIs(t, transactionManager.Propose(proposeCtx, step.Transaction), step.ExpectedProposeError) + }() + + if managerRunning, err = checkManagerError(t, managerErr, transactionManager); step.ExpectedRunError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + RequireReferences(t, ctx, repository, step.ExpectedReferences) + RequireDatabase(t, ctx, database, step.ExpectedDatabase) + } + }) + } +} + +// checkManagerError reports whether the manager is still running. It blocks until one of two things +// happens: a receive from managerErr succeeds, in which case it returns false together with the received +// error, or a sentinel transactionFuture is sent to mgr.admissionQueue, in which case it returns true and +// a nil error. +func checkManagerError(t *testing.T, managerErr chan error, mgr *TransactionManager) (bool, error) { + t.Helper() + + select { + case err, ok := <-managerErr: + // ok is true only when an actual value was received; close the channel then so that + // later receives on it return right away with a nil error instead of blocking. + if ok { + close(managerErr) + } + + // A receive on managerErr means the manager has already stopped. Return the possible error.
+ return false, err + case mgr.admissionQueue <- transactionFuture{ + transaction: Transaction{ReferenceUpdates: ReferenceUpdates{"sentinel": {}}}, + result: make(resultChannel, 1), + }: + // If the error channel doesn't receive, we don't know whether it is because the manager is still running + // or we are still waiting for it to return. We test whether the manager is running or not here by queueing a + // proposal that will error. If the manager processes it, we know it is still running. + return true, nil + } +} diff --git a/internal/testhelper/leakage.go b/internal/testhelper/leakage.go index ffec6c7ae..5dc9cae20 100644 --- a/internal/testhelper/leakage.go +++ b/internal/testhelper/leakage.go @@ -16,6 +16,10 @@ import ( // mustHaveNoGoroutines panics if it finds any Goroutines running. func mustHaveNoGoroutines() { if err := goleak.Find( + // BadgerDB depends on Ristretto which uses glog for logging. glog initializes + // on import a log flushing goroutine that keeps running in the background. + // Ignore this goroutine as there is no way to stop it. + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), // opencensus has a "defaultWorker" which is started by the package's // `init()` function. There is no way to stop this worker, so it will leak // whenever we import the package. -- cgit v1.2.3