Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- internal/gitaly/transaction_manager.go 283
-rw-r--r-- internal/gitaly/transaction_manager_test.go 558
-rw-r--r-- internal/testhelper/directory.go 4
3 files changed, 826 insertions, 19 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
index 3306fed7f..33b493b33 100644
--- a/internal/gitaly/transaction_manager.go
+++ b/internal/gitaly/transaction_manager.go
@@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "io"
"io/fs"
"os"
"path/filepath"
@@ -20,8 +21,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v15/internal/safe"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -121,12 +124,15 @@ type Transaction struct {
// result is where the outcome of the transaction is sent by TransactionManager once it
// has been determined.
result chan error
+ // admitted denotes whether the transaction was admitted for processing or not.
+ admitted bool
// Snapshot contains the details of the Transaction's read snapshot.
snapshot Snapshot
skipVerificationFailures bool
referenceUpdates ReferenceUpdates
+ objectDirectory string
defaultBranchUpdate *DefaultBranchUpdate
customHooksUpdate *CustomHooksUpdate
}
@@ -174,12 +180,34 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error)
// Commit performs the changes. If no error is returned, the transaction was successful and the changes
// have been performed. If an error was returned, the transaction may or may not be persisted.
-func (txn *Transaction) Commit(ctx context.Context) error {
+func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) {
+ defer func() {
+ if err := txn.clean(); err != nil && returnedErr == nil {
+ returnedErr = err
+ }
+ }()
+
return txn.commit(ctx, txn)
}
// Rollback releases resources associated with the transaction without performing any changes.
func (txn *Transaction) Rollback() error {
+ return txn.clean()
+}
+
+func (txn *Transaction) clean() error {
+ if txn.admitted {
+ // If the transaction was admitted, the Transaction is being processed by TransactionManager.
+ // The clean up responsibility moves there as well to avoid races.
+ return nil
+ }
+
+ if txn.objectDirectory != "" {
+ if err := os.RemoveAll(txn.objectDirectory); err != nil {
+ return fmt.Errorf("remove object directory: %w", err)
+ }
+ }
+
return nil
}
@@ -205,6 +233,15 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) {
txn.referenceUpdates = updates
}
+// IncludeObjects includes the objects in the given directory in the transaction. The directory should conform
+// to the .git/objects layout and contain the new objects to write as part of the transaction. Typically this
+// points to the quarantine directory used for an RPC. Only objects reachable from the new reference tips are
+// ultimately written into the repository. The caller should not use the object directory anymore after passing
+// it. The object directory is cleaned up in the eventual Rollback or Commit call.
+func (txn *Transaction) IncludeObjects(objectDirectory string) {
+ txn.objectDirectory = objectDirectory
+}
+
// SetDefaultBranch sets the default branch as part of the transaction. If SetDefaultBranch is called
// multiple times, only the changes from the latest invocation take place. The reference is validated
// to exist.
@@ -303,6 +340,8 @@ type repository interface {
ResolveRevision(context.Context, git.Revision) (git.ObjectID, error)
SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error
Path() (string, error)
+ UnpackObjects(context.Context, io.Reader) error
+ Quarantine(string) (*localrepo.Repo, error)
}
// NewTransactionManager returns a new TransactionManager for the given repository.
@@ -329,8 +368,14 @@ type resultChannel chan error
func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error {
transaction.result = make(resultChannel, 1)
+ if err := mgr.packObjects(ctx, transaction); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
+ }
+
select {
case mgr.admissionQueue <- transaction:
+ transaction.admitted = true
+
select {
case err := <-transaction.result:
return unwrapExpectedError(err)
@@ -346,6 +391,75 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact
}
}
+// packObjects packs the objects included in the transaction into a single pack file that is ready
+// for logging. The pack file includes all unreachable objects that are about to be made reachable.
+func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error {
+ if transaction.objectDirectory == "" {
+ return nil
+ }
+
+ quarantinedRepo, err := mgr.repository.Quarantine(transaction.objectDirectory)
+ if err != nil {
+ return fmt.Errorf("quarantine: %w", err)
+ }
+
+ objectsReader, objectsWriter := io.Pipe()
+
+ group, ctx := errgroup.WithContext(ctx)
+ group.Go(func() (returnedErr error) {
+ defer func() { objectsWriter.CloseWithError(returnedErr) }()
+
+ objectHash, err := quarantinedRepo.ObjectHash(ctx)
+ if err != nil {
+ return fmt.Errorf("object hash: %w", err)
+ }
+
+ heads := make([]string, 0, len(transaction.referenceUpdates))
+ for _, update := range transaction.referenceUpdates {
+ if update.NewOID == objectHash.ZeroOID {
+ // Reference deletions can't introduce new objects so ignore them.
+ continue
+ }
+
+ heads = append(heads, update.NewOID.String())
+ }
+
+ if err := quarantinedRepo.WalkUnreachableObjects(ctx,
+ strings.NewReader(strings.Join(heads, "\n")),
+ objectsWriter,
+ ); err != nil {
+ return fmt.Errorf("walk objects: %w", err)
+ }
+
+ return nil
+ })
+
+ group.Go(func() (returnedErr error) {
+ defer func() { objectsReader.CloseWithError(returnedErr) }()
+
+ destinationFile, err := os.OpenFile(
+ filepath.Join(transaction.objectDirectory, "transaction.pack"),
+ os.O_WRONLY|os.O_CREATE|os.O_EXCL,
+ perm.PrivateFile,
+ )
+ if err != nil {
+ return fmt.Errorf("open file: %w", err)
+ }
+ defer destinationFile.Close()
+
+ // TODO: This currently would include unreachable objects in the pool for forks which leads to
+ // excessive writes. Is that a problem?
+ // TODO: Ignore internal references
+ if err := quarantinedRepo.PackObjects(ctx, objectsReader, destinationFile); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
+ }
+
+ return destinationFile.Close()
+ })
+
+ return group.Wait()
+}
+
// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
func unwrapExpectedError(err error) error {
// The manager controls its own execution context and it is canceled only when Stop is called.
@@ -401,10 +515,32 @@ func (mgr *TransactionManager) Run() (returnedErr error) {
// processTransaction waits for a transaction and processes it by verifying and
// logging it.
-func (mgr *TransactionManager) processTransaction() error {
+func (mgr *TransactionManager) processTransaction() (returnedErr error) {
+ var cleanUps []func() error
+ defer func() {
+ for _, cleanUp := range cleanUps {
+ if err := cleanUp(); err != nil && returnedErr == nil {
+ returnedErr = fmt.Errorf("clean up: %w", err)
+ }
+ }
+ }()
+
var transaction *Transaction
select {
case transaction = <-mgr.admissionQueue:
+ if transaction.objectDirectory != "" {
+ // The Transaction does not clean up the object directory anymore once the Transaction
+ // has been admitted. This avoids the Transaction concurrently removing the directory
+ // while the manager is still operating on it. We thus need to defer the clean up of
+ // the directory.
+ cleanUps = append(cleanUps, func() error {
+ if err := os.RemoveAll(transaction.objectDirectory); err != nil {
+ return fmt.Errorf("remove object directory: %w", err)
+ }
+
+ return nil
+ })
+ }
case <-mgr.ctx.Done():
}
@@ -414,7 +550,7 @@ func (mgr *TransactionManager) processTransaction() error {
return err
}
- transaction.result <- func() error {
+ transaction.result <- func() (commitErr error) {
logEntry, err := mgr.verifyReferences(mgr.ctx, transaction)
if err != nil {
return fmt.Errorf("verify references: %w", err)
@@ -425,8 +561,29 @@ func (mgr *TransactionManager) processTransaction() error {
CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR,
}
}
+ nextLogIndex := mgr.appendedLogIndex + 1
+
+ if transaction.objectDirectory != "" {
+ removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction)
+ cleanUps = append(cleanUps, func() error {
+ // The transaction's pack file might have been moved into place at <repo>/wal/packs/<log_index>.pack.
+ // If anything fails before the transaction is committed, the pack file must be removed as otherwise
+ // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager
+ // will exit with an error. The pack file will be cleaned up on restart and no further processing is
+ // allowed until that happens.
+ if commitErr != nil {
+ return removePack()
+ }
+
+ return nil
+ })
- return mgr.appendLogEntry(logEntry)
+ if err != nil {
+ return fmt.Errorf("store pack file: %w", err)
+ }
+ }
+
+ return mgr.appendLogEntry(nextLogIndex, logEntry)
}()
return nil
@@ -526,6 +683,10 @@ func (mgr *TransactionManager) initialize() error {
mgr.applyNotifications[i] = make(chan struct{})
}
+ if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
+ return fmt.Errorf("remove stale packs: %w", err)
+ }
+
return nil
}
@@ -540,6 +701,7 @@ func (mgr *TransactionManager) createDirectories() error {
for _, relativePath := range []string{
"wal/hooks",
+ "wal/packs",
} {
directory := filepath.Join(repoPath, relativePath)
if _, err := os.Stat(directory); err != nil {
@@ -560,6 +722,69 @@ func (mgr *TransactionManager) createDirectories() error {
return nil
}
+// removeStalePackFiles removes pack files from the log directory that have no associated log entry.
+// Such packs can be left around if a transaction's pack file was moved in place successfully
+// but the manager was interrupted before successfully persisting the log entry itself.
+func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error {
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return fmt.Errorf("repo path: %w", err)
+ }
+
+ // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale
+ // pack would be for the next log index. Remove the pack if it exists.
+ possibleStalePackIndex := appendedIndex + 1
+ packsDirectory := filepath.Join(repoPath, "wal", "packs")
+ possibleStalePack := possibleStalePackIndex.String() + ".pack"
+
+ if err := os.Remove(filepath.Join(packsDirectory, possibleStalePack)); err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
+ return fmt.Errorf("remove: %w", err)
+ }
+ } else {
+ // Sync the parent directory to flush the file deletion.
+ if err := safe.NewSyncer().SyncHierarchy(packsDirectory, possibleStalePack); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
+ removePack := func() error { return nil }
+
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return removePack, fmt.Errorf("repository path: %w", err)
+ }
+
+ packsDirectory := filepath.Join(repoPath, "wal", "packs")
+ packFile := index.String() + ".pack"
+ destinationPath := filepath.Join(packsDirectory, packFile)
+ if err := os.Rename(
+ filepath.Join(transaction.objectDirectory, "transaction.pack"),
+ destinationPath,
+ ); err != nil {
+ return removePack, fmt.Errorf("move pack file: %w", err)
+ }
+
+ removePack = func() error {
+ if err := os.Remove(destinationPath); err != nil {
+ return fmt.Errorf("remove pack file: %w", err)
+ }
+
+ return nil
+ }
+
+ // Sync the parent directory and the pack itself.
+ if err := safe.NewSyncer().SyncHierarchy(packsDirectory, packFile); err != nil {
+ return removePack, fmt.Errorf("sync: %w", err)
+ }
+
+ return removePack, nil
+}
+
// verifyReferences verifies that the references in the transaction apply on top of the already accepted
// reference changes. The old tips in the transaction are verified against the current actual tips.
// It returns the write-ahead log entry for the transaction if it was successfully verified.
@@ -619,7 +844,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
) == -1
})
- if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil {
+ if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates, transaction.objectDirectory); err != nil {
return nil, fmt.Errorf("verify references with git: %w", err)
}
@@ -640,8 +865,8 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
// the updates will go through when they are being applied in the log. This also catches any invalid reference names
// and file/directory conflicts with Git's loose reference storage which can occur with references like
// 'refs/heads/parent' and 'refs/heads/parent/child'.
-func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error {
- updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates)
+func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, objectDirectory string) error {
+ updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, objectDirectory)
if err != nil {
return fmt.Errorf("prepare reference transaction: %w", err)
}
@@ -691,8 +916,17 @@ func (mgr *TransactionManager) updateDefaultBranch(ctx context.Context, defaultB
// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing
// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up
// if an error is returned.
-func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) {
- updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions())
+func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, objectDirectory string) (*updateref.Updater, error) {
+ repository := mgr.repository
+ if objectDirectory != "" {
+ var err error
+ repository, err = mgr.repository.Quarantine(objectDirectory)
+ if err != nil {
+ return nil, fmt.Errorf("quarantine: %w", err)
+ }
+ }
+
+ updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions())
if err != nil {
return nil, fmt.Errorf("new: %w", err)
}
@@ -716,9 +950,7 @@ func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context,
// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not
// logged nor applied later.
-func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error {
- nextLogIndex := mgr.appendedLogIndex + 1
-
+func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *gitalypb.LogEntry) error {
if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil {
return fmt.Errorf("set log entry: %w", err)
}
@@ -740,8 +972,11 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
if err != nil {
return fmt.Errorf("read log entry: %w", err)
}
+ if err := mgr.applyPackFile(ctx, logIndex); err != nil {
+ return fmt.Errorf("apply pack file: %w", err)
+ }
- updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates)
+ updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, "")
if err != nil {
return fmt.Errorf("perpare reference transaction: %w", err)
}
@@ -782,6 +1017,28 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return nil
}
+// applyPackFile unpacks the objects from the pack file into the repository if the log entry
+// has an associated pack file.
+func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error {
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return fmt.Errorf("repository path: %w", err)
+ }
+
+ packFile, err := os.Open(filepath.Join(repoPath, "wal", "packs", logIndex.String()+".pack"))
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ // This log entry has no associated pack file.
+ return nil
+ }
+
+ return fmt.Errorf("open pack file: %w", err)
+ }
+ defer packFile.Close()
+
+ return mgr.repository.UnpackObjects(ctx, packFile)
+}
+
// applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are stored
// at `<repo>/wal/hooks/<log_index>`. The hooks are fsynced prior to returning so it is safe to delete
// the log entry afterwards.
diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go
index ce5899a5b..4f2e893b0 100644
--- a/internal/gitaly/transaction_manager_test.go
+++ b/internal/gitaly/transaction_manager_test.go
@@ -8,6 +8,10 @@ import (
"errors"
"fmt"
"io/fs"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
"sync"
"testing"
"time"
@@ -59,6 +63,78 @@ func validCustomHooks(tb testing.TB) []byte {
return hooks.Bytes()
}
+// writePack writes a pack file and its index into the destination.
+func writePack(tb testing.TB, cfg config.Cfg, packFile []byte, destinationPack string) {
+ tb.Helper()
+
+ gittest.ExecOpts(tb,
+ cfg,
+ gittest.ExecConfig{Stdin: bytes.NewReader(packFile)},
+ "index-pack", "--stdin", destinationPack,
+ )
+}
+
+// buildObjectDirectory builds an object directory that contains the given pack files
+// and returns the path of the directory.
+func buildObjectDirectory(tb testing.TB, cfg config.Cfg, packFiles ...[]byte) string {
+ tb.Helper()
+
+ objectsDirectory := tb.TempDir()
+ packDirectory := filepath.Join(objectsDirectory, "pack")
+
+ require.NoError(tb, os.MkdirAll(packDirectory, fs.ModePerm))
+
+ for i, packFile := range packFiles {
+ writePack(tb, cfg, packFile, filepath.Join(packDirectory, fmt.Sprintf("pack-%d.pack", i)))
+ }
+
+ return objectsDirectory
+}
+
+// packFileDirectoryEntry returns a DirectoryEntry that parses content as a pack file and asserts the
+// set of objects in the pack file matches the expected objects.
+func packFileDirectoryEntry(cfg config.Cfg, mode fs.FileMode, expectedObjects []git.ObjectID) testhelper.DirectoryEntry {
+ sortObjects := func(objects []git.ObjectID) {
+ sort.Slice(objects, func(i, j int) bool {
+ return objects[i] < objects[j]
+ })
+ }
+
+ sortObjects(expectedObjects)
+
+ return testhelper.DirectoryEntry{
+ Mode: mode,
+ Content: expectedObjects,
+ ParseContent: func(tb testing.TB, content []byte) any {
+ tb.Helper()
+
+ tempDir := tb.TempDir()
+ packPath := filepath.Join(tempDir, "asserted-pack.pack")
+ writePack(tb, cfg, content, packPath)
+
+ actualObjects := []git.ObjectID{}
+ for _, line := range strings.Split(
+ string(gittest.Exec(tb, testcfg.Build(tb), "verify-pack", "-v", packPath)), "\n",
+ ) {
+ components := strings.Split(line, " ")
+ if len(components) < 2 {
+ continue
+ }
+
+ if strings.HasPrefix(components[1], "commit") ||
+ strings.HasPrefix(components[1], "blob") ||
+ strings.HasPrefix(components[1], "tree") {
+ actualObjects = append(actualObjects, git.ObjectID(components[0]))
+ }
+ }
+
+ sortObjects(actualObjects)
+
+ return actualObjects
+ },
+ }
+}
+
func TestTransactionManager(t *testing.T) {
umask := perm.GetUmask()
@@ -67,7 +143,8 @@ func TestTransactionManager(t *testing.T) {
ctx := testhelper.Context(t)
type testCommit struct {
- OID git.ObjectID
+ OID git.ObjectID
+ Pack []byte
}
type testCommits struct {
@@ -121,15 +198,35 @@ func TestTransactionManager(t *testing.T) {
nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil)))
require.NoError(t, err)
+ packCommit := func(oid git.ObjectID) []byte {
+ t.Helper()
+
+ var pack bytes.Buffer
+ require.NoError(t,
+ localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack),
+ )
+
+ return pack.Bytes()
+ }
+
return testSetup{
Config: cfg,
ObjectHash: objectHash,
Repository: localRepo,
NonExistentOID: nonExistentOID,
Commits: testCommits{
- First: testCommit{OID: firstCommitOID},
- Second: testCommit{OID: secondCommitOID},
- Third: testCommit{OID: thirdCommitOID},
+ First: testCommit{
+ OID: firstCommitOID,
+ Pack: packCommit(firstCommitOID),
+ },
+ Second: testCommit{
+ OID: secondCommitOID,
+ Pack: packCommit(secondCommitOID),
+ },
+ Third: testCommit{
+ OID: thirdCommitOID,
+ Pack: packCommit(thirdCommitOID),
+ },
},
}
}
@@ -205,6 +302,8 @@ func TestTransactionManager(t *testing.T) {
SkipVerificationFailures bool
// ReferenceUpdates are the reference updates to commit.
ReferenceUpdates ReferenceUpdates
+ // IncludeObjects is the path to object directory with the objects to include.
+ IncludeObjects string
// DefaultBranchUpdate is the default branch update to commit.
DefaultBranchUpdate *DefaultBranchUpdate
// CustomHooksUpdate is the custom hooks update to commit.
@@ -217,6 +316,12 @@ func TestTransactionManager(t *testing.T) {
TransactionID int
}
+ // Prune prunes all unreferenced objects from the repository.
+ type Prune struct {
+ // ExpectedObjects are the objects expected to exist in the repository after pruning.
+ ExpectedObjects []git.ObjectID
+ }
+
// StateAssertion models an assertion of the entire state managed by the TransactionManager.
type StateAssertion struct {
// DefaultBranch is the expected refname that HEAD points to.
@@ -955,6 +1060,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -998,6 +1104,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1690,6 +1797,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1702,6 +1810,435 @@ func TestTransactionManager(t *testing.T) {
},
},
},
+ {
+ desc: "pack file includes referenced commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ ExpectedError: updateref.NonExistentObjectError{
+ ReferenceName: "refs/heads/main",
+ ObjectID: setup.Commits.First.OID.String(),
+ },
+ },
+ Begin{
+ TransactionID: 2,
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file includes unreachable objects depended upon",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config,
+ setup.Commits.First.Pack,
+ setup.Commits.Second.Pack,
+ ),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ // Point main to the first commit so the second one is unreachable.
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AssertManager{},
+ StopManager{},
+ StartManager{
+ // Crash the manager before the third transaction is applied. This allows us to
+ // prune before it is applied to ensure the pack file contains all necessary commits.
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 2,
+ },
+ },
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Third.Pack),
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ // Prune so the unreachable commits have been removed prior to the third log entry being
+ // applied.
+ Prune{
+ ExpectedObjects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ StartManager{},
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.Third.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(3).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ },
+ ),
+ "/wal/packs/3.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file reapplying works",
+ steps: steps{
+ Prune{},
+ StartManager{
+ Hooks: testHooks{
+ BeforeStoreAppliedLogIndex: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ StartManager{},
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file missing referenced commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ ExpectedError: localrepo.BadObjectError{ObjectID: setup.Commits.Second.OID},
+ },
+ },
+ expectedState: StateAssertion{
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack file missing intermediate commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Third.Pack),
+ ExpectedError: localrepo.ObjectReadError{ObjectID: setup.Commits.Second.OID},
+ },
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file only",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(setup.Config, perm.PrivateFile, []git.ObjectID{}),
+ },
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack file with deletions",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Second.Pack),
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ "/wal/packs/2.pack": packFileDirectoryEntry(setup.Config, perm.PrivateFile, []git.ObjectID{}),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file applies with dependency concurrently deleted",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID},
+ },
+ },
+ AssertManager{},
+ Prune{},
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/dependant": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Second.Pack),
+ // The transaction fails to apply as we are not yet maintaining internal references
+ // to the old tips of concurrently deleted references. This causes the prune step to
+ // remove the object that the pack file depends on.
+ //
+ // For now, keep the test case to assert the behavior. We'll fix this in a later MR.
+ ExpectedError: localrepo.ObjectReadError{
+ ObjectID: setup.Commits.First.OID,
+ },
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{},
+ },
+ },
}
type invalidReferenceTestCase struct {
@@ -1861,6 +2398,7 @@ func TestTransactionManager(t *testing.T) {
// openTransactions holds references to all of the transactions that have been
// began in a test case.
openTransactions := map[int]*Transaction{}
+ var objectDirectories []string
// Stop the manager if it is running at the end of the test.
defer func() {
@@ -1943,6 +2481,11 @@ func TestTransactionManager(t *testing.T) {
transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR)
}
+ if step.IncludeObjects != "" {
+ objectDirectories = append(objectDirectories, step.IncludeObjects)
+ transaction.IncludeObjects(step.IncludeObjects)
+ }
+
commitCtx := ctx
if step.Context != nil {
commitCtx = step.Context
@@ -1952,6 +2495,9 @@ func TestTransactionManager(t *testing.T) {
case Rollback:
require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it")
require.NoError(t, openTransactions[step.TransactionID].Rollback())
+ case Prune:
+ gittest.Exec(t, setup.Config, "-C", repoPath, "prune")
+ gittest.RequireObjects(t, setup.Config, repoPath, step.ExpectedObjects)
default:
t.Fatalf("unhandled step type: %T", step)
}
@@ -1972,6 +2518,7 @@ func TestTransactionManager(t *testing.T) {
// gets asserted.
expectedDirectory = testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
}
}
@@ -1986,6 +2533,9 @@ func TestTransactionManager(t *testing.T) {
}
gittest.RequireObjects(t, setup.Config, repoPath, expectedObjects)
+ for _, dir := range objectDirectories {
+ require.NoDirExists(t, dir, "object directory was not cleaned up")
+ }
})
}
}
diff --git a/internal/testhelper/directory.go b/internal/testhelper/directory.go
index 05e650568..8b7befc5f 100644
--- a/internal/testhelper/directory.go
+++ b/internal/testhelper/directory.go
@@ -74,8 +74,8 @@ func RequireDirectoryState(tb testing.TB, rootDirectory, relativeDirectory strin
return nil
}))
- // Create a copy of the expected and set the ParseContent to nil as functions always fail
- // equality checks. We use a copy so we don't unexpectedly modify the original.
+ // Create a copy of the expected state and set the ParseContent to nil as functions always
+ // fail equality checks. We use a copy so we don't unexpectedly modify the original.
expectedCopy := make(DirectoryState, len(expected))
for key, value := range expected {
value.ParseContent = nil