diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-01-17 13:48:18 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-01-17 13:48:18 +0300 |
commit | 7e80ece2ebfd82776285c83362e6874dacb5b26a (patch) | |
tree | 9743724ec61b44c8e0c768eaeea90603f3ab4ae2 /internal | |
parent | 60e00c9b19ab28605c0b80931683feddcc62d971 (diff) | |
parent | 8a29ef51192cc57770dcd7966d9af8173eb7ff4d (diff) |
Merge branch 'smh-log-worker' into 'master'
Implement basic transaction management with write-ahead logging
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5020
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Diffstat (limited to 'internal')
-rw-r--r-- | internal/gitaly/database.go | 59 | ||||
-rw-r--r-- | internal/gitaly/testhelper_test.go | 72 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager.go | 539 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager_hook_test.go | 137 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager_test.go | 1586 | ||||
-rw-r--r-- | internal/testhelper/leakage.go | 4 |
6 files changed, 2397 insertions, 0 deletions
diff --git a/internal/gitaly/database.go b/internal/gitaly/database.go new file mode 100644 index 000000000..2015de710 --- /dev/null +++ b/internal/gitaly/database.go @@ -0,0 +1,59 @@ +package gitaly + +import ( + "github.com/dgraph-io/badger/v3" +) + +// OpenDatabase opens a new database handle to a database at the given path. +func OpenDatabase(databasePath string) (*badger.DB, error) { + dbOptions := badger.DefaultOptions(databasePath) + // Enable SyncWrites to ensure all writes are persisted to disk before considering + // them committed. + dbOptions.SyncWrites = true + // Badger by default logs fairly verbose statistics when opening a database. Disable the + // logging for now. + dbOptions.Logger = nil + + return badger.Open(dbOptions) +} + +// databaseAdapter adapts a *badger.DB to the internal database interface used by the hooks in tests. +type databaseAdapter struct{ *badger.DB } + +// newDatabaseAdapter adapts a *badger.DB to conform to the internal database interface used for +// hooking into during testing. +func newDatabaseAdapter(db *badger.DB) database { + return databaseAdapter{DB: db} +} + +// NewWriteBatch calls badger.*DB.NewWriteBatch. Refer to Badger's documentation for details. +func (db databaseAdapter) NewWriteBatch() writeBatch { + return db.DB.NewWriteBatch() +} + +// NewWriteBatch calls badger.*DB.View. Refer to Badger's documentation for details. +func (db databaseAdapter) View(handler func(databaseTransaction) error) error { + return db.DB.View(func(txn *badger.Txn) error { return handler(txn) }) +} + +// database is the Badger.DB interface used by TransactionManager. Refer to Badger's documentation +// for details. +type database interface { + NewWriteBatch() writeBatch + View(func(databaseTransaction) error) error +} + +// writeBatch is the interface of Badger.WriteBatch used by TransactionManager. Refer to Badger's +// documentation for details. +type writeBatch interface { + Set([]byte, []byte) error + Flush() error + Cancel() +} + +// databaseTransaction is the interface of *Badger.Txn used by TransactionManager. Refer to Badger's +// documentation for details +type databaseTransaction interface { + Get([]byte) (*badger.Item, error) + NewIterator(badger.IteratorOptions) *badger.Iterator +} diff --git a/internal/gitaly/testhelper_test.go b/internal/gitaly/testhelper_test.go new file mode 100644 index 000000000..9f194318d --- /dev/null +++ b/internal/gitaly/testhelper_test.go @@ -0,0 +1,72 @@ +package gitaly + +import ( + "context" + "fmt" + "reflect" + "testing" + + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "google.golang.org/protobuf/proto" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} + +// DatabaseState describes the expected state of the key-value store. The keys in the map are the expected keys +// in the database and the values are the expected unmarshaled values. +type DatabaseState map[string]proto.Message + +// RequireDatabase asserts the actual database state matches the expected database state. The actual values in the +// database are unmarshaled to the same type the values have in the expected database state. +func RequireDatabase(tb testing.TB, ctx context.Context, database *badger.DB, expectedState DatabaseState) { + tb.Helper() + + if expectedState == nil { + expectedState = DatabaseState{} + } + + actualState := DatabaseState{} + unexpectedKeys := []string{} + require.NoError(tb, database.View(func(txn *badger.Txn) error { + iterator := txn.NewIterator(badger.DefaultIteratorOptions) + defer iterator.Close() + + for iterator.Rewind(); iterator.Valid(); iterator.Next() { + key := iterator.Item().Key() + expectedValue, ok := expectedState[string(key)] + if !ok { + // Print the keys out escaped as otherwise the non-printing characters are not visible in the assertion failure. + unexpectedKeys = append(unexpectedKeys, fmt.Sprintf("%q", key)) + continue + } + + require.NoError(tb, iterator.Item().Value(func(value []byte) error { + // Unmarshal the actual value to the same type as the expected value. + actualValue := reflect.New(reflect.TypeOf(expectedValue).Elem()).Interface().(proto.Message) + require.NoError(tb, proto.Unmarshal(value, actualValue)) + actualState[string(key)] = actualValue + return nil + })) + } + + return nil + })) + + require.Empty(tb, unexpectedKeys, "database contains unexpected keys") + testhelper.ProtoEqual(tb, expectedState, actualState) +} + +// RequireReferences asserts that the actual state of the references in the repository match the expected. +func RequireReferences(tb testing.TB, ctx context.Context, repo *localrepo.Repo, expectedReferences []git.Reference) { + tb.Helper() + + actualReferences, err := repo.GetReferences(ctx) + require.NoError(tb, err) + require.ElementsMatch(tb, expectedReferences, actualReferences) +} diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go new file mode 100644 index 000000000..c9c89bf81 --- /dev/null +++ b/internal/gitaly/transaction_manager.go @@ -0,0 +1,539 @@ +package gitaly + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "sort" + "strings" + + "github.com/dgraph-io/badger/v3" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "google.golang.org/protobuf/proto" +) + +// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. +var ErrTransactionProcessingStopped = errors.New("transaction processing stopped") + +// InvalidReferenceFormatError is returned when a reference name was invalid. +type InvalidReferenceFormatError struct { + // ReferenceName is the reference with invalid format. + ReferenceName git.ReferenceName +} + +// Error returns the formatted error string. +func (err InvalidReferenceFormatError) Error() string { + return fmt.Sprintf("invalid reference format: %q", err.ReferenceName) +} + +// ReferenceVerificationError is returned when a reference's old OID did not match the expected. +type ReferenceVerificationError struct { + // ReferenceName is the name of the reference that failed verification. + ReferenceName git.ReferenceName + // ExpectedOID is the OID the reference was expected to point to. + ExpectedOID git.ObjectID + // ActualOID is the OID the reference actually pointed to. + ActualOID git.ObjectID +} + +// Error returns the formatted error string. +func (err ReferenceVerificationError) Error() string { + return fmt.Sprintf("expected %q to point to %q but it pointed to %q", err.ReferenceName, err.ExpectedOID, err.ActualOID) +} + +// LogIndex points to a specific position in a repository's write-ahead log. +type LogIndex uint64 + +// toProto returns the protobuf representation of LogIndex for serialization purposes. +func (index LogIndex) toProto() *gitalypb.LogIndex { + return &gitalypb.LogIndex{LogIndex: uint64(index)} +} + +// ReferenceUpdate describes the state of a reference's old and new tip in an update. +type ReferenceUpdate struct { + // OldOID is the old OID the reference is expected to point to prior to updating it. + // If the reference does not point to the old value, the reference verification fails. + OldOID git.ObjectID + // NewOID is the new desired OID to point the reference to. + NewOID git.ObjectID +} + +// ReferenceUpdates contains references to update. Reference name is used as the key and the value +// is the expected old tip and the desired new tip. +type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate + +// Transaction is a unit-of-work that contains reference changes to perform on the repository. +type Transaction struct { + // SkipVerificationFailures causes references updates that failed the verification step to be + // dropped from the transaction. By default, any verification failures abort the entire transaction. + // + // The default behavior maps to the `--atomic` flag of `git push`. When this is set, the behavior matches + // what happens git's behavior without the flag. + SkipVerificationFailures bool + // ReferenceUpdates contains the reference updates to be performed. + ReferenceUpdates +} + +// TransactionManager is responsible for transaction management of a single repository. Each repository has +// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from +// the admissionQueue. Each admitted write is processed in three steps: +// +// 1. The references being updated are verified by ensuring the expected old tips match what the references +// actually point to prior to update. The entire transaction is by default aborted if a single reference +// fails the verification step. The reference verification behavior can be controlled on a per-transaction +// level by setting: +// - The reference verification failures can be ignored instead of aborting the entire transaction. +// If done, the references that failed verification are dropped from the transaction but the updates +// that passed verification are still performed. +// - The reference verification may also be skipped if the write is force updating references. If +// done, the current state of the references is ignored and they are directly updated to point +// to the new tips. +// 2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively +// committed and will be applied to the repository even after restarting. +// 3. The transaction is applied from the write-ahead log to the repository by actually performing the reference +// changes. +// +// The goroutine that issued the transaction is waiting for the result while these steps are being performed. As +// there is no transaction control for readers yet, the issuer is only notified of a successful write after the +// write has been applied to the repository. +// +// TransactionManager recovers transactions after interruptions by applying the write-ahead logged transactions to +// the repository on start up. +// +// TransactionManager maintains the write-ahead log in a key-value store. It maintains the following key spaces: +// - `repository/<repository_id:string>/log/index/applied` +// - This key stores the index of the log entry that has been applied to the repository. This allows for +// determining how far a repository is in processing the log and which log entries need to be applied +// after starting up. Repository starts from log index 0 if there are no log entries recorded to have +// been applied. +// +// - `repository/<repository_id:string>/log/entry/<log_index:uint64>` +// - These keys hold the actual write-ahead log entries. A repository's first log entry starts at index 1 +// and the log index keeps monotonically increasing from there on without gaps. The write-ahead log +// entries are processed in ascending order. +// +// The values in the database are marshaled protocol buffer messages. Numbers in the keys are encoded as big +// endian to preserve the sort order of the numbers also in lexicographical order. +type TransactionManager struct { + // ctx is the context used for all operations. + ctx context.Context + // stop cancels ctx and stops the transaction processing. + stop context.CancelFunc + + // stopCalled is closed when Stop is called. It unblock transactions that are waiting to be admitted. + stopCalled <-chan struct{} + // runDone is closed when Run returns. It unblocks transactions that are waiting for a result after + // being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly + // releases awaiters when the transactions processing is stopped. + runDone chan struct{} + + // repository is the repository this TransactionManager is acting on. + repository repository + // db is the handle to the key-value store used for storing the write-ahead log related state. + db database + // admissionQueue is where the incoming writes are waiting to be admitted to the transaction + // manager. + admissionQueue chan transactionFuture + + // appendedLogIndex holds the index of the last log entry appended to the log. + appendedLogIndex LogIndex + // appliedLogIndex holds the index of the last log entry applied to the repository + appliedLogIndex LogIndex +} + +// repository is the localrepo interface used by TransactionManager. +type repository interface { + git.RepositoryExecutor + ResolveRevision(context.Context, git.Revision) (git.ObjectID, error) +} + +// NewTransactionManager returns a new TransactionManager for the given repository. +func NewTransactionManager(db *badger.DB, repository *localrepo.Repo) *TransactionManager { + ctx, cancel := context.WithCancel(context.Background()) + return &TransactionManager{ + ctx: ctx, + stopCalled: ctx.Done(), + runDone: make(chan struct{}), + stop: cancel, + repository: repository, + db: newDatabaseAdapter(db), + admissionQueue: make(chan transactionFuture), + } +} + +// resultChannel represents a future that will yield the result of a transaction once its +// outcome has been decided. +type resultChannel chan error + +// transactionFuture holds a transaction and the resultChannel the transaction queuer is waiting +// for the result on. +type transactionFuture struct { + transaction Transaction + result resultChannel +} + +// Propose queues a transaction for the TransactionManager to process. Propose returns once the transaction +// has been successfully applied to the repository. +// +// Transaction is not committed if any of the following errors is returned: +// - InvalidReferenceFormatError is returned when attempting to update a reference with an invalid name. +// - ReferenceVerificationError is returned when the reference does not point to the expected old tip. +// +// Transaction may or may not be committed if any of the following errors is returned: +// - ErrTransactionProcessing stopped is returned when the TransactionManager is closing and stops processing +// transactions. +// - Unexpected error occurs. +func (mgr *TransactionManager) Propose(ctx context.Context, transaction Transaction) error { + queuedTransaction := transactionFuture{transaction: transaction, result: make(resultChannel, 1)} + + select { + case mgr.admissionQueue <- queuedTransaction: + select { + case err := <-queuedTransaction.result: + return unwrapExpectedError(err) + case <-ctx.Done(): + return ctx.Err() + case <-mgr.runDone: + return ErrTransactionProcessingStopped + } + case <-ctx.Done(): + return ctx.Err() + case <-mgr.stopCalled: + return ErrTransactionProcessingStopped + } +} + +// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller. +func unwrapExpectedError(err error) error { + // The manager controls its own execution context and it is canceled only when Stop is called. + // Any context.Canceled errors returned are thus from shutting down so we report that here. + if errors.Is(err, context.Canceled) { + return ErrTransactionProcessingStopped + } + + return err +} + +// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied +// log entries from the database. It will then apply any transactions that have been logged but not applied +// to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the +// references, logging the transaction and finally applying it to the repository. The transactions are acknowledged +// once they've been applied to the repository. +// +// Run keeps running until Stop is called or it encounters a fatal error. All active Propose calls return +// ErrTransactionProcessingStopped when Run returns. +func (mgr *TransactionManager) Run() error { + // Defer the Stop in order to release all on-going Propose calls in case of error. + defer close(mgr.runDone) + defer mgr.Stop() + + if err := mgr.initialize(); err != nil { + return fmt.Errorf("initialize: %w", err) + } + + // awaitingTransactions contains transactions waiting for their log entry to be applied to + // the repository. It's keyed by the log index the transaction is waiting to be applied and the + // value is the resultChannel that is waiting the result. + awaitingTransactions := make(map[LogIndex]resultChannel) + + for { + if mgr.appliedLogIndex < mgr.appendedLogIndex { + logIndex := mgr.appliedLogIndex + 1 + + if err := mgr.applyLogEntry(mgr.ctx, logIndex); err != nil { + return fmt.Errorf("apply log entry: %w", err) + } + + // There is no awaiter for a transaction if the transaction manager is recovering + // transactions from the log after starting up. + if resultChan, ok := awaitingTransactions[logIndex]; ok { + resultChan <- nil + delete(awaitingTransactions, logIndex) + } + + continue + } + + var transaction transactionFuture + select { + case transaction = <-mgr.admissionQueue: + case <-mgr.ctx.Done(): + } + + // Return if the manager was stopped. The select is indeterministic so this guarantees + // the manager stops the processing even if there are transactions in the queue. + if mgr.ctx.Err() != nil { + return nil + } + + if err := func() error { + logEntry, err := mgr.verifyReferences(mgr.ctx, transaction.transaction) + if err != nil { + return fmt.Errorf("verify references: %w", err) + } + + return mgr.appendLogEntry(logEntry) + }(); err != nil { + transaction.result <- err + continue + } + + awaitingTransactions[mgr.appendedLogIndex] = transaction.result + } +} + +// Stop stops the transaction processing causing Run to return. +func (mgr *TransactionManager) Stop() { mgr.stop() } + +// initialize initializes the TransactionManager by loading the applied log index and determining the +// appended log index from the database. +func (mgr *TransactionManager) initialize() error { + var appliedLogIndex gitalypb.LogIndex + if err := mgr.readKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("read applied log index: %w", err) + } + + mgr.appliedLogIndex = LogIndex(appliedLogIndex.LogIndex) + + // The index of the last appended log entry is determined from the indexes of the latest entry in the log and + // the latest applied log entry. If there is a log entry, it is the latest appended log entry. If there are no + // log entries, the latest log entry must have been applied to the repository and pruned away, meaning the index + // of the last appended log entry is the same as the index if the last applied log entry. + // + // As the log indexes in the keys are encoded in big endian, the latest log entry can be found by taking + // the first key when iterating the log entry key space in reverse. + return mgr.db.View(func(txn databaseTransaction) error { + logPrefix := keyPrefixLogEntries(getRepositoryID(mgr.repository)) + + iterator := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: logPrefix}) + defer iterator.Close() + + mgr.appendedLogIndex = mgr.appliedLogIndex + + // The iterator seeks to a key that is greater than or equal than seeked key. Since we are doing a reverse + // seek, we need to add 0xff to the prefix so the first iterated key is the latest log entry. + if iterator.Seek(append(logPrefix, 0xff)); iterator.Valid() { + mgr.appendedLogIndex = LogIndex(binary.BigEndian.Uint64(bytes.TrimPrefix(iterator.Item().Key(), logPrefix))) + } + + return nil + }) +} + +// verifyReferences verifies that the references in the transaction apply on top of the already accepted +// reference changes. The old tips in the transaction are verified against the current actual tips. +// It returns the write-ahead log entry for the transaction if it was successfully verified. +func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction Transaction) (*gitalypb.LogEntry, error) { + logEntry := &gitalypb.LogEntry{} + for referenceName, tips := range transaction.ReferenceUpdates { + // 'git update-ref' doesn't ensure the loose references end up in the + // refs directory so we enforce that here. + if !strings.HasPrefix(referenceName.String(), "refs/") { + return nil, InvalidReferenceFormatError{ReferenceName: referenceName} + } + + // We'll later implement reference format verification in Gitaly. update-ref reports errors with these characters + // in a difficult to parse manner. For now, let's check these two illegal characters separately so we can return a + // proper error. + for _, illegalCharacter := range []byte{0, '\n'} { + if bytes.Contains([]byte(referenceName), []byte{illegalCharacter}) { + return nil, InvalidReferenceFormatError{ReferenceName: referenceName} + } + } + + actualOldTip, err := mgr.repository.ResolveRevision(ctx, referenceName.Revision()) + if errors.Is(err, git.ErrReferenceNotFound) { + objectHash, err := mgr.repository.ObjectHash(ctx) + if err != nil { + return nil, fmt.Errorf("object hash: %w", err) + } + + actualOldTip = objectHash.ZeroOID + } else if err != nil { + return nil, fmt.Errorf("resolve revision: %w", err) + } + + if tips.OldOID != actualOldTip { + if transaction.SkipVerificationFailures { + continue + } + + return nil, ReferenceVerificationError{ + ReferenceName: referenceName, + ExpectedOID: tips.OldOID, + ActualOID: actualOldTip, + } + } + + logEntry.ReferenceUpdates = append(logEntry.ReferenceUpdates, &gitalypb.LogEntry_ReferenceUpdate{ + ReferenceName: []byte(referenceName), + NewOid: []byte(tips.NewOID), + }) + } + + // Sort the reference updates so the reference changes are always logged in a deterministic order. + sort.Slice(logEntry.ReferenceUpdates, func(i, j int) bool { + return bytes.Compare( + logEntry.ReferenceUpdates[i].ReferenceName, + logEntry.ReferenceUpdates[j].ReferenceName, + ) == -1 + }) + + if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil { + return nil, fmt.Errorf("verify references with git: %w", err) + } + + return logEntry, nil +} + +// vefifyReferencesWithGit verifies the reference updates with git by preparing reference transaction. This ensures +// the updates will go through when they are being applied in the log. This also catches any invalid reference names +// and file/directory conflicts with Git's loose reference storage which can occur with references like +// 'refs/heads/parent' and 'refs/heads/parent/child'. +func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error { + updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates) + if err != nil { + return fmt.Errorf("prepare reference transaction: %w", err) + } + + return updater.Close() +} + +// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing +// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up +// if an error is returned. +func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) { + updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions()) + if err != nil { + return nil, fmt.Errorf("new: %w", err) + } + + if err := updater.Start(); err != nil { + return nil, fmt.Errorf("start: %w", err) + } + + for _, referenceUpdate := range referenceUpdates { + if err := updater.Update(git.ReferenceName(referenceUpdate.ReferenceName), git.ObjectID(referenceUpdate.NewOid), ""); err != nil { + return nil, fmt.Errorf("update %q: %w", referenceUpdate.ReferenceName, err) + } + } + + if err := updater.Prepare(); err != nil { + var errInvalidReferenceFormat updateref.ErrInvalidReferenceFormat + if errors.As(err, &errInvalidReferenceFormat) { + return nil, InvalidReferenceFormatError{ReferenceName: git.ReferenceName(errInvalidReferenceFormat.ReferenceName)} + } + + return nil, fmt.Errorf("prepare: %w", err) + } + + return updater, nil +} + +// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not +// logged nor applied later. +func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error { + nextLogIndex := mgr.appendedLogIndex + 1 + + if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil { + return fmt.Errorf("set log entry: %w", err) + } + + mgr.appendedLogIndex = nextLogIndex + + return nil +} + +// applyLogEntry reads a log entry at the given index and applies it to the repository. +func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error { + var logEntry gitalypb.LogEntry + if err := mgr.readKey(keyLogEntry(getRepositoryID(mgr.repository), logIndex), &logEntry); err != nil { + return fmt.Errorf("read log entry: %w", err) + } + + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates) + if err != nil { + return fmt.Errorf("perpare reference transaction: %w", err) + } + + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + if err := mgr.storeAppliedLogIndex(logIndex); err != nil { + return fmt.Errorf("set applied log index: %w", err) + } + + mgr.appliedLogIndex = logIndex + + return nil +} + +// storeLogEntry stores the log entry in the repository's write-ahead log at the given index. +func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error { + return mgr.setKey(keyLogEntry(getRepositoryID(mgr.repository), index), entry) +} + +// storeAppliedLogIndex stores the repository's applied log index in the database. +func (mgr *TransactionManager) storeAppliedLogIndex(index LogIndex) error { + return mgr.setKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), index.toProto()) +} + +// setKey marshals and stores a given protocol buffer message into the database under the given key. +func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error { + marshaledValue, err := proto.Marshal(value) + if err != nil { + return fmt.Errorf("marshal value: %w", err) + } + + writeBatch := mgr.db.NewWriteBatch() + defer writeBatch.Cancel() + + if err := writeBatch.Set(key, marshaledValue); err != nil { + return fmt.Errorf("set: %w", err) + } + + return writeBatch.Flush() +} + +// readKey reads a key from the database and unmarshals its value in to the destination protocol +// buffer message. +func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) error { + return mgr.db.View(func(txn databaseTransaction) error { + item, err := txn.Get(key) + if err != nil { + return fmt.Errorf("get: %w", err) + } + + return item.Value(func(value []byte) error { return proto.Unmarshal(value, destination) }) + }) +} + +// getRepositoryID returns a repository's ID. The ID should never change as it is used in the database +// keys. Gitaly does not have a permanent ID to use yet so the repository's storage name and relative +// path are used as a composite key. +func getRepositoryID(repository repository) string { + return repository.GetStorageName() + ":" + repository.GetRelativePath() +} + +// keyAppliedLogIndex returns the database key storing a repository's last applied log entry's index. +func keyAppliedLogIndex(repositoryID string) []byte { + return []byte(fmt.Sprintf("repository/%s/log/index/applied", repositoryID)) +} + +// keyLogEntry returns the database key storing a repository's log entry at a given index. +func keyLogEntry(repositoryID string, index LogIndex) []byte { + marshaledIndex := make([]byte, binary.Size(index)) + binary.BigEndian.PutUint64(marshaledIndex, uint64(index)) + return []byte(fmt.Sprintf("%s%s", keyPrefixLogEntries(repositoryID), marshaledIndex)) +} + +// keyPrefixLogEntries returns the key prefix holding repository's write-ahead log entries. +func keyPrefixLogEntries(repositoryID string) []byte { + return []byte(fmt.Sprintf("repository/%s/log/entry/", repositoryID)) +} diff --git a/internal/gitaly/transaction_manager_hook_test.go b/internal/gitaly/transaction_manager_hook_test.go new file mode 100644 index 000000000..618dbffb9 --- /dev/null +++ b/internal/gitaly/transaction_manager_hook_test.go @@ -0,0 +1,137 @@ +package gitaly + +import ( + "context" + "regexp" + "runtime" + "strings" + "testing" + + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" +) + +// hookFunc is a function that is executed at a specific point. It gets a hookContext that allows it to +// influence the execution of the test. +type hookFunc func(hookContext) + +// hookContext are the control toggels available in a hook. +type hookContext struct { + // stopManager calls the calls stops the TransactionManager. + stopManager func() +} + +// hooks are functions that get invoked at specific points of the TransactionManager Run method. They allow +// for hooking into the Run method at specific poins which would otherwise to do assertions that would otherwise +// not be possible. +type hooks struct { + // beforeReadLogEntry is invoked before a log entry is read from the database. + beforeReadLogEntry hookFunc + // beforeResolveRevision is invoked before ResolveRevision is invoked. + beforeResolveRevision hookFunc + // beforeDeferredStop is invoked before the deferred Stop is invoked in Run. + beforeDeferredStop hookFunc +} + +// installHooks installs the configured hooks into the transactionManager. +func installHooks(tb testing.TB, transactionManager *TransactionManager, database *badger.DB, repository *localrepo.Repo, hooks hooks) { + hookContext := hookContext{stopManager: transactionManager.stop} + + transactionManager.stop = func() { + programCounter, _, _, ok := runtime.Caller(2) + require.True(tb, ok) + + isDeferredStopInRun := strings.HasSuffix( + runtime.FuncForPC(programCounter).Name(), + "gitaly.(*TransactionManager).Run", + ) + + if isDeferredStopInRun && hooks.beforeDeferredStop != nil { + hooks.beforeDeferredStop(hookContext) + } + + hookContext.stopManager() + } + + transactionManager.db = databaseHook{ + database: newDatabaseAdapter(database), + hooks: hooks, + hookContext: hookContext, + } + + transactionManager.repository = repositoryHook{ + repository: repository, + hookContext: hookContext, + hooks: hooks, + } +} + +type repositoryHook struct { + repository + hookContext + hooks +} + +func (hook repositoryHook) ResolveRevision(ctx context.Context, revision git.Revision) (git.ObjectID, error) { + if hook.beforeResolveRevision != nil { + hook.hooks.beforeResolveRevision(hook.hookContext) + } + + return hook.repository.ResolveRevision(ctx, revision) +} + +type databaseHook struct { + database + hookContext + hooks +} + +func (hook databaseHook) View(handler func(databaseTransaction) error) error { + return hook.database.View(func(transaction databaseTransaction) error { + return handler(databaseTransactionHook{ + databaseTransaction: transaction, + hookContext: hook.hookContext, + hooks: hook.hooks, + }) + }) +} + +func (hook databaseHook) NewWriteBatch() writeBatch { + return writeBatchHook{writeBatch: hook.database.NewWriteBatch()} +} + +type databaseTransactionHook struct { + databaseTransaction + hookContext + hooks +} + +var regexLogEntry = regexp.MustCompile("repository/.+/log/entry/") + +func (hook databaseTransactionHook) Get(key []byte) (*badger.Item, error) { + if regexLogEntry.Match(key) { + if hook.hooks.beforeReadLogEntry != nil { + hook.hooks.beforeReadLogEntry(hook.hookContext) + } + } + + return hook.databaseTransaction.Get(key) +} + +func (hook databaseTransactionHook) NewIterator(options badger.IteratorOptions) *badger.Iterator { + return hook.databaseTransaction.NewIterator(options) +} + +type writeBatchHook struct { + writeBatch +} + +func (hook writeBatchHook) Set(key []byte, value []byte) error { + return hook.writeBatch.Set(key, value) +} + +func (hook writeBatchHook) Flush() error { return hook.writeBatch.Flush() } + +func (hook writeBatchHook) Cancel() { hook.writeBatch.Cancel() } diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go new file mode 100644 index 000000000..808fa172d --- /dev/null +++ b/internal/gitaly/transaction_manager_test.go @@ -0,0 +1,1586 @@ +package gitaly + +import ( + "context" + "encoding/hex" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" +) + +func TestTransactionManager(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + // A clean repository is setup for each test. We build a repository ahead of the tests here once to + // get deterministic commit IDs, relative path and object hash we can use to build the declarative + // test cases. + relativePath := gittest.NewRepositoryName(t) + setupRepository := func(t *testing.T) (*localrepo.Repo, git.ObjectID, git.ObjectID, git.ObjectID) { + t.Helper() + + cfg := testcfg.Build(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: relativePath, + }) + + rootCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents()) + secondCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(rootCommitOID)) + thirdCommitOID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(secondCommitOID)) + + cmdFactory, clean, err := git.NewExecCommandFactory(cfg) + require.NoError(t, err) + t.Cleanup(clean) + + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + localRepo := localrepo.New( + config.NewLocator(cfg), + cmdFactory, + catfileCache, + repo, + ) + + return localRepo, rootCommitOID, secondCommitOID, thirdCommitOID + } + + // Collect commit OIDs and the object has so we can define the test cases with them. + repo, rootCommitOID, secondCommitOID, thirdCommitOID := setupRepository(t) + objectHash, err := repo.ObjectHash(ctx) + require.NoError(t, err) + + hasher := objectHash.Hash() + _, err = hasher.Write([]byte("content does not matter")) + require.NoError(t, err) + nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil))) + require.NoError(t, err) + + type testHooks struct { + // BeforeApplyLogEntry is called before a log entry is applied to the repository. + BeforeApplyLogEntry hookFunc + // BeforeAppendLogEntry is called before a log entry is appended to the log. + BeforeAppendLogEntry hookFunc + // WaitForTransactionsWhenStopping waits for a in-flight to finish before returning + // from Run. + WaitForTransactionsWhenStopping bool + } + + // Step defines a single execution step in a test. Each test case can define multiple steps to setup exercise + // more complex behavior and to assert the state after each step. + type steps []struct { + // StopManager stops the manager in the beginning of the step. + StopManager bool + // StartManager can be used to start the manager again after stopping it. + StartManager bool + // Context is the context to use for the Propose call of the step. + Context context.Context + // Transaction is the transaction that is proposed in this step. + Transaction Transaction + // Hooks contains the hook functions that are configured on the TransactionManager. These allow + // for better synchronization. + Hooks testHooks + // ExpectedRunError is the expected error to be returned from Run from this step. + ExpectedRunError bool + // ExpectedProposeError is the error that is expected to be returned when proposing the transaction in this step. + ExpectedProposeError error + // ExpectedReferences is the expected state of references at the end of this step. + ExpectedReferences []git.Reference + // ExpectedDatabase is the expected state of the database at the end of this step. + ExpectedDatabase DatabaseState + } + + type testCase struct { + desc string + steps steps + } + + testCases := []testCase{ + { + desc: "invalid reference aborts the entire transaction", + steps: steps{ + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: "refs/heads/../main"}, + }, + }, + }, + { + desc: "continues processing after aborting due to an invalid reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/../main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: "refs/heads/../main"}, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create a file-directory reference conflict different transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: updateref.ErrFileDirectoryConflict{ + ExistingReferenceName: "refs/heads/parent", + ConflictingReferenceName: "refs/heads/parent/child", + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create a file-directory reference conflict in same transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: updateref.ErrInTransactionConflict{ + FirstReferenceName: "refs/heads/parent", + SecondReferenceName: "refs/heads/parent/child", + }, + }, + }, + }, + { + desc: "file-directory conflict aborts the transaction with verification failures skipped", + steps: steps{ + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: updateref.ErrInTransactionConflict{ + FirstReferenceName: "refs/heads/parent", + SecondReferenceName: "refs/heads/parent/child", + }, + }, + }, + }, + { + desc: "delete file-directory conflict in different transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent/child"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: updateref.ErrFileDirectoryConflict{ + ExistingReferenceName: "refs/heads/parent/child", + ConflictingReferenceName: "refs/heads/parent", + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/parent/child", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/parent/child"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete file-directory conflict in same transaction", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/parent/child": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/parent": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: updateref.ErrInTransactionConflict{ + FirstReferenceName: "refs/heads/parent", + SecondReferenceName: "refs/heads/parent/child", + }, + }, + }, + }, + { + desc: "create a branch to a non-commit object", + steps: steps{ + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + // The error should abort the entire transaction. + "refs/heads/branch-1": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/branch-2": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, + }, + }, + ExpectedProposeError: updateref.NonCommitObjectError{ + ReferenceName: "refs/heads/branch-2", + ObjectID: objectHash.EmptyTreeOID.String(), + }, + }, + }, + }, + { + desc: "create a tag to a non-commit object", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/tags/v1.0.0": {OldOID: objectHash.ZeroOID, NewOID: objectHash.EmptyTreeOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/tags/v1.0.0", Target: objectHash.EmptyTreeOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/tags/v1.0.0"), + NewOid: []byte(objectHash.EmptyTreeOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create a reference to non-existent object", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: nonExistentOID}, + }, + }, + ExpectedProposeError: updateref.NonExistentObjectError{ + ReferenceName: "refs/heads/main", + ObjectID: nonExistentOID.String(), + }, + }, + }, + }, + { + desc: "create reference ignoring verification failure", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: secondCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create reference that already exists", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: objectHash.ZeroOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "create reference no-op", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: objectHash.ZeroOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update reference ignoring verification failures", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: thirdCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(thirdCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update reference with incorrect old tip", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: thirdCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "update non-existent reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + }, + }, + { + desc: "update reference no-op", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(objectHash.ZeroOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete reference ignoring verification failures", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + SkipVerificationFailures: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(objectHash.ZeroOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete reference with incorrect old tip", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + "refs/heads/non-conflicting": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: objectHash.ZeroOID}, + "refs/heads/non-conflicting": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{ + {Name: "refs/heads/main", Target: rootCommitOID.String()}, + {Name: "refs/heads/non-conflicting", Target: rootCommitOID.String()}, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + { + ReferenceName: []byte("refs/heads/non-conflicting"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "delete non-existent reference", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: rootCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + }, + }, + { + desc: "delete reference no-op", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: objectHash.ZeroOID}, + }, + }, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(objectHash.ZeroOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "continues processing after reference verification failure", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: rootCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "continues processing after a restart", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + StopManager: true, + StartManager: true, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "continues processing after restarting after a reference verification failure", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: rootCommitOID, + ActualOID: objectHash.ZeroOID, + }, + }, + { + StopManager: true, + StartManager: true, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "recovers from the write-ahead log on start up", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + hookCtx.stopManager() + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedDatabase: DatabaseState{ + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: rootCommitOID, NewOID: secondCommitOID}, + }, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: secondCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + string(keyLogEntry(getRepositoryID(repo), 2)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(secondCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "reference verification fails after recovering logged writes", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + hookCtx.stopManager() + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedDatabase: DatabaseState{ + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: secondCommitOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: ReferenceVerificationError{ + ReferenceName: "refs/heads/main", + ExpectedOID: secondCommitOID, + ActualOID: rootCommitOID, + }, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + { + desc: "propose returns if context is canceled before admission", + steps: steps{ + { + Context: func() context.Context { + ctx, cancel := context.WithCancel(ctx) + cancel() + return ctx + }(), + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: context.Canceled, + }, + }, + }, + { + desc: "propose returns if transaction processing stops before admission", + steps: steps{ + { + StopManager: true, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + }, + }, + }, + func() testCase { + ctx, cancel := context.WithCancel(ctx) + return testCase{ + desc: "propose returns if context is canceled after admission", + steps: steps{ + { + Context: ctx, + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + // Cancel the context used in Propose + cancel() + }, + }, + ExpectedProposeError: context.Canceled, + ExpectedReferences: []git.Reference{{Name: "refs/heads/main", Target: rootCommitOID.String()}}, + ExpectedDatabase: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(1).toProto(), + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + } + }(), + { + desc: "propose returns if transaction processing stops before transaction acceptance", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.stopManager() }, + // This ensures we are testing the context cancellation errors being unwrapped properly + // to an ErrTransactionProcessingStopped instead of hitting the general case when + // runDone is closed. + WaitForTransactionsWhenStopping: true, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + }, + }, + }, + { + desc: "propose returns if transaction processing stops after transaction acceptance", + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookCtx hookContext) { + hookCtx.stopManager() + }, + }, + ExpectedProposeError: ErrTransactionProcessingStopped, + ExpectedRunError: true, + ExpectedDatabase: DatabaseState{ + string(keyLogEntry(getRepositoryID(repo), 1)): &gitalypb.LogEntry{ + ReferenceUpdates: []*gitalypb.LogEntry_ReferenceUpdate{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(rootCommitOID), + }, + }, + }, + }, + }, + }, + }, + } + + type invalidReferenceTestCase struct { + desc string + referenceName git.ReferenceName + invalidReference git.ReferenceName + } + + appendInvalidReferenceTestCase := func(tc invalidReferenceTestCase) { + invalidReference := tc.invalidReference + if invalidReference == "" { + invalidReference = tc.referenceName + } + + testCases = append(testCases, testCase{ + desc: fmt.Sprintf("invalid reference %s", tc.desc), + steps: steps{ + { + Transaction: Transaction{ + ReferenceUpdates: ReferenceUpdates{ + tc.referenceName: {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, + }, + }, + ExpectedProposeError: InvalidReferenceFormatError{ReferenceName: invalidReference}, + }, + }, + }) + } + + // Generate test cases for the reference format rules according to https://git-scm.com/docs/git-check-ref-format. + // This is to ensure the references are correctly validated prior to logging so they are guaranteed to apply later. + for _, tc := range []invalidReferenceTestCase{ + // 1. They can include slash / for hierarchical (directory) grouping, but no slash-separated + // component can begin with a dot . or end with the sequence .lock. + {"starting with a period", ".refs/heads/main", ""}, + {"subcomponent starting with a period", "refs/heads/.main", ""}, + {"ending in .lock", "refs/heads/main.lock", ""}, + {"subcomponent ending in .lock", "refs/heads/main.lock/main", ""}, + // 2. They must contain at least one /. This enforces the presence of a category like heads/, + // tags/ etc. but the actual names are not restricted. + {"without a /", "one-level", ""}, + {"with refs without a /", "refs", ""}, + // We restrict this further by requiring a 'refs/' prefix to ensure loose references only end up + // in the 'refs/' folder. + {"without refs/ prefix ", "nonrefs/main", ""}, + // 3. They cannot have two consecutive dots .. anywhere. + {"containing two consecutive dots", "refs/heads/../main", ""}, + // 4. They cannot have ASCII control characters ... (\177 DEL), space, tilde ~, caret ^, or colon : anywhere. + // + // Tests for control characters < \040 generated further down. + {"containing DEL", "refs/heads/ma\177in", "refs/heads/ma?in"}, + {"containing space", "refs/heads/ma in", ""}, + {"containing ~", "refs/heads/ma~in", ""}, + {"containing ^", "refs/heads/ma^in", ""}, + {"containing :", "refs/heads/ma:in", ""}, + // 5. They cannot have question-mark ?, asterisk *, or open bracket [ anywhere. + {"containing ?", "refs/heads/ma?in", ""}, + {"containing *", "refs/heads/ma*in", ""}, + {"containing [", "refs/heads/ma[in", ""}, + // 6. They cannot begin or end with a slash / or contain multiple consecutive slashes + {"begins with /", "/refs/heads/main", ""}, + {"ends with /", "refs/heads/main/", ""}, + {"contains consecutive /", "refs/heads//main", ""}, + // 7. They cannot end with a dot. + {"ending in a dot", "refs/heads/main.", ""}, + // 8. They cannot contain a sequence @{. + {"invalid reference contains @{", "refs/heads/m@{n", ""}, + // 9. They cannot be the single character @. + {"is a single character @", "@", ""}, + // 10. They cannot contain a \. + {`containing \`, `refs/heads\main`, `refs/heads\main`}, + } { + appendInvalidReferenceTestCase(tc) + } + + // Rule 4. They cannot have ASCII control characters i.e. bytes whose values are lower than \040, + for i := byte(0); i < '\040'; i++ { + invalidReference := fmt.Sprintf("refs/heads/ma%sin", []byte{i}) + + // For now, the reference format is checked by 'git update-ref'. Most of the control characters + // are substituted by a '?' by Git when reporting the error. + // + // '\t' is reported without substitution. '\n' and '\000' are checked by the TransactionManager + // separately so it can report a better error than what Git produces. + // + // This is temporarily more complicated than it needs to be. Later iteration will move reference + // format checking into Gitaly at which point we can return proper errors without substituting + // the problematic characters. + var expectedSubstitute string + switch i { + case '\000', '\n', '\t': + expectedSubstitute = string(i) + default: + expectedSubstitute = "?" + } + + appendInvalidReferenceTestCase(invalidReferenceTestCase{ + desc: fmt.Sprintf(`containing ASCII control character %d`, i), + referenceName: git.ReferenceName(invalidReference), + invalidReference: git.ReferenceName(fmt.Sprintf("refs/heads/ma%sin", expectedSubstitute)), + }) + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + // Setup the repository with the exact same state as what was used to build the test cases. + repository, _, _, _ := setupRepository(t) + + database, err := OpenDatabase(t.TempDir()) + require.NoError(t, err) + defer testhelper.MustClose(t, database) + + var ( + // managerRunning tracks whether the manager is running or stopped. + managerRunning bool + // transactionManager is the current TransactionManager instance. + transactionManager *TransactionManager + // managerErr is used for synchronizing manager stopping and returning + // the error from Run. + managerErr chan error + // inflightTransactions tracks the number of on going propose calls. It is used to synchronize + // the database hooks with propose calls. + inflightTransactions sync.WaitGroup + ) + + // stopManager stops the manager. It waits until the manager's Run method has exited. + stopManager := func() { + t.Helper() + + transactionManager.Stop() + managerRunning, err = checkManagerError(t, managerErr, transactionManager) + require.NoError(t, err) + } + + // startManager starts fresh manager and applies hooks into it. + startManager := func(testHooks testHooks) { + t.Helper() + + require.False(t, managerRunning, "manager started while it was already running") + managerRunning = true + managerErr = make(chan error) + + transactionManager = NewTransactionManager(database, repository) + installHooks(t, transactionManager, database, repository, hooks{ + beforeResolveRevision: testHooks.BeforeAppendLogEntry, + beforeReadLogEntry: testHooks.BeforeApplyLogEntry, + beforeDeferredStop: func(hookContext) { + if testHooks.WaitForTransactionsWhenStopping { + inflightTransactions.Wait() + } + }, + }) + + go func() { managerErr <- transactionManager.Run() }() + } + + // Stop the manager if it is running at the end of the test. + defer stopManager() + for _, step := range tc.steps { + // Ensure every step starts with the manager running. + if !managerRunning { + startManager(step.Hooks) + } + + if step.StopManager { + require.True(t, managerRunning, "manager stopped while it was already stopped") + stopManager() + } + + if step.StartManager { + startManager(step.Hooks) + } + + func() { + inflightTransactions.Add(1) + defer inflightTransactions.Done() + + proposeCtx := ctx + if step.Context != nil { + proposeCtx = step.Context + } + + require.ErrorIs(t, transactionManager.Propose(proposeCtx, step.Transaction), step.ExpectedProposeError) + }() + + if managerRunning, err = checkManagerError(t, managerErr, transactionManager); step.ExpectedRunError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + RequireReferences(t, ctx, repository, step.ExpectedReferences) + RequireDatabase(t, ctx, database, step.ExpectedDatabase) + } + }) + } +} + +func checkManagerError(t *testing.T, managerErr chan error, mgr *TransactionManager) (bool, error) { + t.Helper() + + select { + case err, ok := <-managerErr: + if ok { + close(managerErr) + } + + // managerErr returns the possible error if manager has already stopped. + return false, err + case mgr.admissionQueue <- transactionFuture{ + transaction: Transaction{ReferenceUpdates: ReferenceUpdates{"sentinel": {}}}, + result: make(resultChannel, 1), + }: + // If the error channel doesn't receive, we don't know whether it is because the manager is still running + // or we are still waiting for it to return. We test whether the manager is running or not here by queueing a + // a proposal that will error. If the manager processes it, we know it is still running. + return true, nil + } +} diff --git a/internal/testhelper/leakage.go b/internal/testhelper/leakage.go index ffec6c7ae..5dc9cae20 100644 --- a/internal/testhelper/leakage.go +++ b/internal/testhelper/leakage.go @@ -16,6 +16,10 @@ import ( // mustHaveNoGoroutines panics if it finds any Goroutines running. func mustHaveNoGoroutines() { if err := goleak.Find( + // BadgerDB depends on Ristretto which uses glog for logging. glog initializes + // on import a log flushing goroutine that keeps running in the background. + // Ignore this goroutine as there is no way to stop it. + goleak.IgnoreTopFunction("github.com/golang/glog.(*loggingT).flushDaemon"), // opencensus has a "defaultWorker" which is started by the package's // `init()` function. There is no way to stop this worker, so it will leak // whenever we import the package. |