diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-05-10 13:13:40 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-05-10 13:13:40 +0300 |
commit | 46ee0248c36241d856710030958fa5e1e04b179e (patch) | |
tree | 68e546afe967d4e6939a53a8ba661b83f40ce34d | |
parent | 242c75f51dd840799467c41b0fe15a2214ae07c2 (diff) | |
parent | 04a5d55597c65ea5f4755b560018065c8de341f8 (diff) |
Merge branch 'smh-log-pack-files' into 'master'qmnguyen0711/always-log-queue-ms
Implement write-ahead logging for objects
Closes #4790
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5638
Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r-- | internal/gitaly/partition_manager.go | 21 | ||||
-rw-r--r-- | internal/gitaly/partition_manager_test.go | 19 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager.go | 385 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager_test.go | 600 | ||||
-rw-r--r-- | proto/go/gitalypb/log.pb.go | 56 | ||||
-rw-r--r-- | proto/log.proto | 3 |
6 files changed, 1019 insertions, 65 deletions
diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 73a7ca97e..a07661096 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -3,6 +3,8 @@ package gitaly import ( "context" "errors" + "fmt" + "os" "sync" "github.com/dgraph-io/badger/v3" @@ -33,6 +35,9 @@ type PartitionManager struct { stopped bool // partitionsWG keeps track of running partitions. partitionsWG sync.WaitGroup + // stagingDirectory is the directory where all of the TransactionManager staging directories + // should be created. + stagingDirectory string } // partition contains the transaction manager and tracks the number of in-flight transactions for the partition. @@ -50,12 +55,13 @@ type partition struct { } // NewPartitionManager returns a new PartitionManager. -func NewPartitionManager(db *badger.DB, localRepoFactory func(repo.GitRepo) *localrepo.Repo, logger logrus.FieldLogger) *PartitionManager { +func NewPartitionManager(db *badger.DB, localRepoFactory func(repo.GitRepo) *localrepo.Repo, logger logrus.FieldLogger, stagingDir string) *PartitionManager { return &PartitionManager{ db: db, partitions: make(map[string]*partition), localRepoFactory: localRepoFactory, logger: logger, + stagingDirectory: stagingDir, } } @@ -79,7 +85,13 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran shutdown: make(chan struct{}), } - mgr := NewTransactionManager(pm.db, localRepo, pm.transactionFinalizerFactory(ptn)) + stagingDir, err := os.MkdirTemp(pm.stagingDirectory, "") + if err != nil { + pm.mu.Unlock() + return nil, fmt.Errorf("create staging directory: %w", err) + } + + mgr := NewTransactionManager(pm.db, localRepo, stagingDir, pm.transactionFinalizerFactory(ptn)) ptn.transactionManager = mgr pm.partitions[partitionKey] = ptn @@ -99,6 +111,11 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran pm.mu.Unlock() close(ptn.shutdown) + + if err := os.RemoveAll(stagingDir); err != nil { + pm.logger.WithError(err).Error("failed removing partition's staging directory") + } + pm.partitionsWG.Done() }() } diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index 288bd3ca2..a3dc27b9e 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -2,6 +2,9 @@ package gitaly import ( "context" + "io/fs" + "os" + "path/filepath" "testing" "github.com/sirupsen/logrus" @@ -12,11 +15,14 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo" repo "gitlab.com/gitlab-org/gitaly/v15/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" ) func TestPartitionManager(t *testing.T) { + umask := perm.GetUmask() + t.Parallel() ctx := testhelper.Context(t) @@ -431,8 +437,17 @@ func TestPartitionManager(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, database) - partitionManager := NewPartitionManager(database, localRepoFactory, logrus.StandardLogger()) - defer partitionManager.Stop() + stagingDir := filepath.Join(t.TempDir(), "staging") + require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir)) + + partitionManager := NewPartitionManager(database, localRepoFactory, logrus.StandardLogger(), stagingDir) + defer func() { + partitionManager.Stop() + // Assert all staging directories have been removed. + testhelper.RequireDirectoryState(t, stagingDir, "", testhelper.DirectoryState{ + "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + }) + }() // openTransactionData holds references to all transactions and its associated partition // created during the testcase. diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 018f5ea8d..e220d5edd 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "io/fs" "os" "path/filepath" @@ -21,8 +22,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v15/internal/safe" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) @@ -119,11 +122,30 @@ type Snapshot struct { type Transaction struct { // commit commits the Transaction through the TransactionManager. commit func(context.Context, *Transaction) error - // rollback rolls back the Transaction through the TransactionManager. - rollback func() error + // finalize decrements the active transaction count on the partition in the PartitionManager. This is + // really only a concern for the PartitionManager and will be moved out from here later. + finalize func() // result is where the outcome of the transaction is sent ot by TransactionManager once it // has been determined. result chan error + // admitted denotes whether the transaction was admitted for processing in the TransactionManager. + // Transaction queues in admissionQueue to be committed, and is considered admitted once it has + // been dequeued by TransactionManager.Run(). Once the transaction is admitted, its ownership moves + // from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must + // not do any modifications to the state of the transcation anymore to avoid races. + admitted bool + // initStagingDirectory is called to lazily initialize the staging directory when it is + // needed. + initStagingDirectory func() error + // stagingDirectory is the directory where the transaction stages its files prior + // to them being logged. It is cleaned up when the transaction finishes. + stagingDirectory string + // quarantineDirectory is the directory within the stagingDirectory where the new objects of the + // transaction are quarantined. + quarantineDirectory string + // includesPack is set if a pack file has been computed for the transaction and should be + // logged. + includesPack bool // Snapshot contains the details of the Transaction's read snapshot. snapshot Snapshot @@ -149,11 +171,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) } mgr.mutex.RLock() - snapshot := Snapshot{ - ReadIndex: mgr.appendedLogIndex, - HookIndex: mgr.hookIndex, + txn := &Transaction{ + commit: mgr.commit, + finalize: mgr.transactionFinalizer, + snapshot: Snapshot{ + ReadIndex: mgr.appendedLogIndex, + HookIndex: mgr.hookIndex, + }, } - readReady := mgr.applyNotifications[snapshot.ReadIndex] + + readReady := mgr.applyNotifications[txn.snapshot.ReadIndex] mgr.mutex.RUnlock() if readReady == nil { // The snapshot log entry is already applied if there is no notification channel for it. @@ -162,29 +189,66 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) close(readReady) } + txn.initStagingDirectory = func() error { + stagingDirectory, err := os.MkdirTemp(mgr.stagingDirectory, "") + if err != nil { + return fmt.Errorf("mkdir temp: %w", err) + } + + txn.stagingDirectory = stagingDirectory + return nil + } + select { case <-ctx.Done(): return nil, ctx.Err() case <-mgr.ctx.Done(): return nil, ErrTransactionProcessingStopped case <-readReady: - return &Transaction{ - commit: mgr.commit, - rollback: mgr.rollback, - snapshot: snapshot, - }, nil + return txn, nil } } // Commit performs the changes. If no error is returned, the transaction was successful and the changes // have been performed. If an error was returned, the transaction may or may not be persisted. -func (txn *Transaction) Commit(ctx context.Context) error { +func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { + defer func() { + txn.finalize() + + if err := txn.cleanUnadmitted(); err != nil && returnedErr == nil { + returnedErr = err + } + }() + return txn.commit(ctx, txn) } // Rollback releases resources associated with the transaction without performing any changes. func (txn *Transaction) Rollback() error { - return txn.rollback() + defer txn.finalize() + return txn.cleanUnadmitted() +} + +// cleanUnadmitted cleans up after the transaction if it wasn't yet admitted. If the transaction was admitted, +// the Transaction is being processed by TransactionManager. The clean up responsibility moves there as well +// to avoid races. +func (txn *Transaction) cleanUnadmitted() error { + if txn.admitted { + return nil + } + + return txn.clean() +} + +// clean cleans up the resources associated with the transaction. +func (txn *Transaction) clean() error { + if txn.stagingDirectory != "" { + if err := os.RemoveAll(txn.stagingDirectory); err != nil { + return fmt.Errorf("remove staging directory: %w", err) + } + } + + return nil } // Snapshot returns the details of the Transaction's read snapshot. @@ -209,6 +273,24 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) { txn.referenceUpdates = updates } +// QuarantineDirectory returns an absolute path to the transaction's quarantine directory. The quarantine directory +// is a Git object directory where the new objects introduced in the transaction must be written. The quarantined +// objects needed by the updated reference tips will be included in the transaction. +func (txn *Transaction) QuarantineDirectory() (string, error) { + if err := txn.initStagingDirectory(); err != nil { + return "", fmt.Errorf("init staging directory: %w", err) + } + + quarantineDirectory := filepath.Join(txn.stagingDirectory, "quarantine") + if err := os.MkdirAll(filepath.Join(quarantineDirectory, "pack"), perm.PrivateDir); err != nil { + return "", fmt.Errorf("create quarantine directory: %w", err) + } + + txn.quarantineDirectory = quarantineDirectory + + return quarantineDirectory, nil +} + // SetDefaultBranch sets the default branch as part of the transaction. If SetDefaultBranch is called // multiple times, only the changes from the latest invocation take place. The reference is validated // to exist. @@ -223,6 +305,11 @@ func (txn *Transaction) SetCustomHooks(hooksTAR []byte) { txn.customHooksUpdate = &CustomHooksUpdate{CustomHooksTAR: hooksTAR} } +// packFilePath returns the path to this transaction's pack file. +func (txn *Transaction) packFilePath() string { + return filepath.Join(txn.stagingDirectory, "transaction.pack") +} + // TransactionManager is responsible for transaction management of a single repository. Each repository has // a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from // the admissionQueue. Each admitted write is processed in three steps: @@ -275,6 +362,11 @@ type TransactionManager struct { // being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly // releases awaiters when the transactions processing is stopped. runDone chan struct{} + // stagingDirectory is a path to a directory where this TransactionManager should stage the files of the transactions + // before it logs them. The TransactionManager cleans up the files during runtime but stale files may be + // left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when + // Gitaly starts. + stagingDirectory string // repository is the repository this TransactionManager is acting on. repository repository @@ -310,10 +402,14 @@ type repository interface { ResolveRevision(context.Context, git.Revision) (git.ObjectID, error) SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error Path() (string, error) + UnpackObjects(context.Context, io.Reader) error + Quarantine(string) (*localrepo.Repo, error) + WalkUnreachableObjects(context.Context, io.Reader, io.Writer) error + PackObjects(context.Context, io.Reader, io.Writer) error } // NewTransactionManager returns a new TransactionManager for the given repository. -func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, transactionFinalizer func()) *TransactionManager { +func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, stagingDir string, transactionFinalizer func()) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) return &TransactionManager{ ctx: ctx, @@ -325,6 +421,7 @@ func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, transactio admissionQueue: make(chan *Transaction), initialized: make(chan struct{}), applyNotifications: make(map[LogIndex]chan struct{}), + stagingDirectory: stagingDir, transactionFinalizer: transactionFinalizer, } } @@ -335,12 +432,16 @@ type resultChannel chan error // commit queues the transaction for processing and returns once the result has been determined. func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error { - defer mgr.transactionFinalizer() - transaction.result = make(resultChannel, 1) + if err := mgr.packObjects(ctx, transaction); err != nil { + return fmt.Errorf("pack objects: %w", err) + } + select { case mgr.admissionQueue <- transaction: + transaction.admitted = true + select { case err := <-transaction.result: return unwrapExpectedError(err) @@ -356,11 +457,83 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } -// rollback rolls back and ends the transaction without committing. -func (mgr *TransactionManager) rollback() error { - mgr.transactionFinalizer() +// packObjects packs the objects included in the transaction into a single pack file that is ready +// for logging. The pack file includes all unreachable objects that are about to be made reachable. +func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error { + if transaction.quarantineDirectory == "" { + return nil + } - return nil + quarantinedRepo, err := mgr.repository.Quarantine(transaction.quarantineDirectory) + if err != nil { + return fmt.Errorf("quarantine: %w", err) + } + + objectHash, err := quarantinedRepo.ObjectHash(ctx) + if err != nil { + return fmt.Errorf("object hash: %w", err) + } + + heads := make([]string, 0, len(transaction.referenceUpdates)) + for _, update := range transaction.referenceUpdates { + if update.NewOID == objectHash.ZeroOID { + // Reference deletions can't introduce new objects so ignore them. + continue + } + + heads = append(heads, update.NewOID.String()) + } + + if len(heads) == 0 { + // No need to pack objects if there are no changes that can introduce new objects. + return nil + } + + transaction.includesPack = true + + objectsReader, objectsWriter := io.Pipe() + + group, ctx := errgroup.WithContext(ctx) + group.Go(func() (returnedErr error) { + defer func() { objectsWriter.CloseWithError(returnedErr) }() + + if err := quarantinedRepo.WalkUnreachableObjects(ctx, + strings.NewReader(strings.Join(heads, "\n")), + objectsWriter, + ); err != nil { + return fmt.Errorf("walk objects: %w", err) + } + + return nil + }) + + group.Go(func() (returnedErr error) { + defer func() { objectsReader.CloseWithError(returnedErr) }() + + destinationFile, err := os.OpenFile( + transaction.packFilePath(), + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + perm.PrivateFile, + ) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + defer destinationFile.Close() + + if err := quarantinedRepo.PackObjects(ctx, objectsReader, destinationFile); err != nil { + return fmt.Errorf("pack objects: %w", err) + } + + // Sync the contents of the pack so they are flushed to disk prior to the transaction + // being admitted for processing. + if err := destinationFile.Sync(); err != nil { + return fmt.Errorf("sync pack: %w", err) + } + + return destinationFile.Close() + }) + + return group.Wait() } // unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller. @@ -418,10 +591,23 @@ func (mgr *TransactionManager) Run() (returnedErr error) { // processTransaction waits for a transaction and processes it by verifying and // logging it. -func (mgr *TransactionManager) processTransaction() error { +func (mgr *TransactionManager) processTransaction() (returnedErr error) { + var cleanUps []func() error + defer func() { + for _, cleanUp := range cleanUps { + if err := cleanUp(); err != nil && returnedErr == nil { + returnedErr = fmt.Errorf("clean up: %w", err) + } + } + }() + var transaction *Transaction select { case transaction = <-mgr.admissionQueue: + // The Transaction does not clean up itself anymore once it has been admitted for + // processing. This avoids the Transaction concurrently removing the staged state + // while the manager is still operating on it. We thus need to defer its clean up. + cleanUps = append(cleanUps, transaction.clean) case <-mgr.ctx.Done(): } @@ -431,7 +617,7 @@ func (mgr *TransactionManager) processTransaction() error { return err } - transaction.result <- func() error { + transaction.result <- func() (commitErr error) { logEntry, err := mgr.verifyReferences(mgr.ctx, transaction) if err != nil { return fmt.Errorf("verify references: %w", err) @@ -442,8 +628,31 @@ func (mgr *TransactionManager) processTransaction() error { CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR, } } + nextLogIndex := mgr.appendedLogIndex + 1 + + if transaction.includesPack { + logEntry.IncludesPack = true + + removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction) + cleanUps = append(cleanUps, func() error { + // The transaction's pack file might have been moved in to place at <repo>/wal/packs/<log_index>.pack. + // If anything fails before the transaction is committed, the pack file must be removed as otherwise + // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager + // will exit with an error. The pack file will be cleaned up on restart and no further processing is + // allowed until that happens. + if commitErr != nil { + return removePack() + } - return mgr.appendLogEntry(logEntry) + return nil + }) + + if err != nil { + return fmt.Errorf("store pack file: %w", err) + } + } + + return mgr.appendLogEntry(nextLogIndex, logEntry) }() return nil @@ -506,6 +715,10 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { mgr.applyNotifications[i] = make(chan struct{}) } + if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil { + return fmt.Errorf("remove stale packs: %w", err) + } + return nil } @@ -565,6 +778,7 @@ func (mgr *TransactionManager) createDirectories() error { for _, relativePath := range []string{ "wal/hooks", + "wal/packs", } { directory := filepath.Join(repoPath, relativePath) if _, err := os.Stat(directory); err != nil { @@ -585,6 +799,83 @@ func (mgr *TransactionManager) createDirectories() error { return nil } +// removeStalePackFiles removes pack files from the log directory that have no associated log entry. +// Such packs can be left around if a transaction's pack file was moved in place successfully +// but the manager was interrupted before successfully persisting the log entry itself. +func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error { + // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale + // pack would be for the next log index. Remove the pack if it exists. + possibleStalePackPath, err := mgr.packFilePath(appendedIndex + 1) + if err != nil { + return fmt.Errorf("pack file path: %w", err) + } + + if err := os.Remove(possibleStalePackPath); err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("remove: %w", err) + } + + return nil + } + + // Sync the parent directory to flush the file deletion. + if err := safe.NewSyncer().Sync(filepath.Dir(possibleStalePackPath)); err != nil { + return fmt.Errorf("sync: %w", err) + } + + return nil +} + +// storePackFile moves the transaction's pack file from the object directory to its destination in the log. +// It returns a function, even on errors, that must be called to clean up the pack file committing the log entry +// fails. +func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) { + removePack := func() error { return nil } + + destinationPath, err := mgr.packFilePath(index) + if err != nil { + return removePack, fmt.Errorf("pack file path: %w", err) + } + + if err := os.Rename( + transaction.packFilePath(), + destinationPath, + ); err != nil { + return removePack, fmt.Errorf("move pack file: %w", err) + } + + removePack = func() error { + if err := os.Remove(destinationPath); err != nil { + return fmt.Errorf("remove pack file: %w", err) + } + + return nil + } + + // Sync the parent directory. The pack's contents are synced when the pack file is computed. + if err := safe.NewSyncer().Sync(filepath.Dir(destinationPath)); err != nil { + return removePack, fmt.Errorf("sync: %w", err) + } + + return removePack, nil +} + +// packFilePath returns the path where a given log entry's pack file would be stored. +func (mgr *TransactionManager) packFilePath(index LogIndex) (string, error) { + repoPath, err := mgr.repository.Path() + if err != nil { + return "", fmt.Errorf("repo path: %w", err) + } + + return packFilePathForLogIndex(repoPath, index), nil +} + +// packFilePathForLogIndex returns a log entry's pack file's absolute path in a given +// a repository path. +func packFilePathForLogIndex(repoPath string, index LogIndex) string { + return filepath.Join(repoPath, "wal", "packs", index.String()+".pack") +} + // verifyReferences verifies that the references in the transaction apply on top of the already accepted // reference changes. The old tips in the transaction are verified against the current actual tips. // It returns the write-ahead log entry for the transaction if it was successfully verified. @@ -644,7 +935,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction ) == -1 }) - if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil { + if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates, transaction.quarantineDirectory); err != nil { return nil, fmt.Errorf("verify references with git: %w", err) } @@ -665,8 +956,8 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // the updates will go through when they are being applied in the log. This also catches any invalid reference names // and file/directory conflicts with Git's loose reference storage which can occur with references like // 'refs/heads/parent' and 'refs/heads/parent/child'. -func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error { - updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates) +func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, quarantineDirectory string) error { + updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, quarantineDirectory) if err != nil { return fmt.Errorf("prepare reference transaction: %w", err) } @@ -716,8 +1007,17 @@ func (mgr *TransactionManager) updateDefaultBranch(ctx context.Context, defaultB // prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing // or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up // if an error is returned. -func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) { - updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions()) +func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, quarantineDirectory string) (*updateref.Updater, error) { + repository := mgr.repository + if quarantineDirectory != "" { + var err error + repository, err = mgr.repository.Quarantine(quarantineDirectory) + if err != nil { + return nil, fmt.Errorf("quarantine: %w", err) + } + } + + updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions()) if err != nil { return nil, fmt.Errorf("new: %w", err) } @@ -741,9 +1041,7 @@ func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, // appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not // logged nor applied later. -func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error { - nextLogIndex := mgr.appendedLogIndex + 1 - +func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *gitalypb.LogEntry) error { if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil { return fmt.Errorf("set log entry: %w", err) } @@ -766,7 +1064,13 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("read log entry: %w", err) } - updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates) + if logEntry.IncludesPack { + if err := mgr.applyPackFile(ctx, logIndex); err != nil { + return fmt.Errorf("apply pack file: %w", err) + } + } + + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, "") if err != nil { return fmt.Errorf("perpare reference transaction: %w", err) } @@ -807,6 +1111,23 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return nil } +// applyPackFile unpacks the objects from the pack file into the repository if the log entry +// has an associated pack file. +func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error { + packFilePath, err := mgr.packFilePath(logIndex) + if err != nil { + return fmt.Errorf("pack file path: %w", err) + } + + packFile, err := os.Open(packFilePath) + if err != nil { + return fmt.Errorf("open pack file: %w", err) + } + defer packFile.Close() + + return mgr.repository.UnpackObjects(ctx, packFile) +} + // applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are stored // at `<repo>/wal/hooks/<log_index>`. The hooks are fsynced prior to returning so it is safe to delete // the log entry afterwards. diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 616a013b8..96648cc0d 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -8,6 +8,10 @@ import ( "errors" "fmt" "io/fs" + "os" + "path/filepath" + "sort" + "strings" "sync" "testing" "time" @@ -63,6 +67,49 @@ func validCustomHooks(tb testing.TB) []byte { // call back to PartitionManager. func noopTransactionFinalizer() {} +// writePack writes a pack file and its index into the destination. +func writePack(tb testing.TB, cfg config.Cfg, packFile []byte, destinationPack string) { + tb.Helper() + + require.NoError(tb, os.WriteFile(destinationPack, packFile, fs.ModePerm)) + gittest.ExecOpts(tb, cfg, + gittest.ExecConfig{Stdin: bytes.NewReader(packFile)}, + "index-pack", "--object-format="+gittest.DefaultObjectHash.Format, destinationPack, + ) +} + +// packFileDirectoryEntry returns a DirectoryEntry that parses content as a pack file and asserts that the +// set of objects in the pack file matches the expected objects. +func packFileDirectoryEntry(cfg config.Cfg, mode fs.FileMode, expectedObjects []git.ObjectID) testhelper.DirectoryEntry { + sortObjects := func(objects []git.ObjectID) { + sort.Slice(objects, func(i, j int) bool { + return objects[i] < objects[j] + }) + } + + sortObjects(expectedObjects) + + return testhelper.DirectoryEntry{ + Mode: mode, + Content: expectedObjects, + ParseContent: func(tb testing.TB, content []byte) any { + tb.Helper() + + tempDir := tb.TempDir() + // Initialize a temporary repository where to write the pack. The cat file invocation for listing + // the objects needs to run within a repository, and it would otherwise use the developer's repository. + // If the object format doesn't match with the pack files in the test, things fail. + gittest.Exec(tb, cfg, "init", "--object-format="+gittest.DefaultObjectHash.Format, "--bare", tempDir) + writePack(tb, cfg, content, filepath.Join(tempDir, "objects", "pack", "content.pack")) + + actualObjects := gittest.ListObjects(tb, cfg, tempDir) + sortObjects(actualObjects) + + return actualObjects + }, + } +} + func TestTransactionManager(t *testing.T) { umask := perm.GetUmask() @@ -71,7 +118,8 @@ func TestTransactionManager(t *testing.T) { ctx := testhelper.Context(t) type testCommit struct { - OID git.ObjectID + OID git.ObjectID + Pack []byte } type testCommits struct { @@ -125,15 +173,35 @@ func TestTransactionManager(t *testing.T) { nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil))) require.NoError(t, err) + packCommit := func(oid git.ObjectID) []byte { + t.Helper() + + var pack bytes.Buffer + require.NoError(t, + localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack), + ) + + return pack.Bytes() + } + return testSetup{ Config: cfg, ObjectHash: objectHash, Repository: localRepo, NonExistentOID: nonExistentOID, Commits: testCommits{ - First: testCommit{OID: firstCommitOID}, - Second: testCommit{OID: secondCommitOID}, - Third: testCommit{OID: thirdCommitOID}, + First: testCommit{ + OID: firstCommitOID, + Pack: packCommit(firstCommitOID), + }, + Second: testCommit{ + OID: secondCommitOID, + Pack: packCommit(secondCommitOID), + }, + Third: testCommit{ + OID: thirdCommitOID, + Pack: packCommit(thirdCommitOID), + }, }, } } @@ -170,6 +238,10 @@ func TestTransactionManager(t *testing.T) { // ExpectedError is the expected error to be raised from the manager's Run. Panics are converted // to errors and asserted to match this as well. ExpectedError error + // ModifyRepository allows for running modifying the repository prior the manager starting. This + // may be necessary to test some states that can be reached from hard crashes but not during the + // tests. + ModifyRepository func(tb testing.TB, repoPath string) } // StopManager stops a TransactionManager. @@ -209,6 +281,8 @@ func TestTransactionManager(t *testing.T) { SkipVerificationFailures bool // ReferenceUpdates are the reference updates to commit. ReferenceUpdates ReferenceUpdates + // QuarantinedPacks are the packs to include in the quarantine directory of the transaction. + QuarantinedPacks [][]byte // DefaultBranchUpdate is the default branch update to commit. DefaultBranchUpdate *DefaultBranchUpdate // CustomHooksUpdate is the custom hooks update to commit. @@ -221,6 +295,12 @@ func TestTransactionManager(t *testing.T) { TransactionID int } + // Prune prunes all unreferenced objects from the repository. + type Prune struct { + // ExpectedObjects are the object expected to exist in the repository after pruning. + ExpectedObjects []git.ObjectID + } + // StateAssertions models an assertion of the entire state managed by the TransactionManager. type StateAssertion struct { // DefaultBranch is the expected refname that HEAD points to. @@ -959,6 +1039,7 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1/pre-receive": { @@ -1002,6 +1083,7 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1/pre-receive": { @@ -1080,6 +1162,7 @@ func TestTransactionManager(t *testing.T) { "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, "/wal/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, }, }, }, @@ -1761,6 +1844,7 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1/pre-receive": { @@ -1773,6 +1857,470 @@ func TestTransactionManager(t *testing.T) { }, }, }, + { + desc: "pack file includes referenced commit", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + ExpectedError: updateref.NonExistentObjectError{ + ReferenceName: "refs/heads/main", + ObjectID: setup.Commits.First.OID.String(), + }, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + }, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + }, + }, + { + desc: "pack file includes unreachable objects depended upon", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + QuarantinedPacks: [][]byte{ + setup.Commits.First.Pack, + setup.Commits.Second.Pack, + }, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + // Point main to the first commit so the second one is unreachable. + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.First.OID}, + }, + }, + AssertManager{}, + StopManager{}, + StartManager{ + // Crash the manager before the third transaction is applied. This allows us to + // prune before it is applied to ensure the pack file contains all necessary commits. + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + }, + }, + Commit{ + TransactionID: 3, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.Third.Pack}, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + // Prune so the unreachable commits have been removed prior to the third log entry being + // applied. + Prune{ + ExpectedObjects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + }, + StartManager{}, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.Third.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(3).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + }, + ), + "/wal/packs/3.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.Commits.Second.OID, + setup.Commits.Third.OID, + }, + ), + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + setup.Commits.Third.OID, + }, + }, + }, + { + desc: "pack file reapplying works", + steps: steps{ + Prune{}, + StartManager{ + Hooks: testHooks{ + BeforeStoreAppliedLogIndex: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + }, + }, + { + desc: "pack file missing referenced commit", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + ExpectedError: localrepo.BadObjectError{ObjectID: setup.Commits.Second.OID}, + }, + }, + expectedState: StateAssertion{ + Objects: []git.ObjectID{}, + }, + }, + { + desc: "pack file missing intermediate commit", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.Third.Pack}, + ExpectedError: localrepo.ObjectReadError{ObjectID: setup.Commits.Second.OID}, + }, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + }, + }, + { + desc: "pack file only", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "pack file with deletions", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.Second.Pack}, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + }, + }, + { + desc: "pack file applies with dependency concurrently deleted", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID}, + }, + }, + AssertManager{}, + Prune{}, + Commit{ + TransactionID: 3, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/dependant": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.Second.Pack}, + // The transaction fails to apply as we are not yet maintaining internal references + // to the old tips of concurrently deleted references. This causes the prune step to + // remove the object this the pack file depends on. + // + // For now, keep the test case to assert the behavior. We'll fix this in a later MR. + ExpectedError: localrepo.ObjectReadError{ + ObjectID: setup.Commits.First.OID, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "pack files without log entries are cleaned up after a crash", + steps: steps{ + StartManager{ + // The manager cleans up pack files if a committing fails. Since we can't + // hard kill the manager and it will still run the deferred clean up functions, + // we have to test the behavior by manually creating a stale pack here. + // + // The Manager starts up and we expect the pack file to be gone at the end of the test. + ModifyRepository: func(tb testing.TB, repoPath string) { + packFilePath := packFilePathForLogIndex(repoPath, 1) + require.NoError(t, os.MkdirAll(filepath.Dir(packFilePath), fs.ModePerm)) + require.NoError(t, os.WriteFile( + packFilePath, + []byte("invalid pack"), + fs.ModePerm, + )) + }, + }, + }, + }, } type invalidReferenceTestCase struct { @@ -1906,11 +2454,13 @@ func TestTransactionManager(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, database) + stagingDir := t.TempDir() + var ( // managerRunning tracks whether the manager is running or stopped. managerRunning bool // transactionManager is the current TransactionManager instance. - transactionManager = NewTransactionManager(database, setup.Repository, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, setup.Repository, stagingDir, noopTransactionFinalizer) // managerErr is used for synchronizing manager stopping and returning // the error from Run. managerErr chan error @@ -1943,10 +2493,15 @@ func TestTransactionManager(t *testing.T) { switch step := step.(type) { case StartManager: require.False(t, managerRunning, "test error: manager started while it was already running") + + if step.ModifyRepository != nil { + step.ModifyRepository(t, repoPath) + } + managerRunning = true managerErr = make(chan error) - transactionManager = NewTransactionManager(database, setup.Repository, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, setup.Repository, stagingDir, noopTransactionFinalizer) installHooks(t, transactionManager, database, setup.Repository, hooks{ beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry, beforeResolveRevision: step.Hooks.BeforeAppendLogEntry, @@ -2014,6 +2569,28 @@ func TestTransactionManager(t *testing.T) { transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR) } + if step.QuarantinedPacks != nil { + quarantineDirectory, err := transaction.QuarantineDirectory() + require.NoError(t, err) + + for _, dir := range []string{ + transaction.stagingDirectory, + quarantineDirectory, + } { + const expectedPerm = perm.PrivateDir + stat, err := os.Stat(dir) + require.NoError(t, err) + require.Equal(t, stat.Mode().Perm(), umask.Mask(expectedPerm), + "%q had %q permission but expected %q", dir, stat.Mode().Perm().String(), expectedPerm, + ) + } + + for i, pack := range step.QuarantinedPacks { + writePack(t, setup.Config, pack, filepath.Join(quarantineDirectory, "pack", fmt.Sprintf("%d.pack", i))) + } + + } + commitCtx := ctx if step.Context != nil { commitCtx = step.Context @@ -2023,6 +2600,9 @@ func TestTransactionManager(t *testing.T) { case Rollback: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it") require.NoError(t, openTransactions[step.TransactionID].Rollback()) + case Prune: + gittest.Exec(t, setup.Config, "-C", repoPath, "prune") + require.ElementsMatch(t, step.ExpectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) default: t.Fatalf("unhandled step type: %T", step) } @@ -2043,6 +2623,7 @@ func TestTransactionManager(t *testing.T) { // gets asserted. expectedDirectory = testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, } } @@ -2060,6 +2641,10 @@ func TestTransactionManager(t *testing.T) { } require.ElementsMatch(t, expectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + + entries, err := os.ReadDir(stagingDir) + require.NoError(t, err) + require.Empty(t, entries, "staging directory was not cleaned up") }) } } @@ -2070,6 +2655,7 @@ func checkManagerError(t *testing.T, managerErrChannel chan error, mgr *Transact testTransaction := &Transaction{ referenceUpdates: ReferenceUpdates{"sentinel": {}}, result: make(chan error, 1), + finalize: func() {}, } var ( @@ -2212,7 +2798,7 @@ func BenchmarkTransactionManager(b *testing.B) { commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents()) commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1)) - manager := NewTransactionManager(database, localRepo, noopTransactionFinalizer) + manager := NewTransactionManager(database, localRepo, b.TempDir(), noopTransactionFinalizer) managers = append(managers, manager) managerWG.Add(1) diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index d9a673cdc..d5d70c72d 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -38,6 +38,9 @@ type LogEntry struct { DefaultBranchUpdate *LogEntry_DefaultBranchUpdate `protobuf:"bytes,2,opt,name=default_branch_update,json=defaultBranchUpdate,proto3" json:"default_branch_update,omitempty"` // CustomHooksUpdate contains the custom hooks to set in the repository. CustomHooksUpdate *LogEntry_CustomHooksUpdate `protobuf:"bytes,3,opt,name=custom_hooks_update,json=customHooksUpdate,proto3" json:"custom_hooks_update,omitempty"` + // includes_pack denotes whether this log entry has a pack file associated + // with it. The pack files are stored separately on the filesystem. + IncludesPack bool `protobuf:"varint,4,opt,name=includes_pack,json=includesPack,proto3" json:"includes_pack,omitempty"` } func (x *LogEntry) Reset() { @@ -93,6 +96,13 @@ func (x *LogEntry) GetCustomHooksUpdate() *LogEntry_CustomHooksUpdate { return nil } +func (x *LogEntry) GetIncludesPack() bool { + if x != nil { + return x.IncludesPack + } + return false +} + // LogIndex serializes a log index. It's used for storing a repository's // applied log index in the database. // @@ -312,7 +322,7 @@ var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x22, 0xd7, 0x03, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x61, 0x6c, 0x79, 0x22, 0xfc, 0x03, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x4d, 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, @@ -328,27 +338,29 @@ var file_log_proto_rawDesc = []byte{ 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x11, 0x63, 0x75, 0x73, 0x74, - 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x1a, 0x51, 0x0a, - 0x0f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, - 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, - 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, - 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, - 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d, - 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, - 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, - 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, - 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x22, 0x27, 0x0a, - 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, - 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x6f, - 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, - 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x35, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61, + 0x63, 0x6b, 0x1a, 0x51, 0x0a, 0x0f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, + 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, + 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, + 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, + 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, + 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, + 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, + 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, + 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, + 0x61, 0x72, 0x22, 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, + 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x35, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/log.proto b/proto/log.proto index 50c8ccdc5..52ccbca61 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -44,6 +44,9 @@ message LogEntry { DefaultBranchUpdate default_branch_update = 2; // CustomHooksUpdate contains the custom hooks to set in the repository. CustomHooksUpdate custom_hooks_update = 3; + // includes_pack denotes whether this log entry has a pack file associated + // with it. The pack files are stored separately on the filesystem. + bool includes_pack = 4; } // LogIndex serializes a log index. It's used for storing a repository's |