gitlab.com/gitlab-org/gitaly.git
Diffstat (limited to 'internal/gitaly/storage/storagemgr/transaction_manager.go')
 internal/gitaly/storage/storagemgr/transaction_manager.go | 604 ++++++++++----
 1 file changed, 543 insertions(+), 61 deletions(-)
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 6cf98ffbe..1bb77f115 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -1,6 +1,7 @@
package storagemgr
import (
+ "bufio"
"bytes"
"container/list"
"context"
@@ -19,6 +20,7 @@ import (
"github.com/dgraph-io/badger/v4"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
@@ -68,6 +70,9 @@ var (
errHousekeepingConflictOtherUpdates = errors.New("housekeeping in the same transaction with other updates")
// errHousekeepingConflictConcurrent is returned when there is another concurrent housekeeping task.
errHousekeepingConflictConcurrent = errors.New("conflict with another concurrent housekeeping task")
+ // errRepackConflictPrunedObject is returned when the repacking task pruned an object that is still used by other
+ // concurrent transactions.
+ errRepackConflictPrunedObject = errors.New("pruned object used by other updates")
// Below errors are used to error out in cases when updates have been staged in a read-only transaction.
errReadOnlyReferenceUpdates = errors.New("reference updates staged in a read-only transaction")
@@ -150,6 +155,7 @@ type repositoryCreation struct {
// such as the cleanup of unneeded files and optimizations for the repository's data structures.
type runHousekeeping struct {
packRefs *runPackRefs
+ repack *runRepack
}
// runPackRefs models refs packing housekeeping task. It packs heads and tags for efficient repository access.
@@ -159,6 +165,24 @@ type runPackRefs struct {
PrunedRefs map[git.ReferenceName]struct{}
}
+// runRepack models the repacking housekeeping task. We support multiple repacking strategies. At this stage, the outside
+// scheduler determines which strategy to use. The transaction manager is responsible for executing it. In the future,
+// we want to make housekeeping smarter by migrating housekeeping scheduling responsibility to this manager. That work
+// is tracked in https://gitlab.com/gitlab-org/gitaly/-/issues/5709.
+type runRepack struct {
+ // config tells which strategy to use and carries the options for that strategy.
+ config housekeeping.RepackObjectsConfig
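+ // isFullRepack indicates whether the configured strategy performs a full repack. It is set from
+ // housekeeping.ValidateRepacking when the repacking task is prepared.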
+ isFullRepack bool
+ // newFiles contains the list of new packfiles to be applied into the destination repository.
+ newFiles []string
+ // deletedFiles contains the list of packfiles that should be removed from the destination repository. The
+ // repacking command runs in the snapshot repository for a significant amount of time. While it runs, other
+ // requests containing new objects can land and be applied before the housekeeping task finishes. So, we keep a
+ // snapshot of the packfile structure before and after running the task. When the manager applies the task, it
+ // targets those known files only. Other packfiles can still coexist alongside the resulting repacked packfile(s).
+ deletedFiles []string
+}
+
// ReferenceUpdates contains references to update. Reference name is used as the key and the value
// is the expected old tip and the desired new tip.
type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate
@@ -624,6 +648,16 @@ func (txn *Transaction) PackRefs() {
}
}
+// Repack sets the repacking housekeeping task as a part of the transaction.
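+//
+// For illustration only, an outside scheduler might stage a geometric repack on a transaction roughly like this
+// (sketch, not an exact call site):
+//
+//	txn.Repack(housekeeping.RepackObjectsConfig{
+//		Strategy: housekeeping.RepackObjectsStrategyGeometric,
+//	})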
+func (txn *Transaction) Repack(config housekeeping.RepackObjectsConfig) {
+ if txn.runHousekeeping == nil {
+ txn.runHousekeeping = &runHousekeeping{}
+ }
+ txn.runHousekeeping.repack = &runRepack{
+ config: config,
+ }
+}
+
// IncludeObject includes the given object and its dependencies in the transaction's logged pack file even
// if the object is unreachable from the references.
func (txn *Transaction) IncludeObject(oid git.ObjectID) {
@@ -1148,6 +1182,9 @@ func (mgr *TransactionManager) prepareHousekeeping(ctx context.Context, transact
if err := mgr.preparePackRefs(ctx, transaction); err != nil {
return err
}
+ if err := mgr.prepareRepacking(ctx, transaction); err != nil {
+ return err
+ }
return nil
}
@@ -1198,7 +1235,7 @@ func (mgr *TransactionManager) preparePackRefs(ctx context.Context, transaction
return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String())
}
- if err := os.Chmod(filepath.Join(filepath.Join(repoPath, "packed-refs")), perm.SharedReadOnlyFile); err != nil {
+ if err := os.Chmod(filepath.Join(repoPath, "packed-refs"), perm.SharedReadOnlyFile); err != nil {
return fmt.Errorf("modifying permission of pack-refs file")
}
@@ -1228,6 +1265,205 @@ func (mgr *TransactionManager) preparePackRefs(ctx context.Context, transaction
return nil
}
+// prepareRepacking runs git-repack(1) command against the snapshot repository using desired repacking strategy. Each
+// strategy has a different cost and effect corresponding to scheduling frequency.
+// - IncrementalWithUnreachable: pack all loose objects into one packfile. This strategy is a no-op because all new
+// objects, regardless of their reachability status, are packed by default by the manager.
+// - Geometric: merge all packs together with geometric repacking. This is expensive or cheap depending on which packs
+// get merged. No need for a connectivity check.
+// - FullWithUnreachable: merge all packs into one but keep unreachable objects. This is more expensive but we don't
+// take connectivity into account. This strategy is essential for object pools. As we cannot prune objects in a pool,
+// packing them into one single packfile boosts its performance.
+// - FullWithCruft: merge all packs into one and prune unreachable objects. It is the most effective, yet most costly,
+// strategy. We cannot run this type of task frequently on a large repository. This strategy is handled as a full
+// repack without cruft because we don't need object expiry.
+// Before the command runs, we capture a snapshot of existing packfiles. After the command finishes, we re-capture the
+// list and extract the set of changed packfiles. This prevents the repacking task from deleting packfiles of other
+// concurrent updates during the apply phase.
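+//
+// For example (hypothetical file names), if the snapshot contains {pack-a.pack, pack-b.pack} before repacking and
+// {pack-c.pack} afterwards, then deletedFiles is {pack-a.pack, pack-b.pack} and newFiles is {pack-c.pack}. A
+// pack-d.pack written into the actual repository by a concurrent transaction after the snapshot was taken is in
+// neither set and is therefore left untouched at apply time.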
+func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction *Transaction) error {
+ if transaction.runHousekeeping.repack == nil {
+ return nil
+ }
+
+ var err error
+ repack := transaction.runHousekeeping.repack
+
+ // Build a working repository pointing to the snapshot repository. The housekeeping task can access the repository
+ // without the need for quarantine.
+ workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.relativePath(transaction.relativePath))
+ repoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath())
+
+ if repack.isFullRepack, err = housekeeping.ValidateRepacking(repack.config); err != nil {
+ return fmt.Errorf("validating repacking: %w", err)
+ }
+
+ if repack.config.Strategy == housekeeping.RepackObjectsStrategyIncrementalWithUnreachable {
+ // Once the transaction manager has been applied and at least one complete repack has occurred, there
+ // should be no loose unreachable objects remaining in the repository. When the transaction manager
+ // processes a change, it consolidates all unreachable objects and objects about to become reachable
+ // into a new packfile, which is then placed in the repository. As a result, unreachable objects may
+ // still exist but are confined to packfiles. These will eventually be cleaned up during a full repack.
+ // In the interim, geometric repacking is utilized to optimize the structure of packfiles for faster
+ // access. Therefore, this operation is effectively a no-op. However, we maintain it for the sake of
+ // backward compatibility with the existing housekeeping scheduler.
+ return nil
+ }
+
+ // Capture the list of packfiles and their accompanying files before repacking.
+ beforeFiles, err := mgr.collectPackfiles(ctx, repoPath)
+ if err != nil {
+ return fmt.Errorf("collecting existing packfiles: %w", err)
+ }
+
+ switch repack.config.Strategy {
+ case housekeeping.RepackObjectsStrategyGeometric:
+ // Geometric repacking rearranges the list of packfiles according to a geometric progression. This process
+ // does not consider object reachability. Since all unreachable objects remain within small packfiles,
+ // they become included in the newly created packfiles. Geometric repacking does not prune any objects.
+ if err := housekeeping.PerformGeometricRepacking(ctx, workingRepository, repack.config); err != nil {
+ return fmt.Errorf("perform geometric repacking: %w", err)
+ }
+ case housekeeping.RepackObjectsStrategyFullWithUnreachable:
+ // This strategy merges all packfiles into a single packfile, simultaneously removing any loose objects
+ // if present. Unreachable objects are then appended to the end of this unified packfile. Although the
+ // `git-repack(1)` command does not offer an option to specifically pack loose unreachable objects, this
+ // is not an issue because the transaction manager already ensures that unreachable objects are
+ // contained within packfiles. Therefore, this strategy effectively consolidates all packfiles into a
+ // single one. Adopting this strategy is crucial for alternates, as it ensures that we can manage
+ // objects within an object pool without the capability to prune them.
+ if err := housekeeping.PerformFullRepackingWithUnreachable(ctx, workingRepository, repack.config); err != nil {
+ return err
+ }
+ case housekeeping.RepackObjectsStrategyFullWithCruft:
+ // Neither of the above strategies prunes unreachable objects; they only re-organize objects between
+ // packfiles. In traditional housekeeping, the manager gets rid of unreachable objects via full
+ // repacking with cruft. It pushes all unreachable objects into a cruft packfile and keeps track of each
+ // object's mtime. All unreachable objects exceeding a grace period are cleaned up. The grace period is
+ // to ensure the housekeeping doesn't delete a to-be-reachable object accidentally, for example when GC
+ // runs while a concurrent push is being processed.
+ // The transaction manager handles concurrent requests very differently from the original git way. Each
+ // request runs on a snapshot repository and the results are collected in the form of packfiles. Those
+ // packfiles contain resulting reachable and unreachable objects. As a result, we don't need to take
+ // object expiry nor cruft packs into account. This operation triggers a normal full repack without
+ // cruft packing.
+ // Afterward, packed unreachable objects are removed. During the migration to the transaction system, there
+ // might still be some loose unreachable objects. They will eventually be packed by one of the tasks above.
+ if err := housekeeping.PerformRepack(ctx, workingRepository, repack.config,
+ // Do a full repack.
+ git.Flag{Name: "-a"},
+ // Don't include objects that are part of an alternate.
+ git.Flag{Name: "-l"},
+ // Delete loose objects made redundant by this repack. This option is essential during migration
+ // to the new transaction system. Afterward, it can be removed.
+ git.Flag{Name: "-d"},
+ ); err != nil {
+ return err
+ }
+
+ if err := housekeeping.WriteCommitGraph(ctx, workingRepository, housekeeping.WriteCommitGraphConfig{ReplaceChain: true}); err != nil {
+ return fmt.Errorf("re-writing commit graph: %w", err)
+ }
+ }
+
+ // Re-capture the list of packfiles and their accompanying files after repacking.
+ afterFiles, err := mgr.collectPackfiles(ctx, repoPath)
+ if err != nil {
+ return fmt.Errorf("collecting new packfiles: %w", err)
+ }
+
+ for file := range beforeFiles {
+ // A packfile is considered deleted if it exists in the before set but is missing from the after set.
+ if _, exist := afterFiles[file]; !exist {
+ repack.deletedFiles = append(repack.deletedFiles, file)
+ }
+ }
+
+ for file := range afterFiles {
+ // Conversely, a packfile is new if it appears only in the after set; pre-existing packfiles don't need to be linked.
+ if _, exist := beforeFiles[file]; !exist {
+ repack.newFiles = append(repack.newFiles, file)
+ }
+ }
+
+ // Hard-link the new packfiles into the transaction's WAL directory.
+ for _, file := range repack.newFiles {
+ if err := os.Link(
+ filepath.Join(repoPath, "objects", "pack", file),
+ filepath.Join(transaction.walFilesPath(), file),
+ ); err != nil {
+ return fmt.Errorf("copying packfiles to WAL directory: %w", err)
+ }
+ }
+
+ // Copy the whole commit-graph directory over.
+ if repack.config.Strategy == housekeeping.RepackObjectsStrategyFullWithCruft && (len(repack.newFiles) > 0 || len(repack.deletedFiles) > 0) {
+ commitGraphsDir := filepath.Join(repoPath, "objects", "info", "commit-graphs")
+ if graphEntries, err := os.ReadDir(commitGraphsDir); err != nil {
+ if !errors.Is(err, os.ErrNotExist) {
+ return err
+ }
+ } else if len(graphEntries) > 0 {
+ walGraphsDir := filepath.Join(transaction.walFilesPath(), "commit-graphs")
+ if err := os.MkdirAll(walGraphsDir, perm.PrivateDir); err != nil {
+ return fmt.Errorf("creating commit-graphs dir in WAL dir: %w", err)
+ }
+ for _, entry := range graphEntries {
+ if err := os.Link(
+ filepath.Join(commitGraphsDir, entry.Name()),
+ filepath.Join(walGraphsDir, entry.Name()),
+ ); err != nil {
+ return fmt.Errorf("linking commit-graphs entry to WAL dir: %w", err)
+ }
+ }
+ }
+ }
+
+ if err := safe.NewSyncer().Sync(transaction.walFilesPath()); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+
+ return nil
+}
+
+// packfileExtensions contains the packfile extension and the extensions of its accompanying files. They are
+// collected after running the repacking command.
+var packfileExtensions = map[string]struct{}{
+ "multi-pack-index": {},
+ ".pack": {},
+ ".idx": {},
+ ".rev": {},
+ ".mtimes": {},
+ ".bitmap": {},
+}
+
+// collectPackfiles collects the list of packfiles and their accompanying files.
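+//
+// For illustration (hypothetical, abbreviated pack hash), a repacked objects/pack directory might contain:
+//
+//	multi-pack-index
+//	pack-4d1a6f4a.pack
+//	pack-4d1a6f4a.idx
+//	pack-4d1a6f4a.rev
+//	pack-4d1a6f4a.bitmap
+//	pack-4d1a6f4a.mtimes
+//
+// all of which would be recorded in the returned set.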
+func (mgr *TransactionManager) collectPackfiles(ctx context.Context, repoPath string) (map[string]struct{}, error) {
+ files, err := os.ReadDir(filepath.Join(repoPath, "objects", "pack"))
+ if err != nil {
+ // The repository has not been packed before. Nothing to collect.
+ if errors.Is(err, os.ErrNotExist) {
+ return nil, nil
+ }
+ return nil, fmt.Errorf("reading objects/pack dir: %w", err)
+ }
+
+ // Filter packfiles and relevant files.
+ collectedFiles := make(map[string]struct{})
+ for _, file := range files {
+ // The objects/pack directory should not include any sub-directories. We can simply ignore them.
+ if file.IsDir() {
+ continue
+ }
+ for extension := range packfileExtensions {
+ if strings.HasSuffix(file.Name(), extension) {
+ collectedFiles[file.Name()] = struct{}{}
+ }
+ }
+ }
+
+ return collectedFiles, nil
+}
+
// unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
func unwrapExpectedError(err error) error {
// The manager controls its own execution context and it is canceled only when Stop is called.
@@ -1830,8 +2066,14 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti
return nil, fmt.Errorf("verifying pack refs: %w", err)
}
+ repackEntry, err := mgr.verifyRepacking(mgr.ctx, transaction)
+ if err != nil {
+ return nil, fmt.Errorf("verifying repacking: %w", err)
+ }
+
return &gitalypb.LogEntry_Housekeeping{
PackRefs: packRefsEntry,
+ Repack: repackEntry,
}, nil
}
@@ -1892,6 +2134,150 @@ func (mgr *TransactionManager) verifyPackRefs(ctx context.Context, transaction *
}, nil
}
+// verifyRepacking confirms that the repacking process can integrate a new collection of packfiles.
+// Repacking should generally be risk-free as it systematically arranges packfiles and removes old, unused objects. The
+// standard housekeeping procedure includes a grace period, in days, to protect objects that might be in use by
+// simultaneous operations. However, the transaction manager deals with concurrent requests more effectively. By
+// reviewing the list of transactions since the repacking began, we can identify potential conflicts involving
+// concurrent processes.
+// We need to watch out for two types of conflicts:
+// 1. Overlapping transactions could reference objects that have been pruned. Excluding those objects from the
+// repacking process isn't straightforward, so the repacking task must confirm that the referenced objects are still
+// accessible in the repacked repository. This is done using the `git-cat-file --batch-check` command. At the same
+// time, these transactions may also refer to new objects created by other concurrent transactions, so we need to
+// set up a working directory and apply the changed packfiles before checking.
+// 2. There might be transactions in progress that rely on dependencies that have been pruned. This type of conflict
+// can't be fully checked until Git v2.43 is available in Gitaly, as detailed in the GitLab epic
+// (https://gitlab.com/groups/gitlab-org/-/epics/11242).
+// It's important to note that this verification is specific to the `RepackObjectsStrategyFullWithCruft` strategy. Other
+// strategies focus solely on reorganizing packfiles and do not remove objects.
+func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction *Transaction) (_ *gitalypb.LogEntry_Housekeeping_Repack, finalErr error) {
+ repack := transaction.runHousekeeping.repack
+ if repack == nil {
+ return nil, nil
+ }
+ // Other strategies re-organize packfiles without pruning unreachable objects. There is no need to run the
+ // following expensive verification.
+ if repack.config.Strategy != housekeeping.RepackObjectsStrategyFullWithCruft {
+ return &gitalypb.LogEntry_Housekeeping_Repack{
+ NewFiles: repack.newFiles,
+ DeletedFiles: repack.deletedFiles,
+ }, nil
+ }
+
+ // Set up a working repository that includes the snapshot repository and all changes of concurrent transactions.
+ relativePath := transaction.snapshotRepository.GetRelativePath()
+ snapshot, err := newSnapshot(ctx,
+ mgr.storagePath,
+ filepath.Join(transaction.stagingDirectory, "staging"),
+ []string{relativePath},
+ )
+ if err != nil {
+ return nil, fmt.Errorf("new snapshot: %w", err)
+ }
+
+ workingRepository := mgr.repositoryFactory.Build(snapshot.relativePath(relativePath))
+ workingRepositoryPath, err := workingRepository.Path()
+ if err != nil {
+ return nil, fmt.Errorf("getting working repository path: %w", err)
+ }
+
+ objectHash, err := workingRepository.ObjectHash(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("detecting object hash: %w", err)
+ }
+
+ referenceTips := map[string]struct{}{}
+ elm := mgr.committedEntries.Front()
+ for elm != nil {
+ committed := elm.Value.(*committedEntry)
+ if committed.lsn > transaction.snapshotLSN && committed.entry.RelativePath == transaction.relativePath {
+ // Collect reference tips. All of them should exist either in the resulting packfile or in packfiles
+ // added by concurrent transactions while the repacking was running.
+ for _, txn := range committed.entry.GetReferenceTransactions() {
+ for _, change := range txn.GetChanges() {
+ oid := change.GetNewOid()
+ if !objectHash.IsZeroOID(git.ObjectID(oid)) {
+ referenceTips[string(oid)] = struct{}{}
+ }
+ }
+ }
+
+ // Link the entry's changes into the working directory unless its pack matches one of the deleted files.
+ packPrefix := committed.entry.GetPackPrefix()
+ if packPrefix != "" {
+ shouldApply := true
+ for _, deletedFile := range repack.deletedFiles {
+ if packPrefix == deletedFile {
+ shouldApply = false
+ break
+ }
+ }
+ if shouldApply {
+ if err := mgr.applyPackFileToRepository(ctx, committed.lsn, committed.entry, workingRepositoryPath); err != nil {
+ return nil, fmt.Errorf("applying packfiles to working repository: %w", err)
+ }
+ }
+ }
+ }
+ elm = elm.Next()
+ }
+
+ if len(referenceTips) == 0 {
+ return &gitalypb.LogEntry_Housekeeping_Repack{
+ NewFiles: repack.newFiles,
+ DeletedFiles: repack.deletedFiles,
+ }, nil
+ }
+
+ // Although we have a wrapper for caching `git-cat-file` processes, this command targets the snapshot repository
+ // that the repacking task depends on. That task might run for minutes to hours, and it's unlikely that any
+ // concurrent operation shares this process, so there is no need to cache it. An alternative solution is to read
+ // and parse the index files of the working directory. That approach avoids spawning `git-cat-file(1)`, but it
+ // requires holding the list of objects in memory.
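+ //
+ // Each request written below is a single object ID terminated by NUL (because of the -Z flag). For an object
+ // that still exists, git-cat-file(1) answers with a record along the lines of
+ //
+ //	<oid> <type> <size>
+ //
+ // while a pruned object yields "<oid> missing", which ParseObjectInfo surfaces as a NotFoundError and we map
+ // to errRepackConflictPrunedObject.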
+ var stderr bytes.Buffer
+ ctx, cancel := context.WithCancel(ctx)
+ catFileCmd, err := workingRepository.Exec(ctx, git.Command{
+ Name: "cat-file",
+ Flags: []git.Option{
+ git.Flag{Name: "--batch-check"},
+ git.Flag{Name: "-Z"},
+ },
+ }, git.WithSetupStdin(), git.WithSetupStdout(), git.WithStderr(&stderr))
+ if err != nil {
+ cancel()
+ return nil, structerr.New("spawning cat-file command: %w", err).WithMetadata("stderr", stderr.String())
+ }
+ defer func() {
+ cancel()
+ if err := catFileCmd.Wait(); finalErr == nil && err != nil && !errors.Is(err, context.Canceled) {
+ finalErr = structerr.New("waiting for cat-file command: %w", err).WithMetadata("stderr", stderr.String())
+ }
+ }()
+
+ stdout := bufio.NewReader(catFileCmd)
+ for oid := range referenceTips {
+ // Verify that the reference tip still exists in the repacked repository.
+ if _, err := catFileCmd.Write([]byte(fmt.Sprintf("%s\000", oid))); err != nil {
+ return nil, fmt.Errorf("writing cat-file command: %w", err)
+ }
+ if _, err := catfile.ParseObjectInfo(objectHash, stdout, true); err != nil {
+ if errors.As(err, &catfile.NotFoundError{}) {
+ return nil, errRepackConflictPrunedObject
+ }
+ return nil, fmt.Errorf("reading object info: %w", err)
+ }
+ }
+ if _, err := catFileCmd.Write([]byte("flush\000")); err != nil {
+ return nil, fmt.Errorf("writing flush: %w", err)
+ }
+
+ return &gitalypb.LogEntry_Housekeeping_Repack{
+ NewFiles: repack.newFiles,
+ DeletedFiles: repack.deletedFiles,
+ }, nil
+}
+
// applyDefaultBranchUpdate applies the default branch update to the repository from the log entry.
func (mgr *TransactionManager) applyDefaultBranchUpdate(ctx context.Context, logEntry *gitalypb.LogEntry) error {
if logEntry.DefaultBranchUpdate == nil {
@@ -2311,7 +2697,11 @@ func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, logE
// has an associated pack file. This is done by hard linking the pack and index from the
// log into the repository's object directory.
func (mgr *TransactionManager) applyPackFile(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
- packDirectory := filepath.Join(mgr.getAbsolutePath(logEntry.RelativePath), "objects", "pack")
+ return mgr.applyPackFileToRepository(ctx, lsn, logEntry, mgr.getAbsolutePath(logEntry.RelativePath))
+}
+
+func (mgr *TransactionManager) applyPackFileToRepository(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry, repoPath string) error {
+ packDirectory := filepath.Join(repoPath, "objects", "pack")
for _, fileExtension := range []string{
".pack",
".idx",
@@ -2378,81 +2768,173 @@ func (mgr *TransactionManager) applyHousekeeping(ctx context.Context, lsn LSN, l
if logEntry.Housekeeping == nil {
return nil
}
+
+ if err := mgr.applyPackRefs(ctx, lsn, logEntry); err != nil {
+ return fmt.Errorf("applying pack refs: %w", err)
+ }
+
+ if err := mgr.applyRepacking(ctx, lsn, logEntry); err != nil {
+ return fmt.Errorf("applying repacking: %w", err)
+ }
+
+ return nil
+}
+
+func (mgr *TransactionManager) applyPackRefs(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
+ if logEntry.Housekeeping.PackRefs == nil {
+ return nil
+ }
+
repositoryPath := mgr.getAbsolutePath(logEntry.RelativePath)
- if logEntry.Housekeeping.PackRefs != nil {
- // Remove packed-refs lock. While we shouldn't be producing any new stale locks, it makes sense to have
- // this for historic state until we're certain none of the repositories contain stale locks anymore.
- // This clean up is not needed afterward.
- if err := mgr.removePackedRefsLocks(ctx, repositoryPath); err != nil {
- return fmt.Errorf("applying pack-refs: %w", err)
+ // Remove packed-refs lock. While we shouldn't be producing any new stale locks, it makes sense to have
+ // this for historic state until we're certain none of the repositories contain stale locks anymore.
+ // This clean up is not needed afterward.
+ if err := mgr.removePackedRefsLocks(ctx, repositoryPath); err != nil {
+ return fmt.Errorf("applying pack-refs: %w", err)
+ }
+
+ packedRefsPath := filepath.Join(repositoryPath, "packed-refs")
+ // Replace the packed-refs file.
+ if err := os.Remove(packedRefsPath); err != nil {
+ if !errors.Is(err, os.ErrNotExist) {
+ return fmt.Errorf("removing existing pack-refs: %w", err)
}
+ }
+ if err := os.Link(
+ filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "packed-refs"),
+ packedRefsPath,
+ ); err != nil {
+ return fmt.Errorf("linking new packed-refs: %w", err)
+ }
- packedRefsPath := filepath.Join(repositoryPath, "packed-refs")
- // Replace the packed-refs file.
- if err := os.Remove(packedRefsPath); err != nil {
+ modifiedDirs := map[string]struct{}{}
+ // Prune loose references. The log entry carries the list of fully qualified references to prune.
+ for _, ref := range logEntry.Housekeeping.PackRefs.PrunedRefs {
+ path := filepath.Join(repositoryPath, string(ref))
+ if err := os.Remove(path); err != nil {
if !errors.Is(err, os.ErrNotExist) {
- return fmt.Errorf("removing existing pack-refs: %w", err)
+ return structerr.New("pruning loose reference: %w", err).WithMetadata("ref", path)
}
}
- if err := os.Link(
- filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "packed-refs"),
- packedRefsPath,
- ); err != nil {
- return fmt.Errorf("linking new packed-refs: %w", err)
- }
+ modifiedDirs[filepath.Dir(path)] = struct{}{}
+ }
- modifiedDirs := map[string]struct{}{}
- // Prune loose references. The log entry carries the list of fully qualified references to prune.
- for _, ref := range logEntry.Housekeeping.PackRefs.PrunedRefs {
- path := filepath.Join(repositoryPath, string(ref))
- if err := os.Remove(path); err != nil {
- if !errors.Is(err, os.ErrNotExist) {
- return structerr.New("pruning loose reference: %w", err).WithMetadata("ref", path)
+ syncer := safe.NewSyncer()
+ // Traverse all modified dirs back to the root "refs" dir of the repository. Remove any empty directory
+ // along the way. This prevents leaving empty dirs around after a loose ref is pruned. The `git-pack-refs`
+ // command does the dir removal for us, but only in the staging repository during the preparation stage. In the
+ // actual repository, we need to do it ourselves.
+ rootRefDir := filepath.Join(repositoryPath, "refs")
+ for dir := range modifiedDirs {
+ for dir != rootRefDir {
+ if isEmpty, err := isDirEmpty(dir); err != nil {
+ // If a dir does not exist, it probably means the directory was already deleted by a
+ // previous interrupted attempt at applying the log entry. We simply ignore the error
+ // and move up the directory hierarchy.
+ if errors.Is(err, fs.ErrNotExist) {
+ dir = filepath.Dir(dir)
+ continue
+ } else {
+ return fmt.Errorf("checking empty ref dir: %w", err)
}
+ } else if !isEmpty {
+ break
}
- modifiedDirs[filepath.Dir(path)] = struct{}{}
- }
-
- syncer := safe.NewSyncer()
- // Traverse all modified dirs back to the root "refs" dir of the repository. Remove any empty directory
- // along the way. It prevents leaving empty dirs around after a loose ref is pruned. `git-pack-refs`
- // command does dir removal for us, but in staginge repository during preparation stage. In the actual
- // repository, we need to do it ourselves.
- rootRefDir := filepath.Join(repositoryPath, "refs")
- for dir := range modifiedDirs {
- for dir != rootRefDir {
- if isEmpty, err := isDirEmpty(dir); err != nil {
- // If a dir does not exist, it properly means a directory may already be deleted by a
- // previous interrupted attempt on applying the log entry. We simply ignore the error
- // and move up the directory hierarchy.
- if errors.Is(err, fs.ErrNotExist) {
- dir = filepath.Dir(dir)
- continue
- } else {
- return fmt.Errorf("checking empty ref dir: %w", err)
- }
- } else if !isEmpty {
- break
- }
- if err := os.Remove(dir); err != nil {
- return fmt.Errorf("removing empty ref dir: %w", err)
- }
- dir = filepath.Dir(dir)
+ if err := os.Remove(dir); err != nil {
+ return fmt.Errorf("removing empty ref dir: %w", err)
+ }
+ dir = filepath.Dir(dir)
+ }
+ // If there is any empty dir along the way, it's removed and the dir pointer moves up until the dir
+ // is not empty or the root dir is reached. That last dir should be fsynced to flush the dir removal.
+ // If there is no empty dir, the pointer stays at the dir of the pruned refs, which also needs a flush.
+ if err := syncer.Sync(dir); err != nil {
+ return fmt.Errorf("sync dir: %w", err)
+ }
+ }
+
+ // Sync the root of the repository to flush packed-refs replacement.
+ if err := syncer.SyncParent(packedRefsPath); err != nil {
+ return fmt.Errorf("sync parent: %w", err)
+ }
+ return nil
+}
+
+// applyRepacking applies the new packfile set and removes known pruned packfiles. New packfiles created by concurrent
+// changes are kept intact.
+func (mgr *TransactionManager) applyRepacking(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
+ if logEntry.Housekeeping.Repack == nil {
+ return nil
+ }
+
+ repack := logEntry.Housekeeping.Repack
+ repoPath := mgr.getAbsolutePath(logEntry.RelativePath)
+
+ for _, file := range repack.NewFiles {
+ if err := os.Link(
+ filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), file),
+ filepath.Join(repoPath, "objects", "pack", file),
+ ); err != nil {
+ // A new resulting packfile might exist if the log entry is re-applied after a crash.
+ if !errors.Is(err, os.ErrExist) {
+ return fmt.Errorf("linking new packfile: %w", err)
}
- // If there is any empty dir along the way, it's removed and dir pointer moves up until the dir
- // is not empty or reaching the root dir. That one should be fsynced to flush the dir removal.
- // If there is no empty dir, it stays at the dir of pruned refs, which also needs a flush.
- if err := syncer.Sync(dir); err != nil {
- return fmt.Errorf("sync dir: %w", err)
+ }
+ }
+
+ // Clean up and apply commit graphs.
+ walGraphsDir := filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "commit-graphs")
+ if graphEntries, err := os.ReadDir(walGraphsDir); err != nil {
+ if !errors.Is(err, os.ErrNotExist) {
+ return err
+ }
+ } else if len(graphEntries) > 0 {
+ commitGraphsDir := filepath.Join(repoPath, "objects", "info", "commit-graphs")
+ if err := os.RemoveAll(commitGraphsDir); err != nil {
+ return fmt.Errorf("reseting commit-graphs dir: %w", err)
+ }
+ if err := os.MkdirAll(commitGraphsDir, perm.PrivateDir); err != nil {
+ return fmt.Errorf("creating commit-graphs dir: %w", err)
+ }
+ for _, entry := range graphEntries {
+ if err := os.Link(
+ filepath.Join(walGraphsDir, entry.Name()),
+ filepath.Join(commitGraphsDir, entry.Name()),
+ ); err != nil {
+ return fmt.Errorf("linking commit-graphs entry: %w", err)
}
}
+ }
- // Sync the root of the repository to flush packed-refs replacement.
- if err := syncer.SyncParent(packedRefsPath); err != nil {
- return fmt.Errorf("sync parent: %w", err)
+ for _, file := range repack.DeletedFiles {
+ if err := os.Remove(filepath.Join(repoPath, "objects", "pack", file)); err != nil {
+ // The repacking task is the only operation that touches an on-disk packfile; other operations should
+ // only create new packfiles. However, after a crash, a pre-existing packfile might already have been
+ // cleaned up by an earlier attempt at applying this entry.
+ if !errors.Is(err, os.ErrNotExist) {
+ return fmt.Errorf("clean up repacked packfile: %w", err)
+ }
}
}
+
+ // During the migration to the new transaction system, loose objects might still exist here and there, so this
+ // task needs to clean up redundant loose objects. After the target repository has been repacked for the first
+ // time, there shouldn't be any loose objects left; all of them live in packfiles. From then on, this command
+ // exits instantly. We can remove this step once the transaction system is fully rolled out.
+ repo := mgr.repositoryFactory.Build(logEntry.RelativePath)
+ var stderr bytes.Buffer
+ if err := repo.ExecAndWait(ctx, git.Command{
+ Name: "prune-packed",
+ Flags: []git.Option{git.Flag{Name: "--quiet"}},
+ }, git.WithStderr(&stderr)); err != nil {
+ return structerr.New("exec prune-packed: %w", err).WithMetadata("stderr", stderr.String())
+ }
+
+ if err := safe.NewSyncer().Sync(filepath.Join(repoPath, "objects")); err != nil {
+ return fmt.Errorf("sync recursive: %w", err)
+ }
+
return nil
}