Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-05-10 13:13:40 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-05-10 13:13:40 +0300
commit46ee0248c36241d856710030958fa5e1e04b179e (patch)
tree68e546afe967d4e6939a53a8ba661b83f40ce34d
parent242c75f51dd840799467c41b0fe15a2214ae07c2 (diff)
parent04a5d55597c65ea5f4755b560018065c8de341f8 (diff)
Merge branch 'smh-log-pack-files' into 'master'qmnguyen0711/always-log-queue-ms
Implement write-ahead logging for objects Closes #4790 See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5638 Merged-by: Patrick Steinhardt <psteinhardt@gitlab.com> Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--internal/gitaly/partition_manager.go21
-rw-r--r--internal/gitaly/partition_manager_test.go19
-rw-r--r--internal/gitaly/transaction_manager.go385
-rw-r--r--internal/gitaly/transaction_manager_test.go600
-rw-r--r--proto/go/gitalypb/log.pb.go56
-rw-r--r--proto/log.proto3
6 files changed, 1019 insertions, 65 deletions
diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go
index 73a7ca97e..a07661096 100644
--- a/internal/gitaly/partition_manager.go
+++ b/internal/gitaly/partition_manager.go
@@ -3,6 +3,8 @@ package gitaly
import (
"context"
"errors"
+ "fmt"
+ "os"
"sync"
"github.com/dgraph-io/badger/v3"
@@ -33,6 +35,9 @@ type PartitionManager struct {
stopped bool
// partitionsWG keeps track of running partitions.
partitionsWG sync.WaitGroup
+ // stagingDirectory is the directory where all of the TransactionManager staging directories
+ // should be created.
+ stagingDirectory string
}
// partition contains the transaction manager and tracks the number of in-flight transactions for the partition.
@@ -50,12 +55,13 @@ type partition struct {
}
// NewPartitionManager returns a new PartitionManager.
-func NewPartitionManager(db *badger.DB, localRepoFactory func(repo.GitRepo) *localrepo.Repo, logger logrus.FieldLogger) *PartitionManager {
+func NewPartitionManager(db *badger.DB, localRepoFactory func(repo.GitRepo) *localrepo.Repo, logger logrus.FieldLogger, stagingDir string) *PartitionManager {
return &PartitionManager{
db: db,
partitions: make(map[string]*partition),
localRepoFactory: localRepoFactory,
logger: logger,
+ stagingDirectory: stagingDir,
}
}
@@ -79,7 +85,13 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran
shutdown: make(chan struct{}),
}
- mgr := NewTransactionManager(pm.db, localRepo, pm.transactionFinalizerFactory(ptn))
+ stagingDir, err := os.MkdirTemp(pm.stagingDirectory, "")
+ if err != nil {
+ pm.mu.Unlock()
+ return nil, fmt.Errorf("create staging directory: %w", err)
+ }
+
+ mgr := NewTransactionManager(pm.db, localRepo, stagingDir, pm.transactionFinalizerFactory(ptn))
ptn.transactionManager = mgr
pm.partitions[partitionKey] = ptn
@@ -99,6 +111,11 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran
pm.mu.Unlock()
close(ptn.shutdown)
+
+ if err := os.RemoveAll(stagingDir); err != nil {
+ pm.logger.WithError(err).Error("failed removing partition's staging directory")
+ }
+
pm.partitionsWG.Done()
}()
}
diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go
index 288bd3ca2..a3dc27b9e 100644
--- a/internal/gitaly/partition_manager_test.go
+++ b/internal/gitaly/partition_manager_test.go
@@ -2,6 +2,9 @@ package gitaly
import (
"context"
+ "io/fs"
+ "os"
+ "path/filepath"
"testing"
"github.com/sirupsen/logrus"
@@ -12,11 +15,14 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/localrepo"
repo "gitlab.com/gitlab-org/gitaly/v15/internal/git/repository"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
)
func TestPartitionManager(t *testing.T) {
+ umask := perm.GetUmask()
+
t.Parallel()
ctx := testhelper.Context(t)
@@ -431,8 +437,17 @@ func TestPartitionManager(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, database)
- partitionManager := NewPartitionManager(database, localRepoFactory, logrus.StandardLogger())
- defer partitionManager.Stop()
+ stagingDir := filepath.Join(t.TempDir(), "staging")
+ require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
+
+ partitionManager := NewPartitionManager(database, localRepoFactory, logrus.StandardLogger(), stagingDir)
+ defer func() {
+ partitionManager.Stop()
+ // Assert all staging directories have been removed.
+ testhelper.RequireDirectoryState(t, stagingDir, "", testhelper.DirectoryState{
+ "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ })
+ }()
// openTransactionData holds references to all transactions and its associated partition
// created during the testcase.
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
index 018f5ea8d..e220d5edd 100644
--- a/internal/gitaly/transaction_manager.go
+++ b/internal/gitaly/transaction_manager.go
@@ -6,6 +6,7 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "io"
"io/fs"
"os"
"path/filepath"
@@ -21,8 +22,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v15/internal/git/updateref"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/repoutil"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v15/internal/safe"
"gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
@@ -119,11 +122,30 @@ type Snapshot struct {
type Transaction struct {
// commit commits the Transaction through the TransactionManager.
commit func(context.Context, *Transaction) error
- // rollback rolls back the Transaction through the TransactionManager.
- rollback func() error
+ // finalize decrements the active transaction count on the partition in the PartitionManager. This is
+ // really only a concern for the PartitionManager and will be moved out from here later.
+ finalize func()
// result is where the outcome of the transaction is sent ot by TransactionManager once it
// has been determined.
result chan error
+ // admitted denotes whether the transaction was admitted for processing in the TransactionManager.
+ // Transaction queues in admissionQueue to be committed, and is considered admitted once it has
+ // been dequeued by TransactionManager.Run(). Once the transaction is admitted, its ownership moves
+ // from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must
+ // not do any modifications to the state of the transaction anymore to avoid races.
+ admitted bool
+ // initStagingDirectory is called to lazily initialize the staging directory when it is
+ // needed.
+ initStagingDirectory func() error
+ // stagingDirectory is the directory where the transaction stages its files prior
+ // to them being logged. It is cleaned up when the transaction finishes.
+ stagingDirectory string
+ // quarantineDirectory is the directory within the stagingDirectory where the new objects of the
+ // transaction are quarantined.
+ quarantineDirectory string
+ // includesPack is set if a pack file has been computed for the transaction and should be
+ // logged.
+ includesPack bool
// Snapshot contains the details of the Transaction's read snapshot.
snapshot Snapshot
@@ -149,11 +171,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error)
}
mgr.mutex.RLock()
- snapshot := Snapshot{
- ReadIndex: mgr.appendedLogIndex,
- HookIndex: mgr.hookIndex,
+ txn := &Transaction{
+ commit: mgr.commit,
+ finalize: mgr.transactionFinalizer,
+ snapshot: Snapshot{
+ ReadIndex: mgr.appendedLogIndex,
+ HookIndex: mgr.hookIndex,
+ },
}
- readReady := mgr.applyNotifications[snapshot.ReadIndex]
+
+ readReady := mgr.applyNotifications[txn.snapshot.ReadIndex]
mgr.mutex.RUnlock()
if readReady == nil {
// The snapshot log entry is already applied if there is no notification channel for it.
@@ -162,29 +189,66 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error)
close(readReady)
}
+ txn.initStagingDirectory = func() error {
+ stagingDirectory, err := os.MkdirTemp(mgr.stagingDirectory, "")
+ if err != nil {
+ return fmt.Errorf("mkdir temp: %w", err)
+ }
+
+ txn.stagingDirectory = stagingDirectory
+ return nil
+ }
+
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-mgr.ctx.Done():
return nil, ErrTransactionProcessingStopped
case <-readReady:
- return &Transaction{
- commit: mgr.commit,
- rollback: mgr.rollback,
- snapshot: snapshot,
- }, nil
+ return txn, nil
}
}
// Commit performs the changes. If no error is returned, the transaction was successful and the changes
// have been performed. If an error was returned, the transaction may or may not be persisted.
-func (txn *Transaction) Commit(ctx context.Context) error {
+func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) {
+ defer func() {
+ txn.finalize()
+
+ if err := txn.cleanUnadmitted(); err != nil && returnedErr == nil {
+ returnedErr = err
+ }
+ }()
+
return txn.commit(ctx, txn)
}
// Rollback releases resources associated with the transaction without performing any changes.
func (txn *Transaction) Rollback() error {
- return txn.rollback()
+ defer txn.finalize()
+ return txn.cleanUnadmitted()
+}
+
+// cleanUnadmitted cleans up after the transaction if it wasn't yet admitted. If the transaction was admitted,
+// the Transaction is being processed by TransactionManager. The clean up responsibility moves there as well
+// to avoid races.
+func (txn *Transaction) cleanUnadmitted() error {
+ if txn.admitted {
+ return nil
+ }
+
+ return txn.clean()
+}
+
+// clean cleans up the resources associated with the transaction.
+func (txn *Transaction) clean() error {
+ if txn.stagingDirectory != "" {
+ if err := os.RemoveAll(txn.stagingDirectory); err != nil {
+ return fmt.Errorf("remove staging directory: %w", err)
+ }
+ }
+
+ return nil
}
// Snapshot returns the details of the Transaction's read snapshot.
@@ -209,6 +273,24 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) {
txn.referenceUpdates = updates
}
+// QuarantineDirectory returns an absolute path to the transaction's quarantine directory. The quarantine directory
+// is a Git object directory where the new objects introduced in the transaction must be written. The quarantined
+// objects needed by the updated reference tips will be included in the transaction.
+func (txn *Transaction) QuarantineDirectory() (string, error) {
+ if err := txn.initStagingDirectory(); err != nil {
+ return "", fmt.Errorf("init staging directory: %w", err)
+ }
+
+ quarantineDirectory := filepath.Join(txn.stagingDirectory, "quarantine")
+ if err := os.MkdirAll(filepath.Join(quarantineDirectory, "pack"), perm.PrivateDir); err != nil {
+ return "", fmt.Errorf("create quarantine directory: %w", err)
+ }
+
+ txn.quarantineDirectory = quarantineDirectory
+
+ return quarantineDirectory, nil
+}
+
// SetDefaultBranch sets the default branch as part of the transaction. If SetDefaultBranch is called
// multiple times, only the changes from the latest invocation take place. The reference is validated
// to exist.
@@ -223,6 +305,11 @@ func (txn *Transaction) SetCustomHooks(hooksTAR []byte) {
txn.customHooksUpdate = &CustomHooksUpdate{CustomHooksTAR: hooksTAR}
}
+// packFilePath returns the path to this transaction's pack file.
+func (txn *Transaction) packFilePath() string {
+ return filepath.Join(txn.stagingDirectory, "transaction.pack")
+}
+
// TransactionManager is responsible for transaction management of a single repository. Each repository has
// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from
// the admissionQueue. Each admitted write is processed in three steps:
@@ -275,6 +362,11 @@ type TransactionManager struct {
// being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly
// releases awaiters when the transactions processing is stopped.
runDone chan struct{}
+ // stagingDirectory is a path to a directory where this TransactionManager should stage the files of the transactions
+ // before it logs them. The TransactionManager cleans up the files during runtime but stale files may be
+ // left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when
+ // Gitaly starts.
+ stagingDirectory string
// repository is the repository this TransactionManager is acting on.
repository repository
@@ -310,10 +402,14 @@ type repository interface {
ResolveRevision(context.Context, git.Revision) (git.ObjectID, error)
SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error
Path() (string, error)
+ UnpackObjects(context.Context, io.Reader) error
+ Quarantine(string) (*localrepo.Repo, error)
+ WalkUnreachableObjects(context.Context, io.Reader, io.Writer) error
+ PackObjects(context.Context, io.Reader, io.Writer) error
}
// NewTransactionManager returns a new TransactionManager for the given repository.
-func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, transactionFinalizer func()) *TransactionManager {
+func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, stagingDir string, transactionFinalizer func()) *TransactionManager {
ctx, cancel := context.WithCancel(context.Background())
return &TransactionManager{
ctx: ctx,
@@ -325,6 +421,7 @@ func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, transactio
admissionQueue: make(chan *Transaction),
initialized: make(chan struct{}),
applyNotifications: make(map[LogIndex]chan struct{}),
+ stagingDirectory: stagingDir,
transactionFinalizer: transactionFinalizer,
}
}
@@ -335,12 +432,16 @@ type resultChannel chan error
// commit queues the transaction for processing and returns once the result has been determined.
func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transaction) error {
- defer mgr.transactionFinalizer()
-
transaction.result = make(resultChannel, 1)
+ if err := mgr.packObjects(ctx, transaction); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
+ }
+
select {
case mgr.admissionQueue <- transaction:
+ transaction.admitted = true
+
select {
case err := <-transaction.result:
return unwrapExpectedError(err)
@@ -356,11 +457,83 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact
}
}
-// rollback rolls back and ends the transaction without committing.
-func (mgr *TransactionManager) rollback() error {
- mgr.transactionFinalizer()
+// packObjects packs the objects included in the transaction into a single pack file that is ready
+// for logging. The pack file includes all unreachable objects that are about to be made reachable.
+func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error {
+ if transaction.quarantineDirectory == "" {
+ return nil
+ }
- return nil
+ quarantinedRepo, err := mgr.repository.Quarantine(transaction.quarantineDirectory)
+ if err != nil {
+ return fmt.Errorf("quarantine: %w", err)
+ }
+
+ objectHash, err := quarantinedRepo.ObjectHash(ctx)
+ if err != nil {
+ return fmt.Errorf("object hash: %w", err)
+ }
+
+ heads := make([]string, 0, len(transaction.referenceUpdates))
+ for _, update := range transaction.referenceUpdates {
+ if update.NewOID == objectHash.ZeroOID {
+ // Reference deletions can't introduce new objects so ignore them.
+ continue
+ }
+
+ heads = append(heads, update.NewOID.String())
+ }
+
+ if len(heads) == 0 {
+ // No need to pack objects if there are no changes that can introduce new objects.
+ return nil
+ }
+
+ transaction.includesPack = true
+
+ objectsReader, objectsWriter := io.Pipe()
+
+ group, ctx := errgroup.WithContext(ctx)
+ group.Go(func() (returnedErr error) {
+ defer func() { objectsWriter.CloseWithError(returnedErr) }()
+
+ if err := quarantinedRepo.WalkUnreachableObjects(ctx,
+ strings.NewReader(strings.Join(heads, "\n")),
+ objectsWriter,
+ ); err != nil {
+ return fmt.Errorf("walk objects: %w", err)
+ }
+
+ return nil
+ })
+
+ group.Go(func() (returnedErr error) {
+ defer func() { objectsReader.CloseWithError(returnedErr) }()
+
+ destinationFile, err := os.OpenFile(
+ transaction.packFilePath(),
+ os.O_WRONLY|os.O_CREATE|os.O_EXCL,
+ perm.PrivateFile,
+ )
+ if err != nil {
+ return fmt.Errorf("open file: %w", err)
+ }
+ defer destinationFile.Close()
+
+ if err := quarantinedRepo.PackObjects(ctx, objectsReader, destinationFile); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
+ }
+
+ // Sync the contents of the pack so they are flushed to disk prior to the transaction
+ // being admitted for processing.
+ if err := destinationFile.Sync(); err != nil {
+ return fmt.Errorf("sync pack: %w", err)
+ }
+
+ return destinationFile.Close()
+ })
+
+ return group.Wait()
}
// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
@@ -418,10 +591,23 @@ func (mgr *TransactionManager) Run() (returnedErr error) {
// processTransaction waits for a transaction and processes it by verifying and
// logging it.
-func (mgr *TransactionManager) processTransaction() error {
+func (mgr *TransactionManager) processTransaction() (returnedErr error) {
+ var cleanUps []func() error
+ defer func() {
+ for _, cleanUp := range cleanUps {
+ if err := cleanUp(); err != nil && returnedErr == nil {
+ returnedErr = fmt.Errorf("clean up: %w", err)
+ }
+ }
+ }()
+
var transaction *Transaction
select {
case transaction = <-mgr.admissionQueue:
+ // The Transaction does not clean up itself anymore once it has been admitted for
+ // processing. This avoids the Transaction concurrently removing the staged state
+ // while the manager is still operating on it. We thus need to defer its clean up.
+ cleanUps = append(cleanUps, transaction.clean)
case <-mgr.ctx.Done():
}
@@ -431,7 +617,7 @@ func (mgr *TransactionManager) processTransaction() error {
return err
}
- transaction.result <- func() error {
+ transaction.result <- func() (commitErr error) {
logEntry, err := mgr.verifyReferences(mgr.ctx, transaction)
if err != nil {
return fmt.Errorf("verify references: %w", err)
@@ -442,8 +628,31 @@ func (mgr *TransactionManager) processTransaction() error {
CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR,
}
}
+ nextLogIndex := mgr.appendedLogIndex + 1
+
+ if transaction.includesPack {
+ logEntry.IncludesPack = true
+
+ removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction)
+ cleanUps = append(cleanUps, func() error {
+ // The transaction's pack file might have been moved into place at <repo>/wal/packs/<log_index>.pack.
+ // If anything fails before the transaction is committed, the pack file must be removed as otherwise
+ // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager
+ // will exit with an error. The pack file will be cleaned up on restart and no further processing is
+ // allowed until that happens.
+ if commitErr != nil {
+ return removePack()
+ }
- return mgr.appendLogEntry(logEntry)
+ return nil
+ })
+
+ if err != nil {
+ return fmt.Errorf("store pack file: %w", err)
+ }
+ }
+
+ return mgr.appendLogEntry(nextLogIndex, logEntry)
}()
return nil
@@ -506,6 +715,10 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
mgr.applyNotifications[i] = make(chan struct{})
}
+ if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
+ return fmt.Errorf("remove stale packs: %w", err)
+ }
+
return nil
}
@@ -565,6 +778,7 @@ func (mgr *TransactionManager) createDirectories() error {
for _, relativePath := range []string{
"wal/hooks",
+ "wal/packs",
} {
directory := filepath.Join(repoPath, relativePath)
if _, err := os.Stat(directory); err != nil {
@@ -585,6 +799,83 @@ func (mgr *TransactionManager) createDirectories() error {
return nil
}
+// removeStalePackFiles removes pack files from the log directory that have no associated log entry.
+// Such packs can be left around if a transaction's pack file was moved in place successfully
+// but the manager was interrupted before successfully persisting the log entry itself.
+func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error {
+ // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale
+ // pack would be for the next log index. Remove the pack if it exists.
+ possibleStalePackPath, err := mgr.packFilePath(appendedIndex + 1)
+ if err != nil {
+ return fmt.Errorf("pack file path: %w", err)
+ }
+
+ if err := os.Remove(possibleStalePackPath); err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
+ return fmt.Errorf("remove: %w", err)
+ }
+
+ return nil
+ }
+
+ // Sync the parent directory to flush the file deletion.
+ if err := safe.NewSyncer().Sync(filepath.Dir(possibleStalePackPath)); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+
+ return nil
+}
+
+ // storePackFile moves the transaction's pack file from the transaction's staging directory to its destination in the log.
+ // It returns a function, even on errors, that must be called to clean up the pack file if committing the log entry
+ // fails.
+func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
+ removePack := func() error { return nil }
+
+ destinationPath, err := mgr.packFilePath(index)
+ if err != nil {
+ return removePack, fmt.Errorf("pack file path: %w", err)
+ }
+
+ if err := os.Rename(
+ transaction.packFilePath(),
+ destinationPath,
+ ); err != nil {
+ return removePack, fmt.Errorf("move pack file: %w", err)
+ }
+
+ removePack = func() error {
+ if err := os.Remove(destinationPath); err != nil {
+ return fmt.Errorf("remove pack file: %w", err)
+ }
+
+ return nil
+ }
+
+ // Sync the parent directory. The pack's contents are synced when the pack file is computed.
+ if err := safe.NewSyncer().Sync(filepath.Dir(destinationPath)); err != nil {
+ return removePack, fmt.Errorf("sync: %w", err)
+ }
+
+ return removePack, nil
+}
+
+// packFilePath returns the path where a given log entry's pack file would be stored.
+func (mgr *TransactionManager) packFilePath(index LogIndex) (string, error) {
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return "", fmt.Errorf("repo path: %w", err)
+ }
+
+ return packFilePathForLogIndex(repoPath, index), nil
+}
+
+// packFilePathForLogIndex returns a log entry's pack file's absolute path in a given
+ // repository path.
+func packFilePathForLogIndex(repoPath string, index LogIndex) string {
+ return filepath.Join(repoPath, "wal", "packs", index.String()+".pack")
+}
+
// verifyReferences verifies that the references in the transaction apply on top of the already accepted
// reference changes. The old tips in the transaction are verified against the current actual tips.
// It returns the write-ahead log entry for the transaction if it was successfully verified.
@@ -644,7 +935,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
) == -1
})
- if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates); err != nil {
+ if err := mgr.verifyReferencesWithGit(ctx, logEntry.ReferenceUpdates, transaction.quarantineDirectory); err != nil {
return nil, fmt.Errorf("verify references with git: %w", err)
}
@@ -665,8 +956,8 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction
// the updates will go through when they are being applied in the log. This also catches any invalid reference names
// and file/directory conflicts with Git's loose reference storage which can occur with references like
// 'refs/heads/parent' and 'refs/heads/parent/child'.
-func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) error {
- updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates)
+func (mgr *TransactionManager) verifyReferencesWithGit(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, quarantineDirectory string) error {
+ updater, err := mgr.prepareReferenceTransaction(ctx, referenceUpdates, quarantineDirectory)
if err != nil {
return fmt.Errorf("prepare reference transaction: %w", err)
}
@@ -716,8 +1007,17 @@ func (mgr *TransactionManager) updateDefaultBranch(ctx context.Context, defaultB
// prepareReferenceTransaction prepares a reference transaction with `git update-ref`. It leaves committing
// or aborting up to the caller. Either should be called to clean up the process. The process is cleaned up
// if an error is returned.
-func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate) (*updateref.Updater, error) {
- updater, err := updateref.New(ctx, mgr.repository, updateref.WithDisabledTransactions())
+func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context, referenceUpdates []*gitalypb.LogEntry_ReferenceUpdate, quarantineDirectory string) (*updateref.Updater, error) {
+ repository := mgr.repository
+ if quarantineDirectory != "" {
+ var err error
+ repository, err = mgr.repository.Quarantine(quarantineDirectory)
+ if err != nil {
+ return nil, fmt.Errorf("quarantine: %w", err)
+ }
+ }
+
+ updater, err := updateref.New(ctx, repository, updateref.WithDisabledTransactions())
if err != nil {
return nil, fmt.Errorf("new: %w", err)
}
@@ -741,9 +1041,7 @@ func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context,
// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not
// logged nor applied later.
-func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error {
- nextLogIndex := mgr.appendedLogIndex + 1
-
+func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *gitalypb.LogEntry) error {
if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil {
return fmt.Errorf("set log entry: %w", err)
}
@@ -766,7 +1064,13 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return fmt.Errorf("read log entry: %w", err)
}
- updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates)
+ if logEntry.IncludesPack {
+ if err := mgr.applyPackFile(ctx, logIndex); err != nil {
+ return fmt.Errorf("apply pack file: %w", err)
+ }
+ }
+
+ updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, "")
if err != nil {
return fmt.Errorf("perpare reference transaction: %w", err)
}
@@ -807,6 +1111,23 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return nil
}
+// applyPackFile unpacks the objects from the pack file into the repository if the log entry
+// has an associated pack file.
+func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error {
+ packFilePath, err := mgr.packFilePath(logIndex)
+ if err != nil {
+ return fmt.Errorf("pack file path: %w", err)
+ }
+
+ packFile, err := os.Open(packFilePath)
+ if err != nil {
+ return fmt.Errorf("open pack file: %w", err)
+ }
+ defer packFile.Close()
+
+ return mgr.repository.UnpackObjects(ctx, packFile)
+}
+
// applyCustomHooks applies the custom hooks to the repository from the log entry. The hooks are stored
// at `<repo>/wal/hooks/<log_index>`. The hooks are fsynced prior to returning so it is safe to delete
// the log entry afterwards.
diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go
index 616a013b8..96648cc0d 100644
--- a/internal/gitaly/transaction_manager_test.go
+++ b/internal/gitaly/transaction_manager_test.go
@@ -8,6 +8,10 @@ import (
"errors"
"fmt"
"io/fs"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
"sync"
"testing"
"time"
@@ -63,6 +67,49 @@ func validCustomHooks(tb testing.TB) []byte {
// call back to PartitionManager.
func noopTransactionFinalizer() {}
+// writePack writes a pack file and its index into the destination.
+func writePack(tb testing.TB, cfg config.Cfg, packFile []byte, destinationPack string) {
+ tb.Helper()
+
+ require.NoError(tb, os.WriteFile(destinationPack, packFile, fs.ModePerm))
+ gittest.ExecOpts(tb, cfg,
+ gittest.ExecConfig{Stdin: bytes.NewReader(packFile)},
+ "index-pack", "--object-format="+gittest.DefaultObjectHash.Format, destinationPack,
+ )
+}
+
+// packFileDirectoryEntry returns a DirectoryEntry that parses content as a pack file and asserts that the
+// set of objects in the pack file matches the expected objects.
+func packFileDirectoryEntry(cfg config.Cfg, mode fs.FileMode, expectedObjects []git.ObjectID) testhelper.DirectoryEntry {
+ sortObjects := func(objects []git.ObjectID) {
+ sort.Slice(objects, func(i, j int) bool {
+ return objects[i] < objects[j]
+ })
+ }
+
+ sortObjects(expectedObjects)
+
+ return testhelper.DirectoryEntry{
+ Mode: mode,
+ Content: expectedObjects,
+ ParseContent: func(tb testing.TB, content []byte) any {
+ tb.Helper()
+
+ tempDir := tb.TempDir()
+ // Initialize a temporary repository where to write the pack. The cat file invocation for listing
+ // the objects needs to run within a repository, and it would otherwise use the developer's repository.
+ // If the object format doesn't match with the pack files in the test, things fail.
+ gittest.Exec(tb, cfg, "init", "--object-format="+gittest.DefaultObjectHash.Format, "--bare", tempDir)
+ writePack(tb, cfg, content, filepath.Join(tempDir, "objects", "pack", "content.pack"))
+
+ actualObjects := gittest.ListObjects(tb, cfg, tempDir)
+ sortObjects(actualObjects)
+
+ return actualObjects
+ },
+ }
+}
+
func TestTransactionManager(t *testing.T) {
umask := perm.GetUmask()
@@ -71,7 +118,8 @@ func TestTransactionManager(t *testing.T) {
ctx := testhelper.Context(t)
type testCommit struct {
- OID git.ObjectID
+ OID git.ObjectID
+ Pack []byte
}
type testCommits struct {
@@ -125,15 +173,35 @@ func TestTransactionManager(t *testing.T) {
nonExistentOID, err := objectHash.FromHex(hex.EncodeToString(hasher.Sum(nil)))
require.NoError(t, err)
+ packCommit := func(oid git.ObjectID) []byte {
+ t.Helper()
+
+ var pack bytes.Buffer
+ require.NoError(t,
+ localRepo.PackObjects(ctx, strings.NewReader(oid.String()), &pack),
+ )
+
+ return pack.Bytes()
+ }
+
return testSetup{
Config: cfg,
ObjectHash: objectHash,
Repository: localRepo,
NonExistentOID: nonExistentOID,
Commits: testCommits{
- First: testCommit{OID: firstCommitOID},
- Second: testCommit{OID: secondCommitOID},
- Third: testCommit{OID: thirdCommitOID},
+ First: testCommit{
+ OID: firstCommitOID,
+ Pack: packCommit(firstCommitOID),
+ },
+ Second: testCommit{
+ OID: secondCommitOID,
+ Pack: packCommit(secondCommitOID),
+ },
+ Third: testCommit{
+ OID: thirdCommitOID,
+ Pack: packCommit(thirdCommitOID),
+ },
},
}
}
@@ -170,6 +238,10 @@ func TestTransactionManager(t *testing.T) {
// ExpectedError is the expected error to be raised from the manager's Run. Panics are converted
// to errors and asserted to match this as well.
ExpectedError error
+ // ModifyRepository allows for modifying the repository prior to the manager starting. This
+ // may be necessary to test some states that can be reached from hard crashes but not during the
+ // tests.
+ ModifyRepository func(tb testing.TB, repoPath string)
}
// StopManager stops a TransactionManager.
@@ -209,6 +281,8 @@ func TestTransactionManager(t *testing.T) {
SkipVerificationFailures bool
// ReferenceUpdates are the reference updates to commit.
ReferenceUpdates ReferenceUpdates
+ // QuarantinedPacks are the packs to include in the quarantine directory of the transaction.
+ QuarantinedPacks [][]byte
// DefaultBranchUpdate is the default branch update to commit.
DefaultBranchUpdate *DefaultBranchUpdate
// CustomHooksUpdate is the custom hooks update to commit.
@@ -221,6 +295,12 @@ func TestTransactionManager(t *testing.T) {
TransactionID int
}
+ // Prune prunes all unreferenced objects from the repository.
+ type Prune struct {
+ // ExpectedObjects are the objects expected to exist in the repository after pruning.
+ ExpectedObjects []git.ObjectID
+ }
+
// StateAssertions models an assertion of the entire state managed by the TransactionManager.
type StateAssertion struct {
// DefaultBranch is the expected refname that HEAD points to.
@@ -959,6 +1039,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1002,6 +1083,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1080,6 +1162,7 @@ func TestTransactionManager(t *testing.T) {
"/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
"/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
"/wal/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
},
},
},
@@ -1761,6 +1844,7 @@ func TestTransactionManager(t *testing.T) {
},
Directory: testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks/1/pre-receive": {
@@ -1773,6 +1857,470 @@ func TestTransactionManager(t *testing.T) {
},
},
},
+ {
+ desc: "pack file includes referenced commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ ExpectedError: updateref.NonExistentObjectError{
+ ReferenceName: "refs/heads/main",
+ ObjectID: setup.Commits.First.OID.String(),
+ },
+ },
+ Begin{
+ TransactionID: 2,
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ },
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file includes unreachable objects depended upon",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ QuarantinedPacks: [][]byte{
+ setup.Commits.First.Pack,
+ setup.Commits.Second.Pack,
+ },
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ // Point main to the first commit so the second one is unreachable.
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.Second.OID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AssertManager{},
+ StopManager{},
+ StartManager{
+ // Crash the manager before the third transaction is applied. This allows us to
+ // prune before it is applied to ensure the pack file contains all necessary commits.
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 2,
+ },
+ },
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.Third.Pack},
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ // Prune so the unreachable commits have been removed prior to the third log entry being
+ // applied.
+ Prune{
+ ExpectedObjects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ },
+ StartManager{},
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.Third.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(3).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ },
+ ),
+ "/wal/packs/3.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ setup.Commits.Second.OID,
+ setup.Commits.Third.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file reapplying works",
+ steps: steps{
+ Prune{},
+ StartManager{
+ Hooks: testHooks{
+ BeforeStoreAppliedLogIndex: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ StartManager{},
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file missing referenced commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ ExpectedError: localrepo.BadObjectError{ObjectID: setup.Commits.Second.OID},
+ },
+ },
+ expectedState: StateAssertion{
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack file missing intermediate commit",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.Third.Pack},
+ ExpectedError: localrepo.ObjectReadError{ObjectID: setup.Commits.Second.OID},
+ },
+ },
+ expectedState: StateAssertion{
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()},
+ },
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file only",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(1).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ },
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack file with deletions",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.Second.Pack},
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ },
+ },
+ {
+ desc: "pack file applies with dependency concurrently deleted",
+ steps: steps{
+ Prune{},
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.First.Pack},
+ },
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.ObjectHash.ZeroOID},
+ },
+ },
+ AssertManager{},
+ Prune{},
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/dependant": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID},
+ },
+ QuarantinedPacks: [][]byte{setup.Commits.Second.Pack},
+ // The transaction fails to apply as we are not yet maintaining internal references
+ // to the old tips of concurrently deleted references. This causes the prune step to
+ // remove the object the pack file depends on.
+ //
+ // For now, keep the test case to assert the behavior. We'll fix this in a later MR.
+ ExpectedError: localrepo.ObjectReadError{
+ ObjectID: setup.Commits.First.OID,
+ },
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(setup.Repository))): LogIndex(2).toProto(),
+ },
+ Directory: testhelper.DirectoryState{
+ "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs/1.pack": packFileDirectoryEntry(
+ setup.Config,
+ umask.Mask(perm.PrivateFile),
+ []git.ObjectID{
+ setup.ObjectHash.EmptyTreeOID,
+ setup.Commits.First.OID,
+ },
+ ),
+ },
+ Objects: []git.ObjectID{},
+ },
+ },
+ {
+ desc: "pack files without log entries are cleaned up after a crash",
+ steps: steps{
+ StartManager{
+ // The manager cleans up pack files if committing fails. Since we can't
+ // hard kill the manager and it will still run the deferred clean up functions,
+ // we have to test the behavior by manually creating a stale pack here.
+ //
+ // The Manager starts up and we expect the pack file to be gone at the end of the test.
+ ModifyRepository: func(tb testing.TB, repoPath string) {
+ packFilePath := packFilePathForLogIndex(repoPath, 1)
+ require.NoError(t, os.MkdirAll(filepath.Dir(packFilePath), fs.ModePerm))
+ require.NoError(t, os.WriteFile(
+ packFilePath,
+ []byte("invalid pack"),
+ fs.ModePerm,
+ ))
+ },
+ },
+ },
+ },
}
type invalidReferenceTestCase struct {
@@ -1906,11 +2454,13 @@ func TestTransactionManager(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, database)
+ stagingDir := t.TempDir()
+
var (
// managerRunning tracks whether the manager is running or stopped.
managerRunning bool
// transactionManager is the current TransactionManager instance.
- transactionManager = NewTransactionManager(database, setup.Repository, noopTransactionFinalizer)
+ transactionManager = NewTransactionManager(database, setup.Repository, stagingDir, noopTransactionFinalizer)
// managerErr is used for synchronizing manager stopping and returning
// the error from Run.
managerErr chan error
@@ -1943,10 +2493,15 @@ func TestTransactionManager(t *testing.T) {
switch step := step.(type) {
case StartManager:
require.False(t, managerRunning, "test error: manager started while it was already running")
+
+ if step.ModifyRepository != nil {
+ step.ModifyRepository(t, repoPath)
+ }
+
managerRunning = true
managerErr = make(chan error)
- transactionManager = NewTransactionManager(database, setup.Repository, noopTransactionFinalizer)
+ transactionManager = NewTransactionManager(database, setup.Repository, stagingDir, noopTransactionFinalizer)
installHooks(t, transactionManager, database, setup.Repository, hooks{
beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry,
beforeResolveRevision: step.Hooks.BeforeAppendLogEntry,
@@ -2014,6 +2569,28 @@ func TestTransactionManager(t *testing.T) {
transaction.SetCustomHooks(step.CustomHooksUpdate.CustomHooksTAR)
}
+ if step.QuarantinedPacks != nil {
+ quarantineDirectory, err := transaction.QuarantineDirectory()
+ require.NoError(t, err)
+
+ for _, dir := range []string{
+ transaction.stagingDirectory,
+ quarantineDirectory,
+ } {
+ const expectedPerm = perm.PrivateDir
+ stat, err := os.Stat(dir)
+ require.NoError(t, err)
+ require.Equal(t, stat.Mode().Perm(), umask.Mask(expectedPerm),
+ "%q had %q permission but expected %q", dir, stat.Mode().Perm().String(), expectedPerm,
+ )
+ }
+
+ for i, pack := range step.QuarantinedPacks {
+ writePack(t, setup.Config, pack, filepath.Join(quarantineDirectory, "pack", fmt.Sprintf("%d.pack", i)))
+ }
+
+ }
+
commitCtx := ctx
if step.Context != nil {
commitCtx = step.Context
@@ -2023,6 +2600,9 @@ func TestTransactionManager(t *testing.T) {
case Rollback:
require.Contains(t, openTransactions, step.TransactionID, "test error: transaction rollbacked before beginning it")
require.NoError(t, openTransactions[step.TransactionID].Rollback())
+ case Prune:
+ gittest.Exec(t, setup.Config, "-C", repoPath, "prune")
+ require.ElementsMatch(t, step.ExpectedObjects, gittest.ListObjects(t, setup.Config, repoPath))
default:
t.Fatalf("unhandled step type: %T", step)
}
@@ -2043,6 +2623,7 @@ func TestTransactionManager(t *testing.T) {
// gets asserted.
expectedDirectory = testhelper.DirectoryState{
"/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
"/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
}
}
@@ -2060,6 +2641,10 @@ func TestTransactionManager(t *testing.T) {
}
require.ElementsMatch(t, expectedObjects, gittest.ListObjects(t, setup.Config, repoPath))
+
+ entries, err := os.ReadDir(stagingDir)
+ require.NoError(t, err)
+ require.Empty(t, entries, "staging directory was not cleaned up")
})
}
}
@@ -2070,6 +2655,7 @@ func checkManagerError(t *testing.T, managerErrChannel chan error, mgr *Transact
testTransaction := &Transaction{
referenceUpdates: ReferenceUpdates{"sentinel": {}},
result: make(chan error, 1),
+ finalize: func() {},
}
var (
@@ -2212,7 +2798,7 @@ func BenchmarkTransactionManager(b *testing.B) {
commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents())
commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1))
- manager := NewTransactionManager(database, localRepo, noopTransactionFinalizer)
+ manager := NewTransactionManager(database, localRepo, b.TempDir(), noopTransactionFinalizer)
managers = append(managers, manager)
managerWG.Add(1)
diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go
index d9a673cdc..d5d70c72d 100644
--- a/proto/go/gitalypb/log.pb.go
+++ b/proto/go/gitalypb/log.pb.go
@@ -38,6 +38,9 @@ type LogEntry struct {
DefaultBranchUpdate *LogEntry_DefaultBranchUpdate `protobuf:"bytes,2,opt,name=default_branch_update,json=defaultBranchUpdate,proto3" json:"default_branch_update,omitempty"`
// CustomHooksUpdate contains the custom hooks to set in the repository.
CustomHooksUpdate *LogEntry_CustomHooksUpdate `protobuf:"bytes,3,opt,name=custom_hooks_update,json=customHooksUpdate,proto3" json:"custom_hooks_update,omitempty"`
+ // includes_pack denotes whether this log entry has a pack file associated
+ // with it. The pack files are stored separately on the filesystem.
+ IncludesPack bool `protobuf:"varint,4,opt,name=includes_pack,json=includesPack,proto3" json:"includes_pack,omitempty"`
}
func (x *LogEntry) Reset() {
@@ -93,6 +96,13 @@ func (x *LogEntry) GetCustomHooksUpdate() *LogEntry_CustomHooksUpdate {
return nil
}
+func (x *LogEntry) GetIncludesPack() bool {
+ if x != nil {
+ return x.IncludesPack
+ }
+ return false
+}
+
// LogIndex serializes a log index. It's used for storing a repository's
// applied log index in the database.
//
@@ -312,7 +322,7 @@ var File_log_proto protoreflect.FileDescriptor
var file_log_proto_rawDesc = []byte{
0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74,
- 0x61, 0x6c, 0x79, 0x22, 0xd7, 0x03, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79,
+ 0x61, 0x6c, 0x79, 0x22, 0xfc, 0x03, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79,
0x12, 0x4d, 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70,
0x64, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69,
0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65,
@@ -328,27 +338,29 @@ var file_log_proto_rawDesc = []byte{
0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e,
0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48,
0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x11, 0x63, 0x75, 0x73, 0x74,
- 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x1a, 0x51, 0x0a,
- 0x0f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65,
- 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61,
- 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65,
- 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f,
- 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64,
- 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63,
- 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72,
- 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52,
- 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d,
- 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64,
- 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f,
- 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63,
- 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x22, 0x27, 0x0a,
- 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67,
- 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, 0x6f,
- 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62,
- 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f,
- 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x35, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
- 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72,
- 0x6f, 0x74, 0x6f, 0x33,
+ 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a,
+ 0x0d, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04,
+ 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61,
+ 0x63, 0x6b, 0x1a, 0x51, 0x0a, 0x0f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55,
+ 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e,
+ 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72,
+ 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07,
+ 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e,
+ 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74,
+ 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e,
+ 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e,
+ 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f,
+ 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74,
+ 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54,
+ 0x61, 0x72, 0x22, 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b,
+ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67,
+ 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62,
+ 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x35, 0x2f,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70,
+ 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/proto/log.proto b/proto/log.proto
index 50c8ccdc5..52ccbca61 100644
--- a/proto/log.proto
+++ b/proto/log.proto
@@ -44,6 +44,9 @@ message LogEntry {
DefaultBranchUpdate default_branch_update = 2;
// CustomHooksUpdate contains the custom hooks to set in the repository.
CustomHooksUpdate custom_hooks_update = 3;
+ // includes_pack denotes whether this log entry has a pack file associated
+ // with it. The pack files are stored separately on the filesystem.
+ bool includes_pack = 4;
}
// LogIndex serializes a log index. It's used for storing a repository's