Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-04-05 21:49:22 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-04-06 21:07:44 +0300
commita32b0389e572a1d35290980314aa03fe90821e7f (patch)
treeec55ec46b6c2a92de27932295e00cd34827c0165
parentc832f426951a3ed6c90bbd6e8a95454ae9ff7480 (diff)
Implement write-ahead logging for objectssmh-log-pack-files-base
All writes must be write-ahead logged prior to being applied to the repository. Objects are currently not being logged, which can be a source of inconsistencies and also performance problems. This commit implements logging support for objects that are needed by a transaction. Objects can be included in a transaction by passing it an object directory. Typically this would be the quarantine directory used in an RPC handler. When the transaction is being committed, the TransactionManager computes a pack file from the new reference tips set in the transaction. The pack file includes objects that are unreachable from the current set of references, so it includes both new objects from the quarantine directory and objects that are already present in the repository but are unreachable. This ensures that the pack file contains all objects that are needed to go from the current set of references to the new set of references after the transaction. This is important as the unreachable objects needed could otherwise be pruned, leading to the pack file no longer applying to the repository. As objects always flow through the log, this also means that only committed objects end up in the repository. This is an important property for backups. The repository will get into a consistent state by applying the write-ahead log. If objects could end up in the repository without being logged, some logged reference changes could fail once a repository is being recovered from a snapshot + log as neither the snapshot nor the log would be guaranteed to include the referenced objects. For replicated setups later, the fact that only committed objects end up in the repository means that all replicas are guaranteed to have received the same objects at some point. If objects from failed writes could end up in the repository, the leader could have a different set of objects from the replicas due to these objects which are not replicated. 
As the pack files are computed to include also unreachable objects, the pack file is guaranteed to apply on another replica regardless of whether it has garbage collected the objects. This is a big benefit for replication performance as we don't have to compute pack files while replicating log entries. The pack files will apply even if the unreachable objects are pruned while they are sitting in the log. However, the current approach is not enough if there are concurrent transactions, as there is nothing holding on to old tips of references the pack file was computed against. This will be fixed in a follow-up by maintaining internal references to the old tips of references until all dependent pack files have been applied.
-rw-r--r--internal/gitaly/transaction_manager.go283
-rw-r--r--internal/gitaly/transaction_manager_test.go558
-rw-r--r--internal/testhelper/directory.go4
3 files changed, 826 insertions, 19 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
index 3306fed7f..33b493b33 100644
--- a/internal/gitaly/transaction_manager.go
+++ b/internal/gitaly/transaction_manager.go
@@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "io"
"io/fs"
"os"
"path/filepath"
@@ -20,8 +21,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v15/internal/safe"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -121,12 +124,15 @@ type Transaction struct {
// result is where the outcome of the transaction is sent ot by TransactionManager once it
// has been determined.
result chan error
+ // admitted denotes whether the transaction was admitted for processing or not.
+ admitted bool
// Snapshot contains the details of the Transaction's read snapshot.
snapshot Snapshot
skipVerificationFailures bool
referenceUpdates ReferenceUpdates
+ objectDirectory string
defaultBranchUpdate *DefaultBranchUpdate
customHooksUpdate *CustomHooksUpdate
}
@@ -174,12 +180,34 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error)
// Commit performs the changes. If no error is returned, the transaction was successful and the changes
// have been performed. If an error was returned, the transaction may or may not be persisted.
-func (txn *Transaction) Commit(ctx context.Context) error {
+func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) {
+ defer func() {
+ if err := txn.clean(); err != nil && returnedErr == nil {
+ returnedErr = err
+ }
+ }()
+
return txn.commit(ctx, txn)
}
// Rollback releases resources associated with the transaction without performing any changes.
func (txn *Transaction) Rollback() error {
+ return txn.clean()
+}
+
+func (txn *Transaction) clean() error {
+ if txn.admitted {
+ // If the transaction was admitted, the Transaction is being processed by TransactionManager.
+ // The clean up responsibility moves there as well to avoid races.
+ return nil
+ }
+
+ if txn.objectDirectory != "" {
+ if err := os.RemoveAll(txn.objectDirectory); err != nil {
+ return fmt.Errorf("remove object directory: %w", err)
+ }
+ }
+
return nil
}
@@ -205,6 +233,15 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) {
txn.referenceUpdates = updates
}
+// IncludeObjects includes the objects in the given directory in the transaction. The directory should conform
+// to the .git/objects layout and contain the new objects to write as part of the transaction. Typically this
+// points to the quarantine directory used for an RPC. Only objects reachable from the new reference tips are
+// ultimately written into the repository. The caller should not use the object directory anymore after passing
+// it. The object directory is cleaned up in the eventual Rollback or Commit call.
+func (txn *Transaction) IncludeObjects(objectDirectory string) {
+ txn.objectDirectory = objectDirectory
+}
+
// SetDefaultBranch sets the default branch as part of the transaction. If SetDefaultBranch is called
// multiple times, only the changes from the latest invocation take place. The reference is validated
// to exist.
@@ -303,6 +340,8 @@ type repository interface {
ResolveRevision(context.Context, git.Revision) (git.ObjectID, error)
SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error
Path() (string, error)
+ UnpackObjects(context.Context, io.Reader) error
+ Quarantine(string) (*localrepo.Repo, error)
}
// NewTransactionManager returns a new TransactionManager for the given repository.
@@ -329,8 +368,14 @@ type resultChannel chan error
func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error {
transaction.result = make(resultChannel, 1)
+ if err := mgr.packObjects(ctx, transaction); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
+ }
+
select {
case mgr.admissionQueue <- transaction:
+ transaction.admitted = true
+
select {
case err := <-transaction.result:
return unwrapExpectedError(err)
@@ -346,6 +391,75 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact
}
}
+// packObjects packs the objects included in the transaction into a single pack file that is ready
+// for logging. The pack file includes all unreachable objects that are about to be made reachable.
+func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error {
+ if transaction.objectDirectory == "" {
+ return nil
+ }
+
+ quarantinedRepo, err := mgr.repository.Quarantine(transaction.objectDirectory)
+ if err != nil {
+ return fmt.Errorf("quarantine: %w", err)
+ }
+
+ objectsReader, objectsWriter := io.Pipe()
+
+ group, ctx := errgroup.WithContext(ctx)
+ group.Go(func() (returnedErr error) {
+ defer func() { objectsWriter.CloseWithError(returnedErr) }()
+
+ objectHash, err := quarantinedRepo.ObjectHash(ctx)
+ if err != nil {
+ return fmt.Errorf("object hash: %w", err)
+ }
+
+ heads := make([]string, 0, len(transaction.referenceUpdates))
+ for _, update := range transaction.referenceUpdates {
+ if update.NewOID == objectHash.ZeroOID {
+ // Reference deletions can't introduce new objects so ignore them.
+ continue
+ }
+
+ heads = append(heads, update.NewOID.String())
+ }
+
+ if err := quarantinedRepo.WalkUnreachableObjects(ctx,
+ strings.NewReader(strings.Join(heads, "\n")),
+ objectsWriter,
+ ); err != nil {
+ return fmt.Errorf("walk objects: %w", err)
+ }
+
+ return nil
+ })
+
+ group.Go(func() (returnedErr error) {
+ defer func() { objectsReader.CloseWithError(returnedErr) }()
+
+ destinationFile, err := os.OpenFile(
+ filepath.Join(transaction.objectDirectory, "transaction.pack"),
+ os.O_WRONLY|os.O_CREATE|os.O_EXCL,
+ perm.PrivateFile,
+ )
+ if err != nil {
+ return fmt.Errorf("open file: %w", err)
+ }
+ defer destinationFile.Close()
+
+ // TODO: This currently would include unreachable objects in the pool for forks which leads to
+ // excessive writes. Is that a problem?
+ // TODO: Ignore internal references
+ if err := quarantinedRepo.PackObjects(ctx, objectsReader, destinationFile); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
+ }
+
+ return destinationFile.Close()
+ })
+
+ return group.Wait()
+}
+
// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
func unwrapExpectedError(err error) error {
// The manager controls its own execution context and it is canceled only when Stop is called.
@@ -401,10 +515,32 @@ func (mgr *TransactionManager) Run() (returnedErr error) {
// processTransaction waits for a transaction and processes it by verifying and
// logging it.
-func (mgr *TransactionManager) processTransaction() error {
+func (mgr *TransactionManager) processTransaction() (returnedErr error) {
+ var cleanUps []func() error
+ defer func() {
+ for _, cleanUp := range cleanUps {
+ if err := cleanUp(); err != nil && returnedErr == nil {
+ returnedErr = fmt.Errorf("clean up: %w", err)
+ }
+ }
+ }()
+
var transaction *Transaction
select {
case transaction = <-mgr.admissionQueue:
+ if transaction.objectDirectory != "" {
+ // The Transaction does not clean up the object directory anymore once the Transaction
+ // has been admitted. This avoids the Transaction concurrently removing the directory
+ // while the manager is still operating on it. We thus need to defer the clean up of
+ // the directory.
+ cleanUps = append(cleanUps, func() error {
+ if err := os.RemoveAll(transaction.objectDirectory); err != nil {
+ return fmt.Errorf("remove object directory: %w", err)
+ }
+
+ return nil
+ })
+ }
case <-mgr.ctx.Done():
}
@@ -414,7 +550,7 @@ func (mgr *TransactionManager) processTransaction() error {
return err
}
- transaction.result <- func() error {
+ transaction.result <- func() (commitErr error) {
logEntry, err := mgr.verifyReferences(mgr.ctx, transaction)
if err != nil {
return fmt.Errorf("verify references: %w", err)
@@ -425,8 +561,29 @@ func (mgr *TransactionManager) processTransaction() error {
CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR,
}
}
+ nextLogIndex := mgr.appendedLogIndex + 1
+
+ if transaction.objectDirectory != "" {
+ removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction)
+ cleanUps = append(cleanUps, func() error {
+ // The transaction's pack file might have been moved in to place at <repo>/wal/packs/<log_index>.pack.
+ // If anything fails before the transaction is committed, the pack file must be removed as otherwise
+ // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager
+ // will exit with an error. The pack file will be cleaned up on restart and no further processing is
+ // allowed until that happens.
+ if commitErr != nil {
+ return removePack()
+ }
+
+ return nil
+ })
- return mgr.appendLogEntry(logEntry)
+ if err != nil {
+ return fmt.Errorf("store pack file: %w", err)
+ }
+ }
+
+ return mgr.appendLogEntry(nextLogIndex, logEntry)
}()
return nil
@@ -526,6 +683,10 @@ func (mgr *TransactionManager) initialize() error {
mgr.applyNotifications[i] = make(chan struct{})
}
+ if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
+ return fmt.Errorf("remove stale packs: %w", err)
+ }
+
return nil
}
@@ -540,6 +701,7 @@ func (mgr *TransactionManager) createDirectories() error {
for _, relativePath := range []string{
"wal/hooks",
+ "wal/packs",
} {
directory := filepath.Join(repoPath, relativePath)
if _, err := os.Stat(directory); err != nil {
@@ -560,6 +722,69 @@ func (mgr *TransactionManager) createDirectories() error {
return nil
}
+// removeStalePackFiles removes pack files from the log directory that have no associated log entry.
+// Such packs can be left around if a transaction's pack file was moved in place successfully
+// but the manager was interrupted before successfully persisting the log entry itself.
+func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error {
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return fmt.Errorf("repo path: %w", err)
+ }
+
+ // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale
+ // pack would be for the next log index. Remove the pack if it exists.
+ possibleStalePackIndex := appendedIndex + 1
+ packsDirectory := filepath.Join(repoPath, "wal", "packs")
+ possibleStalePack := possibleStalePackIndex.String() + ".pack"
+
+ if err := os.Remove(filepath.Join(packsDirectory, possibleStalePack)); err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
+ return fmt.Errorf("remove: %w", err)
+ }
+ } else {
+ // Sync the parent directory to flush the file deletion.
+ if err := safe.NewSyncer().SyncHierarchy(packsDirectory, possibleStalePack); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
+ removePack := func() error { return nil }
+
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return removePack, fmt.Errorf("repository path: %w", err)
+ }
+
+ packsDirectory := filepath.Join(repoPath, "wal", "packs")
+ packFile := index.String() + ".pack"
+ destinationPath := filepath.Join(packsDirectory, packFile)
+ if err := os.Rename(
+ filepath.Join(transaction.objectDirectory, "transaction.pack"),
+ destinationPath,
+ ); err != nil {
+ return removePack, fmt.Errorf("move pack file: %w", err)
+ }
+
+ removePack = func() error {
+ if err := os.Remove(destinationPath); err != nil {
+ return fmt.Errorf("remove pack file: %w", err)
+ }
+
+ return nil
+ }
+
+ // Sync the parent directory and the pack itself.
+ if err := safe.NewSyncer().SyncHierarchy(packsDirectory, packFile); err != nil {
+ return removePack, fmt.Errorf("sync: %w", err)
+ }
+
+ return removePack, nil
+}
+
// verifyReferences verifies that the references in the transaction apply on top of the already accepted
// reference changes. The old tips in the transaction are verified against the current actual tips.
// It returns the write-ahead log entry for the transaction if it was successfully verified.
@@ -619,7 +844,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
) == -1
})
- if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil {
+ if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates, transaction.objectDirectory); err != nil {
return nil, fmt.Errorf("verify references with git: %w", err)
}
@@ -640,8 +865,8 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
// the updates will go through when they are being applied in the log. This also catches any invalid reference names
// and file/directory conflicts with Git's loose reference storage which can occur with references like
// 'refs/heads/parent' and 'refs/heads/parent/child'.
-func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error {
- updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates)
+func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, objectDirectory string) error {
+ updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, objectDirectory)
if err != nil {
return fmt.Errorf("prepare reference transaction: %w", err)
}
@@ -691,8 +916,17 @@ func (mgr *TransactionManager) updateDefaultBranch(ctx context.Context, defaultB
// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing
// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up
// if an error is returned.
-func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) {
- updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions())
+func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, objectDirectory string) (*updateref.Updater, error) {
+ repository := mgr.repository
+ if objectDirectory != "" {
+ var err error
+ repository, err = mgr.repository.Quarantine(objectDirectory)
+ if err != nil {
+ return nil, fmt.Errorf("quarantine: %w", err)
+ }
+ }
+
+ updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions())
if err != nil {
return nil, fmt.Errorf("new: %w", err)
}
@@ -716,9 +950,7 @@ func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context,
// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not
// logged nor applied later.
-func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error {
- nextLogIndex := mgr.appendedLogIndex + 1
-
+func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *gitalypb.LogEntry) error {
if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil {
return fmt.Errorf("set log entry: %w", err)
}
@@ -740,8 +972,11 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
if err != nil {
return fmt.Errorf("read log entry: %w", err)
}
+ if err := mgr.applyPackFile(ctx, logIndex); err != nil {
+ return fmt.Errorf("apply pack file: %w", err)
+ }
- updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates)
+ updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, "")
if err != nil {
return fmt.Errorf("perpare reference transaction: %w", err)
}
@@ -782,6 +1017,28 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return nil
}
+// applyPackFile unpacks the objects from the pack file into the repository if the log entry
+// has an associated pack file.
+func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error {
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return fmt.Errorf("repository path: %w", err)
+ }
+
+ packFile, err := os.Open(filepath.Join(repoPath, "wal", "packs", logIndex.String()+".pack"))
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ // This log entry has no associated pack file.
+ return nil
+ }
+
+ return fmt.Errorf("open pack file: %w", err)
+ }
+ defer packFile.Close()
+
+ return mgr.repository.UnpackObjects(ctx, packFile)
+}
+
// applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are stored
// at `<repo>/wal/hooks/<log_index>`. The hooks are fsynced prior to returning so it is safe to delete
// the log entry afterwards.
diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go
index ce5899a5b..4f2e893b0 100644
--- a/internal/gitaly/transaction_manager_test.go
+++ b/internal/gitaly/transaction_manager_test.go
@@ -8,6 +8,10 @@ import (
"errors"
"fmt"
"io/fs"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
"sync"
"testing"
"time"
@@ -59,6 +63,78 @@ func validCustomHooks(tb testing.TB) []byte {
return hooks.Bytes()
}
+// writePack writes a pack file and its index into the destination.
+func writePack(tb testing.TB, cfg config.Cfg, packFile []byte, destinationPack string) {
+ tb.Helper()
+
+ gittest.ExecOpts(tb,
+ cfg,
+ gittest.ExecConfig{Stdin: bytes.NewReader(packFile)},
+ "index-pack", "--stdin", destinationPack,
+ )
+}
+
+// buildObjectDirectory builds an object directory that contains the given pack files
+// and returns the path of the directory.
+func buildObjectDirectory(tb testing.TB, cfg config.Cfg, packFiles ...[]byte) string {
+ tb.Helper()
+
+ objectsDirectory := tb.TempDir()
+ packDirectory := filepath.Join(objectsDirectory, "pack")
+
+ require.NoError(tb, os.MkdirAll(packDirectory, fs.ModePerm))
+
+ for i, packFile := range packFiles {
+ writePack(tb, cfg, packFile, filepath.Join(packDirectory, fmt.Sprintf("pack-%d.pack", i)))
+ }
+
+ return objectsDirectory
+}
+
+// packFileDirectoryEntry returns a DirectoryEntry that parses content as a pack file and asserts the
+// set of objects in the pack file matches the expected objects.
+func packFileDirectoryEntry(cfg config.Cfg, mode fs.FileMode, expectedObjects []git.ObjectID) testhelper.DirectoryEntry {
+ sortObjects := func(objects []git.ObjectID) {
+ sort.Slice(objects, func(i, j int) bool {
+ return objects[i] < objects[j]
+ })
+ }
+
+ sortObjects(expectedObjects)
+
+ return testhelper.DirectoryEntry{
+ Mode: mode,
+ Content: expectedObjects,
+ ParseContent: func(tb testing.TB, content []byte) any {
+ tb.Helper()
+
+ tempDir := tb.TempDir()
+ packPath := filepath.Join(tempDir, "asserted-pack.pack")
+ writePack(tb, cfg, content, packPath)
+
+ actualObjects := []git.ObjectID{}
+ for _, line := range strings.Split(
+ string(gittest.Exec(tb, testcfg.Build(tb), "verify-pack", "-v", packPath)), "\n",
+ ) {
+ components := strings.Split(line, " ")
+ if len(components) < 2 {
+ continue
+ }
+
+ if strings.HasPrefix(components[1], "commit") ||
+ strings.HasPrefix(components[1], "blob") ||
+ strings.HasPrefix(components[1], "tree") {
+ actualObjects = append(actualObjects, git.ObjectID(components[0]))
+ }
+ }
+
+ sortObjects(actualObjects)
+
+ return actualObjects
+ },
+ }
+}
+
func TestTransactionManager(t *testing.T) {
umask := perm.GetUmask()
@@ -67,7 +143,8 @@ func TestTransactionManager(t *testing.T) {
ctx := testhelper.Context(t)
type testCommit struct {
- OID git.ObjectID
+ OID git.ObjectID
+ Pack []byte
}
type testCommits struct {
@@ -121,15 +198,35 @@ func TestTransactionManager(t *testing.T) {
nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil)))
require.NoError(t, err)
+ packCommit := func(oid git.ObjectID) []byte {
+ t.Helper()
+
+ var pack bytes.Buffer
+ require.NoError(t,
+ localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack),
+ )
+
+ return pack.Bytes()
+ }
+
return testSetup{
Config: cfg,
ObjectHash: objectHash,
Repository: localRepo,
NonExistentOID: nonExistentOID,
Commits: testCommits{
- First: testCommit{OID: firstCommitOID},
- Second: testCommit{OID: secondCommitOID},
- Third: testCommit{OID: thirdCommitOID},
+ First: testCommit{
+ OID: firstCommitOID,
+ Pack: packCommit(firstCommitOID),
+ },
+ Second: testCommit{
+ OID: secondCommitOID,
+ Pack: packCommit(secondCommitOID),
+ },
+ Third: testCommit{
+ OID: thirdCommitOID,
+ Pack: packCommit(thirdCommitOID),
+ },
},
}
}
@@ -205,6 +302,8 @@ func TestTransactionManager(t *testing.T) {
SkipVerificationFailures bool
// ReferenceUpdates are the reference updates to commit.
ReferenceUpdates ReferenceUpdates
+ // IncludeObjects is the path to object directory with the objects to include.
+ IncludeObjects string
// DefaultBranchUpdate is the default branch update to commit.
DefaultBranchUpdate *DefaultBranchUpdate
// CustomHooksUpdate is the custom hooks update to commit.
@@ -217,6 +316,12 @@ func TestTransactionManager(t *testing.T) {
TransactionID int
}
+ // Prune prunes all unreferenced objects from the repository.
+ type Prune struct {
+ // ExpectedObjects are the objects expected to exist in the repository after pruning.
+ ExpectedObjects []git.ObjectID
+ }
+
// StateAssertions models an assertion of the entire state managed by the TransactionManager.
type StateAssertion struct {
// DefaultBranch is the expected refname that HEAD points to.
@@ -955,6 +1060,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -998,6 +1104,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1690,6 +1797,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1702,6 +1810,435 @@ func TestTransactionManager(t *testing.T) {
},
},
},
+ {
+ desc: "pack file includes referenced commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ ExpectedError: updateref.NonExistentObjectError{
+ ReferenceName: "refs/heads/main",
+ ObjectID: setup.Commits.First.OID.String(),
+ },
+ },
+ Begin{
+ TransactionID: 2,
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file includes unreachable objects depended upon",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config,
+ setup.Commits.First.Pack,
+ setup.Commits.Second.Pack,
+ ),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ // Point main to the first commit so the second one is unreachable.
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AssertManager{},
+ StopManager{},
+ StartManager{
+ // Crash the manager before the third transaction is applied. This allows us to
+ // prune before it is applied to ensure the pack file contains all necessary commits.
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 2,
+ },
+ },
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Third.Pack),
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ // Prune so the unreachable commits have been removed prior to the third log entry being
+ // applied.
+ Prune{
+ ExpectedObjects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ StartManager{},
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.Third.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(3).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ },
+ ),
+ "/wal/packs/3.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file reapplying works",
+ steps: steps{
+ Prune{},
+ StartManager{
+ Hooks: testHooks{
+ BeforeStoreAppliedLogIndex: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ StartManager{},
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file missing referenced commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ ExpectedError: localrepo.BadObjectError{ObjectID: setup.Commits.Second.OID},
+ },
+ },
+ expectedState: StateAssertion{
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack file missing intermediate commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Third.Pack),
+ ExpectedError: localrepo.ObjectReadError{ObjectID: setup.Commits.Second.OID},
+ },
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file only",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(setup.Config, perm.PrivateFile, []git.ObjectID{}),
+ },
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack file with deletions",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Second.Pack),
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ "/wal/packs/2.pack": packFileDirectoryEntry(setup.Config, perm.PrivateFile, []git.ObjectID{}),
+ },
+ Objects: []git.ObjectID{setup.Commits.First.OID},
+ },
+ },
+ {
+ desc: "pack file applies with dependency concurrently deleted",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.First.Pack),
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID},
+ },
+ },
+ AssertManager{},
+ Prune{},
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/dependant": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ IncludeObjects: buildObjectDirectory(t, setup.Config, setup.Commits.Second.Pack),
+ // The transaction fails to apply as we are not yet maintaining internal references
+ // to the old tips of concurrently deleted references. This causes the prune step to
+ // remove the object that the pack file depends on.
+ //
+ // For now, keep the test case to assert the behavior. We'll fix this in a later MR.
+ ExpectedError: localrepo.ObjectReadError{
+ ObjectID: setup.Commits.First.OID,
+ },
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{},
+ },
+ },
}
type invalidReferenceTestCase struct {
@@ -1861,6 +2398,7 @@ func TestTransactionManager(t *testing.T) {
// openTransactions holds references to all of the transactions that have been
// began in a test case.
openTransactions := map[int]*Transaction{}
+ var objectDirectories []string
// Stop the manager if it is running at the end of the test.
defer func() {
@@ -1943,6 +2481,11 @@ func TestTransactionManager(t *testing.T) {
transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR)
}
+ if step.IncludeObjects != "" {
+ objectDirectories = append(objectDirectories, step.IncludeObjects)
+ transaction.IncludeObjects(step.IncludeObjects)
+ }
+
commitCtx := ctx
if step.Context != nil {
commitCtx = step.Context
@@ -1952,6 +2495,9 @@ func TestTransactionManager(t *testing.T) {
case Rollback:
require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it")
require.NoError(t, openTransactions[step.TransactionID].Rollback())
+ case Prune:
+ gittest.Exec(t, setup.Config, "-C", repoPath, "prune")
+ gittest.RequireObjects(t, setup.Config, repoPath, step.ExpectedObjects)
default:
t.Fatalf("unhandled step type: %T", step)
}
@@ -1972,6 +2518,7 @@ func TestTransactionManager(t *testing.T) {
// gets asserted.
expectedDirectory = testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
}
}
@@ -1986,6 +2533,9 @@ func TestTransactionManager(t *testing.T) {
}
gittest.RequireObjects(t, setup.Config, repoPath, expectedObjects)
+ for _, dir := range objectDirectories {
+ require.NoDirExists(t, dir, "object directory was not cleaned up")
+ }
})
}
}
diff --git a/internal/testhelper/directory.go b/internal/testhelper/directory.go
index 05e650568..8b7befc5f 100644
--- a/internal/testhelper/directory.go
+++ b/internal/testhelper/directory.go
@@ -74,8 +74,8 @@ func RequireDirectoryState(tb testing.TB, rootDirectory, relativeDirectory strin
return nil
}))
- // Create a copy of the expected and set the ParseContent to nil as functions always fail
- // equality checks. We use a copy so we don't unexpectedly modify the original.
+ // Create a copy of the expected state and set the ParseContent to nil as functions always
+ // fail equality checks. We use a copy so we don't unexpectedly modify the original.
expectedCopy := make(DirectoryState, len(expected))
for key, value := range expected {
value.ParseContent = nil