diff options
Diffstat (limited to 'internal/gitaly/transaction_manager.go')
-rw-r--r-- | internal/gitaly/transaction_manager.go | 539 |
1 files changed, 539 insertions, 0 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go new file mode 100644 index 000000000..c9c89bf81 --- /dev/null +++ b/internal/gitaly/transaction_manager.go @@ -0,0 +1,539 @@
package gitaly

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"sort"
	"strings"

	"github.com/dgraph-io/badger/v3"
	"gitlab.com/gitlab-org/gitaly/v15/internal/git"
	"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
	"gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
	"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
	"google.golang.org/protobuf/proto"
)

// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions.
var ErrTransactionProcessingStopped = errors.New("transaction processing stopped")

// InvalidReferenceFormatError is returned when a reference name was invalid.
type InvalidReferenceFormatError struct {
	// ReferenceName is the reference with invalid format.
	ReferenceName git.ReferenceName
}

// Error returns the formatted error string. The reference name is quoted in the message.
func (err InvalidReferenceFormatError) Error() string {
	return fmt.Sprintf("invalid reference format: %q", err.ReferenceName)
}

// ReferenceVerificationError is returned when a reference's old OID did not match the expected OID.
type ReferenceVerificationError struct {
	// ReferenceName is the name of the reference that failed verification.
	ReferenceName git.ReferenceName
	// ExpectedOID is the OID the reference was expected to point to.
	ExpectedOID git.ObjectID
	// ActualOID is the OID the reference actually pointed to.
	ActualOID git.ObjectID
}

// Error returns the formatted error string. The message names the reference, the expected OID and
// the OID the reference actually pointed to, each quoted.
func (err ReferenceVerificationError) Error() string {
	return fmt.Sprintf("expected %q to point to %q but it pointed to %q", err.ReferenceName, err.ExpectedOID, err.ActualOID)
}

// LogIndex points to a specific position in a repository's write-ahead log.
type LogIndex uint64

// toProto returns the protobuf representation of LogIndex for serialization purposes.
func (index LogIndex) toProto() *gitalypb.LogIndex {
	return &gitalypb.LogIndex{LogIndex: uint64(index)}
}

// ReferenceUpdate describes the state of a reference's old and new tip in an update.
type ReferenceUpdate struct {
	// OldOID is the old OID the reference is expected to point to prior to updating it.
	// If the reference does not point to the old value, the reference verification fails.
	OldOID git.ObjectID
	// NewOID is the new desired OID to point the reference to.
	NewOID git.ObjectID
}

// ReferenceUpdates contains references to update. Reference name is used as the key and the value
// is the expected old tip and the desired new tip.
type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate

// Transaction is a unit-of-work that contains reference changes to perform on the repository.
type Transaction struct {
	// SkipVerificationFailures causes reference updates that failed the verification step to be
	// dropped from the transaction. By default, any verification failure aborts the entire transaction.
	//
	// The default behavior maps to the `--atomic` flag of `git push`. When this is set, the behavior
	// matches git's behavior without the flag.
	SkipVerificationFailures bool
	// ReferenceUpdates contains the reference updates to be performed.
	ReferenceUpdates
}

// TransactionManager is responsible for transaction management of a single repository. Each repository has
// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from
// the admissionQueue. Each admitted write is processed in three steps:
//
//  1. The references being updated are verified by ensuring the expected old tips match what the references
//     actually point to prior to update. The entire transaction is by default aborted if a single reference
//     fails the verification step. The reference verification behavior can be controlled on a per-transaction
//     level by setting:
//     - The reference verification failures can be ignored instead of aborting the entire transaction.
//     If done, the references that failed verification are dropped from the transaction but the updates
//     that passed verification are still performed.
//     - The reference verification may also be skipped if the write is force updating references. If
//     done, the current state of the references is ignored and they are directly updated to point
//     to the new tips.
//  2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively
//     committed and will be applied to the repository even after restarting.
//  3. The transaction is applied from the write-ahead log to the repository by actually performing the reference
//     changes.
//
// The goroutine that issued the transaction is waiting for the result while these steps are being performed. As
// there is no transaction control for readers yet, the issuer is only notified of a successful write after the
// write has been applied to the repository.
//
// TransactionManager recovers transactions after interruptions by applying the write-ahead logged transactions to
// the repository on start up.
//
// TransactionManager maintains the write-ahead log in a key-value store. It maintains the following key spaces:
//   - `repository/<repository_id:string>/log/index/applied`
//   - This key stores the index of the log entry that has been applied to the repository. This allows for
//     determining how far a repository is in processing the log and which log entries need to be applied
//     after starting up. Repository starts from log index 0 if there are no log entries recorded to have
//     been applied.
//
//   - `repository/<repository_id:string>/log/entry/<log_index:uint64>`
//   - These keys hold the actual write-ahead log entries. A repository's first log entry starts at index 1
//     and the log index keeps monotonically increasing from there on without gaps. The write-ahead log
//     entries are processed in ascending order.
//
// The values in the database are marshaled protocol buffer messages. Numbers in the keys are encoded as big
// endian to preserve the sort order of the numbers also in lexicographical order.
type TransactionManager struct {
	// ctx is the context used for all operations.
	ctx context.Context
	// stop cancels ctx and stops the transaction processing.
	stop context.CancelFunc

	// stopCalled is closed when Stop is called. It unblocks transactions that are waiting to be admitted.
	stopCalled <-chan struct{}
	// runDone is closed when Run returns. It unblocks transactions that are waiting for a result after
	// being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly
	// releases awaiters when the transaction processing is stopped.
	runDone chan struct{}

	// repository is the repository this TransactionManager is acting on.
	repository repository
	// db is the handle to the key-value store used for storing the write-ahead log related state.
	db database
	// admissionQueue is where the incoming writes are waiting to be admitted to the transaction
	// manager.
	admissionQueue chan transactionFuture

	// appendedLogIndex holds the index of the last log entry appended to the log.
	appendedLogIndex LogIndex
	// appliedLogIndex holds the index of the last log entry applied to the repository.
	appliedLogIndex LogIndex
}

// repository is the localrepo interface used by TransactionManager.
type repository interface {
	git.RepositoryExecutor
	ResolveRevision(context.Context, git.Revision) (git.ObjectID, error)
}

// NewTransactionManager returns a new TransactionManager for the given repository.
+func NewTransactionManager(db *badger.DB, repository *localrepo.Repo) *TransactionManager { + ctx, cancel := context.WithCancel(context.Background()) + return &TransactionManager{ + ctx: ctx, + stopCalled: ctx.Done(), + runDone: make(chan struct{}), + stop: cancel, + repository: repository, + db: newDatabaseAdapter(db), + admissionQueue: make(chan transactionFuture), + } +} + +// resultChannel represents a future that will yield the result of a transaction once its +// outcome has been decided. +type resultChannel chan error + +// transactionFuture holds a transaction and the resultChannel the transaction queuer is waiting +// for the result on. +type transactionFuture struct { + transaction Transaction + result resultChannel +} + +// Propose queues a transaction for the TransactionManager to process. Propose returns once the transaction +// has been successfully applied to the repository. +// +// Transaction is not committed if any of the following errors is returned: +// - InvalidReferenceFormatError is returned when attempting to update a reference with an invalid name. +// - ReferenceVerificationError is returned when the reference does not point to the expected old tip. +// +// Transaction may or may not be committed if any of the following errors is returned: +// - ErrTransactionProcessing stopped is returned when the TransactionManager is closing and stops processing +// transactions. +// - Unexpected error occurs. 
func (mgr *TransactionManager) Propose(ctx context.Context, transaction Transaction) error {
	// The result channel is buffered with capacity one so that Run's single send of the outcome
	// does not block even if this call has already returned due to ctx being done.
	queuedTransaction := transactionFuture{transaction: transaction, result: make(resultChannel, 1)}

	select {
	case mgr.admissionQueue <- queuedTransaction:
		// The transaction was admitted. Wait for its outcome, the caller's cancellation or
		// Run returning, whichever happens first.
		select {
		case err := <-queuedTransaction.result:
			return unwrapExpectedError(err)
		case <-ctx.Done():
			return ctx.Err()
		case <-mgr.runDone:
			return ErrTransactionProcessingStopped
		}
	case <-ctx.Done():
		// The caller gave up before the transaction was admitted.
		return ctx.Err()
	case <-mgr.stopCalled:
		// Stop was called before the transaction was admitted.
		return ErrTransactionProcessingStopped
	}
}

// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
// Errors wrapping context.Canceled are translated to ErrTransactionProcessingStopped; every other
// error, including nil, is returned unchanged.
func unwrapExpectedError(err error) error {
	// The manager controls its own execution context and it is canceled only when Stop is called.
	// Any context.Canceled errors returned are thus from shutting down so we report that here.
	if errors.Is(err, context.Canceled) {
		return ErrTransactionProcessingStopped
	}

	return err
}

// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied
// log entries from the database. It will then apply any transactions that have been logged but not applied
// to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the
// references, logging the transaction and finally applying it to the repository. The transactions are acknowledged
// once they've been applied to the repository.
//
// Run keeps running until Stop is called or it encounters a fatal error. All active Propose calls return
// ErrTransactionProcessingStopped when Run returns.
func (mgr *TransactionManager) Run() error {
	// Defer the Stop in order to release all on-going Propose calls in case of error.
	// Deferred calls run in reverse order, so Stop is invoked before runDone is closed.
	defer close(mgr.runDone)
	defer mgr.Stop()

	if err := mgr.initialize(); err != nil {
		return fmt.Errorf("initialize: %w", err)
	}

	// awaitingTransactions contains transactions waiting for their log entry to be applied to
	// the repository. It's keyed by the log index the transaction is waiting to be applied and the
	// value is the resultChannel that is waiting the result.
	awaitingTransactions := make(map[LogIndex]resultChannel)

	for {
		// Apply outstanding log entries one at a time before admitting any new transaction.
		if mgr.appliedLogIndex < mgr.appendedLogIndex {
			logIndex := mgr.appliedLogIndex + 1

			if err := mgr.applyLogEntry(mgr.ctx, logIndex); err != nil {
				return fmt.Errorf("apply log entry: %w", err)
			}

			// There is no awaiter for a transaction if the transaction manager is recovering
			// transactions from the log after starting up.
			if resultChan, ok := awaitingTransactions[logIndex]; ok {
				resultChan <- nil
				delete(awaitingTransactions, logIndex)
			}

			continue
		}

		// The log is fully applied. Block until a transaction is queued or the manager is stopped.
		var transaction transactionFuture
		select {
		case transaction = <-mgr.admissionQueue:
		case <-mgr.ctx.Done():
		}

		// Return if the manager was stopped. The select is nondeterministic so this guarantees
		// the manager stops the processing even if there are transactions in the queue. A
		// transaction received in the same iteration gets no result sent; its Propose call is
		// released by runDone being closed.
		if mgr.ctx.Err() != nil {
			return nil
		}

		// Verify and log the transaction. A failure here is reported to the proposer only and
		// does not stop the manager.
		if err := func() error {
			logEntry, err := mgr.verifyReferences(mgr.ctx, transaction.transaction)
			if err != nil {
				return fmt.Errorf("verify references: %w", err)
			}

			return mgr.appendLogEntry(logEntry)
		}(); err != nil {
			transaction.result <- err
			continue
		}

		// The entry was appended at appendedLogIndex. The proposer is acknowledged once the
		// entry has been applied on a following iteration.
		awaitingTransactions[mgr.appendedLogIndex] = transaction.result
	}
}

// Stop stops the transaction processing causing Run to return.
func (mgr *TransactionManager) Stop() { mgr.stop() }

// initialize initializes the TransactionManager by loading the applied log index and determining the
// appended log index from the database.
+func (mgr *TransactionManager) initialize() error { + var appliedLogIndex gitalypb.LogIndex + if err := mgr.readKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("read applied log index: %w", err) + } + + mgr.appliedLogIndex = LogIndex(appliedLogIndex.LogIndex) + + // The index of the last appended log entry is determined from the indexes of the latest entry in the log and + // the latest applied log entry. If there is a log entry, it is the latest appended log entry. If there are no + // log entries, the latest log entry must have been applied to the repository and pruned away, meaning the index + // of the last appended log entry is the same as the index if the last applied log entry. + // + // As the log indexes in the keys are encoded in big endian, the latest log entry can be found by taking + // the first key when iterating the log entry key space in reverse. + return mgr.db.View(func(txn databaseTransaction) error { + logPrefix := keyPrefixLogEntries(getRepositoryID(mgr.repository)) + + iterator := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: logPrefix}) + defer iterator.Close() + + mgr.appendedLogIndex = mgr.appliedLogIndex + + // The iterator seeks to a key that is greater than or equal than seeked key. Since we are doing a reverse + // seek, we need to add 0xff to the prefix so the first iterated key is the latest log entry. + if iterator.Seek(append(logPrefix, 0xff)); iterator.Valid() { + mgr.appendedLogIndex = LogIndex(binary.BigEndian.Uint64(bytes.TrimPrefix(iterator.Item().Key(), logPrefix))) + } + + return nil + }) +} + +// verifyReferences verifies that the references in the transaction apply on top of the already accepted +// reference changes. The old tips in the transaction are verified against the current actual tips. +// It returns the write-ahead log entry for the transaction if it was successfully verified. 
+func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction Transaction) (*gitalypb.LogEntry, error) { + logEntry := &gitalypb.LogEntry{} + for referenceName, tips := range transaction.ReferenceUpdates { + // 'git update-ref' doesn't ensure the loose references end up in the + // refs directory so we enforce that here. + if !strings.HasPrefix(referenceName.String(), "refs/") { + return nil, InvalidReferenceFormatError{ReferenceName: referenceName} + } + + // We'll later implement reference format verification in Gitaly. update-ref reports errors with these characters + // in a difficult to parse manner. For now, let's check these two illegal characters separately so we can return a + // proper error. + for _, illegalCharacter := range []byte{0, '\n'} { + if bytes.Contains([]byte(referenceName), []byte{illegalCharacter}) { + return nil, InvalidReferenceFormatError{ReferenceName: referenceName} + } + } + + actualOldTip, err := mgr.repository.ResolveRevision(ctx, referenceName.Revision()) + if errors.Is(err, git.ErrReferenceNotFound) { + objectHash, err := mgr.repository.ObjectHash(ctx) + if err != nil { + return nil, fmt.Errorf("object hash: %w", err) + } + + actualOldTip = objectHash.ZeroOID + } else if err != nil { + return nil, fmt.Errorf("resolve revision: %w", err) + } + + if tips.OldOID != actualOldTip { + if transaction.SkipVerificationFailures { + continue + } + + return nil, ReferenceVerificationError{ + ReferenceName: referenceName, + ExpectedOID: tips.OldOID, + ActualOID: actualOldTip, + } + } + + logEntry.ReferenceUpdates = append(logEntry.ReferenceUpdates, &gitalypb.LogEntry_ReferenceUpdate{ + ReferenceName: []byte(referenceName), + NewOid: []byte(tips.NewOID), + }) + } + + // Sort the reference updates so the reference changes are always logged in a deterministic order. 
+ sort.Slice(logEntry.ReferenceUpdates, func(i, j int) bool { + return bytes.Compare( + logEntry.ReferenceUpdates[i].ReferenceName, + logEntry.ReferenceUpdates[j].ReferenceName, + ) == -1 + }) + + if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil { + return nil, fmt.Errorf("verify references with git: %w", err) + } + + return logEntry, nil +} + +// vefifyReferencesWithGit verifies the reference updates with git by preparing reference transaction. This ensures +// the updates will go through when they are being applied in the log. This also catches any invalid reference names +// and file/directory conflicts with Git's loose reference storage which can occur with references like +// 'refs/heads/parent' and 'refs/heads/parent/child'. +func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error { + updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates) + if err != nil { + return fmt.Errorf("prepare reference transaction: %w", err) + } + + return updater.Close() +} + +// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing +// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up +// if an error is returned. 
+func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) { + updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions()) + if err != nil { + return nil, fmt.Errorf("new: %w", err) + } + + if err := updater.Start(); err != nil { + return nil, fmt.Errorf("start: %w", err) + } + + for _, referenceUpdate := range referenceUpdates { + if err := updater.Update(git.ReferenceName(referenceUpdate.ReferenceName), git.ObjectID(referenceUpdate.NewOid), ""); err != nil { + return nil, fmt.Errorf("update %q: %w", referenceUpdate.ReferenceName, err) + } + } + + if err := updater.Prepare(); err != nil { + var errInvalidReferenceFormat updateref.ErrInvalidReferenceFormat + if errors.As(err, &errInvalidReferenceFormat) { + return nil, InvalidReferenceFormatError{ReferenceName: git.ReferenceName(errInvalidReferenceFormat.ReferenceName)} + } + + return nil, fmt.Errorf("prepare: %w", err) + } + + return updater, nil +} + +// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not +// logged nor applied later. +func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error { + nextLogIndex := mgr.appendedLogIndex + 1 + + if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil { + return fmt.Errorf("set log entry: %w", err) + } + + mgr.appendedLogIndex = nextLogIndex + + return nil +} + +// applyLogEntry reads a log entry at the given index and applies it to the repository. 
+func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error { + var logEntry gitalypb.LogEntry + if err := mgr.readKey(keyLogEntry(getRepositoryID(mgr.repository), logIndex), &logEntry); err != nil { + return fmt.Errorf("read log entry: %w", err) + } + + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates) + if err != nil { + return fmt.Errorf("perpare reference transaction: %w", err) + } + + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + if err := mgr.storeAppliedLogIndex(logIndex); err != nil { + return fmt.Errorf("set applied log index: %w", err) + } + + mgr.appliedLogIndex = logIndex + + return nil +} + +// storeLogEntry stores the log entry in the repository's write-ahead log at the given index. +func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error { + return mgr.setKey(keyLogEntry(getRepositoryID(mgr.repository), index), entry) +} + +// storeAppliedLogIndex stores the repository's applied log index in the database. +func (mgr *TransactionManager) storeAppliedLogIndex(index LogIndex) error { + return mgr.setKey(keyAppliedLogIndex(getRepositoryID(mgr.repository)), index.toProto()) +} + +// setKey marshals and stores a given protocol buffer message into the database under the given key. +func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error { + marshaledValue, err := proto.Marshal(value) + if err != nil { + return fmt.Errorf("marshal value: %w", err) + } + + writeBatch := mgr.db.NewWriteBatch() + defer writeBatch.Cancel() + + if err := writeBatch.Set(key, marshaledValue); err != nil { + return fmt.Errorf("set: %w", err) + } + + return writeBatch.Flush() +} + +// readKey reads a key from the database and unmarshals its value in to the destination protocol +// buffer message. 
+func (mgr *TransactionManager) readKey(key []byte, destination proto.Message) error { + return mgr.db.View(func(txn databaseTransaction) error { + item, err := txn.Get(key) + if err != nil { + return fmt.Errorf("get: %w", err) + } + + return item.Value(func(value []byte) error { return proto.Unmarshal(value, destination) }) + }) +} + +// getRepositoryID returns a repository's ID. The ID should never change as it is used in the database +// keys. Gitaly does not have a permanent ID to use yet so the repository's storage name and relative +// path are used as a composite key. +func getRepositoryID(repository repository) string { + return repository.GetStorageName() + ":" + repository.GetRelativePath() +} + +// keyAppliedLogIndex returns the database key storing a repository's last applied log entry's index. +func keyAppliedLogIndex(repositoryID string) []byte { + return []byte(fmt.Sprintf("repository/%s/log/index/applied", repositoryID)) +} + +// keyLogEntry returns the database key storing a repository's log entry at a given index. +func keyLogEntry(repositoryID string, index LogIndex) []byte { + marshaledIndex := make([]byte, binary.Size(index)) + binary.BigEndian.PutUint64(marshaledIndex, uint64(index)) + return []byte(fmt.Sprintf("%s%s", keyPrefixLogEntries(repositoryID), marshaledIndex)) +} + +// keyPrefixLogEntries returns the key prefix holding repository's write-ahead log entries. +func keyPrefixLogEntries(repositoryID string) []byte { + return []byte(fmt.Sprintf("repository/%s/log/entry/", repositoryID)) +} |