diff options
-rw-r--r-- | internal/gitaly/transaction_manager.go | 283 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager_test.go | 558 | ||||
-rw-r--r-- | internal/testhelper/directory.go | 4 |
3 files changed, 826 insertions, 19 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 3306fed7f..33b493b33 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "io/fs" "os" "path/filepath" @@ -20,8 +21,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v15/internal/safe" "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) @@ -121,12 +124,15 @@ type Transaction struct { // result is where the outcome of the transaction is sent ot by TransactionManager once it // has been determined. result chan error + // admitted denotes whether the transaction was admitted for processing or not. + admitted bool // Snapshot contains the details of the Transaction's read snapshot. snapshot Snapshot skipVerificationFailures bool referenceUpdates ReferenceUpdates + objectDirectory string defaultBranchUpdate *DefaultBranchUpdate customHooksUpdate *CustomHooksUpdate } @@ -174,12 +180,34 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) // Commit performs the changes. If no error is returned, the transaction was successful and the changes // have been performed. If an error was returned, the transaction may or may not be persisted. -func (txn *Transaction) Commit(ctx context.Context) error { +func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { + defer func() { + if err := txn.clean(); err != nil && returnedErr == nil { + returnedErr = err + } + }() + return txn.commit(ctx, txn) } // Rollback releases resources associated with the transaction without performing any changes. func (txn *Transaction) Rollback() error { + return txn.clean() +} + +func (txn *Transaction) clean() error { + if txn.admitted { + // If the transaction was admitted, the Transaction is being processed by TransactionManager. + // The clean up responsibility moves there as well to avoid races. + return nil + } + + if txn.objectDirectory != "" { + if err := os.RemoveAll(txn.objectDirectory); err != nil { + return fmt.Errorf("remove object directory: %w", err) + } + } + return nil } @@ -205,6 +233,15 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) { txn.referenceUpdates = updates } +// IncludeObjects includes the objects in the given directory in the transaction. The directory should conform +// to the .git/objects layout and contain the new objects to write as part of the transaction. Typically this +// points to the quarantine directory used for an RPC. Only objects reachable from the new reference tips are +// ultimately written into the repository. The caller should not use the object directory anymore after passing +// it. The object directory is cleaned up in the eventual Rollback or Commit call. +func (txn *Transaction) IncludeObjects(objectDirectory string) { + txn.objectDirectory = objectDirectory +} + // SetDefaultBranch sets the default branch as part of the transaction. If SetDefaultBranch is called // multiple times, only the changes from the latest invocation take place. The reference is validated // to exist. @@ -303,6 +340,8 @@ type repository interface { ResolveRevision(context.Context, git.Revision) (git.ObjectID, error) SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error Path() (string, error) + UnpackObjects(context.Context, io.Reader) error + Quarantine(string) (*localrepo.Repo, error) } // NewTransactionManager returns a new TransactionManager for the given repository. @@ -329,8 +368,14 @@ type resultChannel chan error func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error { transaction.result = make(resultChannel, 1) + if err := mgr.packObjects(ctx, transaction); err != nil { + return fmt.Errorf("pack objects: %w", err) + } + select { case mgr.admissionQueue <- transaction: + transaction.admitted = true + select { case err := <-transaction.result: return unwrapExpectedError(err) @@ -346,6 +391,75 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } +// packObjects packs the objects included in the transaction into a single pack file that is ready +// for logging. The pack file includes all unreachable objects that are about to be made reachable. +func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error { + if transaction.objectDirectory == "" { + return nil + } + + quarantinedRepo, err := mgr.repository.Quarantine(transaction.objectDirectory) + if err != nil { + return fmt.Errorf("quarantine: %w", err) + } + + objectsReader, objectsWriter := io.Pipe() + + group, ctx := errgroup.WithContext(ctx) + group.Go(func() (returnedErr error) { + defer func() { objectsWriter.CloseWithError(returnedErr) }() + + objectHash, err := quarantinedRepo.ObjectHash(ctx) + if err != nil { + return fmt.Errorf("object hash: %w", err) + } + + heads := make([]string, 0, len(transaction.referenceUpdates)) + for _, update := range transaction.referenceUpdates { + if update.NewOID == objectHash.ZeroOID { + // Reference deletions can't introduce new objects so ignore them. + continue + } + + heads = append(heads, update.NewOID.String()) + } + + if err := quarantinedRepo.WalkUnreachableObjects(ctx, + strings.NewReader(strings.Join(heads, "\n")), + objectsWriter, + ); err != nil { + return fmt.Errorf("walk objects: %w", err) + } + + return nil + }) + + group.Go(func() (returnedErr error) { + defer func() { objectsReader.CloseWithError(returnedErr) }() + + destinationFile, err := os.OpenFile( + filepath.Join(transaction.objectDirectory, "transaction.pack"), + os.O_WRONLY|os.O_CREATE|os.O_EXCL, + perm.PrivateFile, + ) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + defer destinationFile.Close() + + // TODO: This currently would include unreachable objects in the pool for forks which leads to + // excessive writes. Is that a problem? + // TODO: Ignore internal references + if err := quarantinedRepo.PackObjects(ctx, objectsReader, destinationFile); err != nil { + return fmt.Errorf("pack objects: %w", err) + } + + return destinationFile.Close() + }) + + return group.Wait() +} + // unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller. func unwrapExpectedError(err error) error { // The manager controls its own execution context and it is canceled only when Stop is called. @@ -401,10 +515,32 @@ func (mgr *TransactionManager) Run() (returnedErr error) { // processTransaction waits for a transaction and processes it by verifying and // logging it. -func (mgr *TransactionManager) processTransaction() error { +func (mgr *TransactionManager) processTransaction() (returnedErr error) { + var cleanUps []func() error + defer func() { + for _, cleanUp := range cleanUps { + if err := cleanUp(); err != nil && returnedErr == nil { + returnedErr = fmt.Errorf("clean up: %w", err) + } + } + }() + var transaction *Transaction select { case transaction = <-mgr.admissionQueue: + if transaction.objectDirectory != "" { + // The Transaction does not clean up the object directory anymore once the Transaction + // has been admitted. This avoids the Transaction concurrently removing the directory + // while the manager is still operating on it. We thus need to defer the clean up of + // the directory. + cleanUps = append(cleanUps, func() error { + if err := os.RemoveAll(transaction.objectDirectory); err != nil { + return fmt.Errorf("remove object directory: %w", err) + } + + return nil + }) + } case <-mgr.ctx.Done(): } @@ -414,7 +550,7 @@ func (mgr *TransactionManager) processTransaction() error { return err } - transaction.result <- func() error { + transaction.result <- func() (commitErr error) { logEntry, err := mgr.verifyReferences(mgr.ctx, transaction) if err != nil { return fmt.Errorf("verify references: %w", err) @@ -425,8 +561,29 @@ func (mgr *TransactionManager) processTransaction() error { CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR, } } + nextLogIndex := mgr.appendedLogIndex + 1 + + if transaction.objectDirectory != "" { + removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction) + cleanUps = append(cleanUps, func() error { + // The transaction's pack file might have been moved in to place at <repo>/wal/packs/<log_index>.pack. + // If anything fails before the transaction is committed, the pack file must be removed as otherwise + // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager + // will exit with an error. The pack file will be cleaned up on restart and no further processing is + // allowed until that happens. + if commitErr != nil { + return removePack() + } + + return nil + }) - return mgr.appendLogEntry(logEntry) + if err != nil { + return fmt.Errorf("store pack file: %w", err) + } + } + + return mgr.appendLogEntry(nextLogIndex, logEntry) }() return nil @@ -526,6 +683,10 @@ func (mgr *TransactionManager) initialize() error { mgr.applyNotifications[i] = make(chan struct{}) } + if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil { + return fmt.Errorf("remove stale packs: %w", err) + } + return nil } @@ -540,6 +701,7 @@ func (mgr *TransactionManager) createDirectories() error { for _, relativePath := range []string{ "wal/hooks", + "wal/packs", } { directory := filepath.Join(repoPath, relativePath) if _, err := os.Stat(directory); err != nil { @@ -560,6 +722,69 @@ func (mgr *TransactionManager) createDirectories() error { return nil } +// removeStalePackFiles removes pack files from the log directory that have no associated log entry. +// Such packs can be left around if a transaction's pack file was moved in place succesfully +// but the manager was interrupted before successfully persisting the log entry itself. +func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error { + repoPath, err := mgr.repository.Path() + if err != nil { + return fmt.Errorf("repo path: %w", err) + } + + // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale + // pack would be for the next log index. Remove the pack if it exists. + possibleStalePackIndex := appendedIndex + 1 + packsDirectory := filepath.Join(repoPath, "wal", "packs") + possibleStalePack := possibleStalePackIndex.String() + ".pack" + + if err := os.Remove(filepath.Join(packsDirectory, possibleStalePack)); err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("remove: %w", err) + } + } else { + // Sync the parent directory to flush the file deletion. + if err := safe.NewSyncer().SyncHierarchy(packsDirectory, possibleStalePack); err != nil { + return fmt.Errorf("sync: %w", err) + } + } + + return nil +} + +func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) { + removePack := func() error { return nil } + + repoPath, err := mgr.repository.Path() + if err != nil { + return removePack, fmt.Errorf("repository path: %w", err) + } + + packsDirectory := filepath.Join(repoPath, "wal", "packs") + packFile := index.String() + ".pack" + destinationPath := filepath.Join(packsDirectory, packFile) + if err := os.Rename( + filepath.Join(transaction.objectDirectory, "transaction.pack"), + destinationPath, + ); err != nil { + return removePack, fmt.Errorf("move pack file: %w", err) + } + + removePack = func() error { + if err := os.Remove(destinationPath); err != nil { + return fmt.Errorf("remove pack file: %w", err) + } + + return nil + } + + // Sync the parent directory and the pack itself. + if err := safe.NewSyncer().SyncHierarchy(packsDirectory, packFile); err != nil { + return removePack, fmt.Errorf("sync: %w", err) + } + + return removePack, nil +} + // verifyReferences verifies that the references in the transaction apply on top of the already accepted // reference changes. The old tips in the transaction are verified against the current actual tips. // It returns the write-ahead log entry for the transaction if it was successfully verified. @@ -619,7 +844,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction ) == -1 }) - if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil { + if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates, transaction.objectDirectory); err != nil { return nil, fmt.Errorf("verify references with git: %w", err) } @@ -640,8 +865,8 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // the updates will go through when they are being applied in the log. This also catches any invalid reference names // and file/directory conflicts with Git's loose reference storage which can occur with references like // 'refs/heads/parent' and 'refs/heads/parent/child'. -func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error { - updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates) +func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, objectDirectory string) error { + updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, objectDirectory) if err != nil { return fmt.Errorf("prepare reference transaction: %w", err) } @@ -691,8 +916,17 @@ func (mgr *TransactionManager) updateDefaultBranch(ctx context.Context, defaultB // prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing // or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up // if an error is returned. -func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) { - updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions()) +func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, objectDirectory string) (*updateref.Updater, error) { + repository := mgr.repository + if objectDirectory != "" { + var err error + repository, err = mgr.repository.Quarantine(objectDirectory) + if err != nil { + return nil, fmt.Errorf("quarantine: %w", err) + } + } + + updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions()) if err != nil { return nil, fmt.Errorf("new: %w", err) } @@ -716,9 +950,7 @@ func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, // appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not // logged nor applied later. -func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error { - nextLogIndex := mgr.appendedLogIndex + 1 - +func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *gitalypb.LogEntry) error { if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil { return fmt.Errorf("set log entry: %w", err) } @@ -740,8 +972,11 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn if err != nil { return fmt.Errorf("read log entry: %w", err) } + if err := mgr.applyPackFile(ctx, logIndex); err != nil { + return fmt.Errorf("apply pack file: %w", err) + } - updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates) + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, "") if err != nil { return fmt.Errorf("perpare reference transaction: %w", err) } @@ -782,6 +1017,28 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return nil } +// applyPackFile unpacks the objects from the pack file into the repository if the log entry +// has an associated pack file. +func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error { + repoPath, err := mgr.repository.Path() + if err != nil { + return fmt.Errorf("repository path: %w", err) + } + + packFile, err := os.Open(filepath.Join(repoPath, "wal", "packs", logIndex.String()+".pack")) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + // This log entry has no associated pack file. + return nil + } + + return fmt.Errorf("open pack file: %w", err) + } + defer packFile.Close() + + return mgr.repository.UnpackObjects(ctx, packFile) +} + // applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are stored // at `<repo>/wal/hooks/<log_index>`. The hooks are fsynced prior to returning so it is safe to delete // the log entry afterwards. diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index ce5899a5b..4f2e893b0 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -8,6 +8,10 @@ import ( "errors" "fmt" "io/fs" + "os" + "path/filepath" + "sort" + "strings" "sync" "testing" "time" @@ -59,6 +63,78 @@ func validCustomHooks(tb testing.TB) []byte { return hooks.Bytes() } +// writePack writes a pack file and its index into the destination. +func writePack(tb testing.TB, cfg config.Cfg, packFile []byte, destinationPack string) { + tb.Helper() + + gittest.ExecOpts(tb, + cfg, + gittest.ExecConfig{Stdin: bytes.NewReader(packFile)}, + "index-pack", "--stdin", destinationPack, + ) +} + +// buildObjectDirectory builds an object directory that contains the given pack files +// and returns the path of the directory. +func buildObjectDirectory(tb testing.TB, cfg config.Cfg, packFiles ...[]byte) string { + tb.Helper() + + objectsDirectory := tb.TempDir() + packDirectory := filepath.Join(objectsDirectory, "pack") + + require.NoError(tb, os.MkdirAll(packDirectory, fs.ModePerm)) + + for i, packFile := range packFiles { + writePack(tb, cfg, packFile, filepath.Join(packDirectory, fmt.Sprintf("pack-%d.pack", i))) + } + + return objectsDirectory +} + +// packFileDirectoryEntry returns a DirectoryEntry that parses content as a pack file and assert the +// set of objects in the pack file matches the expected objects. +func packFileDirectoryEntry(cfg config.Cfg, mode fs.FileMode, expectedObjects []git.ObjectID) testhelper.DirectoryEntry { + sortObjects := func(objects []git.ObjectID) { + sort.Slice(objects, func(i, j int) bool { + return objects[i] < objects[j] + }) + } + + sortObjects(expectedObjects) + + return testhelper.DirectoryEntry{ + Mode: mode, + Content: expectedObjects, + ParseContent: func(tb testing.TB, content []byte) any { + tb.Helper() + + tempDir := tb.TempDir() + packPath := filepath.Join(tempDir, "asserted-pack.pack") + writePack(tb, cfg, content, packPath) + + actualObjects := []git.ObjectID{} + for _, line := range strings.Split( + string(gittest.Exec(tb, testcfg.Build(tb), "verify-pack", "-v", packPath)), "\n", + ) { + components := strings.Split(line, " ") + if len(components) < 2 { + continue + } + + if strings.HasPrefix(components[1], "commit") || + strings.HasPrefix(components[1], "blob") || + strings.HasPrefix(components[1], "tree") { + actualObjects = append(actualObjects, git.ObjectID(components[0])) + } + } + + sortObjects(actualObjects) + + return actualObjects + }, + } +} + func TestTransactionManager(t *testing.T) { umask := perm.GetUmask() @@ -67,7 +143,8 @@ func TestTransactionManager(t *testing.T) { ctx := testhelper.Context(t) type testCommit struct { - OID git.ObjectID + OID git.ObjectID + Pack []byte } type testCommits struct { @@ -121,15 +198,35 @@ func TestTransactionManager(t *testing.T) { nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil))) require.NoError(t, err) + packCommit := func(oid git.ObjectID) []byte { + t.Helper() + + var pack bytes.Buffer + require.NoError(t, + localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack), + ) + + return pack.Bytes() + } + return testSetup{ Config: cfg, ObjectHash: objectHash, Repository: localRepo, NonExistentOID: nonExistentOID, Commits: testCommits{ - First: testCommit{OID: firstCommitOID}, - Second: testCommit{OID: secondCommitOID}, - Third: testCommit{OID: thirdCommitOID}, + First: testCommit{ + OID: firstCommitOID, + Pack: packCommit(firstCommitOID), + }, + Second: testCommit{ + OID: secondCommitOID, + Pack: packCommit(secondCommitOID), + }, + Third: testCommit{ + OID: thirdCommitOID, + Pack: packCommit(thirdCommitOID), + }, }, } } @@ -205,6 +302,8 @@ func TestTransactionManager(t *testing.T) { SkipVerificationFailures bool // ReferenceUpdates are the reference updates to commit. ReferenceUpdates ReferenceUpdates + // IncludeObjects is the path to object directory with the objects to include. + IncludeObjects string // DefaultBranchUpdate is the default branch update to commit. DefaultBranchUpdate *DefaultBranchUpdate // CustomHooksUpdate is the custom hooks update to commit. @@ -217,6 +316,12 @@ func TestTransactionManager(t *testing.T) { TransactionID int } + // Prune prunes all unreferenced objects from the repository. + type Prune struct { + // ExpectedObjects are the object expected to exist in the repository after pruning. + ExpectedObjects []git.ObjectID + } + // StateAssertions models an assertion of the entire state managed by the TransactionManager. type StateAssertion struct { // DefaultBranch is the expected refname that HEAD points to. @@ -955,6 +1060,7 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1/pre-receive": { @@ -998,6 +1104,7 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1/pre-receive": { @@ -1690,6 +1797,7 @@ func TestTransactionManager(t *testing.T) { }, Directory: testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks/1/pre-receive": { @@ -1702,6 +1810,435 @@ func TestTransactionManager(t *testing.T) { }, }, }, + { + desc: "pack file includes referenced commit", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + ExpectedError: updateref.NonExistentObjectError{ + ReferenceName: "refs/heads/main", + ObjectID: setup.Commits.First.OID.String(), + }, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + }, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{setup.Commits.First.OID}, + }, + }, + { + desc: "pack file includes unreachable objects depended upon", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, + setup.Commits.First.Pack, + setup.Commits.Second.Pack, + ), + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + // Point main to the first commit so the second one is unreachable. + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.First.OID}, + }, + }, + AssertManager{}, + StopManager{}, + StartManager{ + // Crash the manager before the third transaction is applied. This allows us to + // prune before it is applied to ensure the pack file contains all necessary commits. + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + }, + }, + Commit{ + TransactionID: 3, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Third.Pack), + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + // Prune so the unreachable commits have been removed prior to the third log entry being + // applied. + Prune{ + ExpectedObjects: []git.ObjectID{setup.Commits.First.OID}, + }, + StartManager{}, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.Third.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(3).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + }, + ), + "/wal/packs/3.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.Commits.Second.OID, + setup.Commits.Third.OID, + }, + ), + }, + Objects: []git.ObjectID{ + setup.Commits.First.OID, + setup.Commits.Second.OID, + setup.Commits.Third.OID, + }, + }, + }, + { + desc: "pack file reapplying works", + steps: steps{ + Prune{}, + StartManager{ + Hooks: testHooks{ + BeforeStoreAppliedLogIndex: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{setup.Commits.First.OID}, + }, + }, + { + desc: "pack file missing referenced commit", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + ExpectedError: localrepo.BadObjectError{ObjectID: setup.Commits.Second.OID}, + }, + }, + expectedState: StateAssertion{ + Objects: []git.ObjectID{}, + }, + }, + { + desc: "pack file missing intermediate commit", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Third.Pack), + ExpectedError: localrepo.ObjectReadError{ObjectID: setup.Commits.Second.OID}, + }, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/main", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{setup.Commits.First.OID}, + }, + }, + { + desc: "pack file only", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry(setup.Config, perm.PrivateFile, []git.ObjectID{}), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "pack file with deletions", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Second.Pack), + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + "/wal/packs/2.pack": packFileDirectoryEntry(setup.Config, perm.PrivateFile, []git.ObjectID{}), + }, + Objects: []git.ObjectID{setup.Commits.First.OID}, + }, + }, + { + desc: "pack file applies with dependency concurrently deleted", + steps: steps{ + Prune{}, + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack), + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID}, + }, + }, + AssertManager{}, + Prune{}, + Commit{ + TransactionID: 3, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/dependant": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Second.Pack), + // The transaction fails to apply as we are not yet maintaining internal references + // to the old tips of concurrently deleted references. This causes the prune step to + // remove the object this the pack file depends on. + // + // For now, keep the test case to assert the behavior. We'll fix this in a later MR. + ExpectedError: localrepo.ObjectReadError{ + ObjectID: setup.Commits.First.OID, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + }, + ), + }, + Objects: []git.ObjectID{}, + }, + }, } type invalidReferenceTestCase struct { @@ -1861,6 +2398,7 @@ func TestTransactionManager(t *testing.T) { // openTransactions holds references to all of the transactions that have been // began in a test case. openTransactions := map[int]*Transaction{} + var objectDirectories []string // Stop the manager if it is running at the end of the test. defer func() { @@ -1943,6 +2481,11 @@ func TestTransactionManager(t *testing.T) { transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR) } + if step.IncludeObjects != "" { + objectDirectories = append(objectDirectories, step.IncludeObjects) + transaction.IncludeObjects(step.IncludeObjects) + } + commitCtx := ctx if step.Context != nil { commitCtx = step.Context @@ -1952,6 +2495,9 @@ func TestTransactionManager(t *testing.T) { case Rollback: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it") require.NoError(t, openTransactions[step.TransactionID].Rollback()) + case Prune: + gittest.Exec(t, setup.Config, "-C", repoPath, "prune") + gittest.RequireObjects(t, setup.Config, repoPath, step.ExpectedObjects) default: t.Fatalf("unhandled step type: %T", step) } @@ -1972,6 +2518,7 @@ func TestTransactionManager(t *testing.T) { // gets asserted. expectedDirectory = testhelper.DirectoryState{ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, } } @@ -1986,6 +2533,9 @@ func TestTransactionManager(t *testing.T) { } gittest.RequireObjects(t, setup.Config, repoPath, expectedObjects) + for _, dir := range objectDirectories { + require.NoDirExists(t, dir, "object directory was not cleaned up") + } }) } } diff --git a/internal/testhelper/directory.go b/internal/testhelper/directory.go index 05e650568..8b7befc5f 100644 --- a/internal/testhelper/directory.go +++ b/internal/testhelper/directory.go @@ -74,8 +74,8 @@ func RequireDirectoryState(tb testing.TB, rootDirectory, relativeDirectory strin return nil })) - // Create a copy of the expected and set the ParseContent to nil as functions always fail - // equality checks. We use a copy so we don't unexpectedly modify the original. + // Create a copy of the expected state and set the ParseContent to nil as functions always + // fail equality checks. We use a copy so we don't unexpectedly modify the original. expectedCopy := make(DirectoryState, len(expected)) for key, value := range expected { value.ParseContent = nil |