Diffstat (limited to 'internal/gitaly/storage/storagemgr/transaction_manager.go')
-rw-r--r-- | internal/gitaly/storage/storagemgr/transaction_manager.go | 604 |
1 file changed, 543 insertions(+), 61 deletions(-)
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 6cf98ffbe..1bb77f115 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -1,6 +1,7 @@
 package storagemgr
 
 import (
+    "bufio"
     "bytes"
     "container/list"
     "context"
@@ -19,6 +20,7 @@ import (
     "github.com/dgraph-io/badger/v4"
     "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+    "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
     "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
     "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
     "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
@@ -68,6 +70,9 @@ var (
     errHousekeepingConflictOtherUpdates = errors.New("housekeeping in the same transaction with other updates")
     // errHousekeepingConflictConcurrent is returned when there is another concurrent housekeeping task.
     errHousekeepingConflictConcurrent = errors.New("conflict with another concurrent housekeeping task")
+    // errRepackConflictPrunedObject is returned when the repacking task pruned an object that is still used by other
+    // concurrent transactions.
+    errRepackConflictPrunedObject = errors.New("pruned object used by other updates")
 
     // Below errors are used to error out in cases when updates have been staged in a read-only transaction.
     errReadOnlyReferenceUpdates = errors.New("reference updates staged in a read-only transaction")
@@ -150,6 +155,7 @@ type repositoryCreation struct {
 // such as the cleanup of unneeded files and optimizations for the repository's data structures.
 type runHousekeeping struct {
     packRefs *runPackRefs
+    repack   *runRepack
 }
 
 // runPackRefs models the refs packing housekeeping task. It packs heads and tags for efficient repository access.
@@ -159,6 +165,24 @@ type runPackRefs struct {
     PrunedRefs map[git.ReferenceName]struct{}
 }
 
+// runRepack models the repack housekeeping task. We support multiple repacking strategies. At this stage, the outside
+// scheduler determines which strategy to use. The transaction manager is responsible for executing it. In the future,
+// we want to make housekeeping smarter by migrating housekeeping scheduling responsibility to this manager. That work
+// is tracked in https://gitlab.com/gitlab-org/gitaly/-/issues/5709.
+type runRepack struct {
+    // config tells which strategy to use and carries the options that go with it.
+    config       housekeeping.RepackObjectsConfig
+    isFullRepack bool
+    // newFiles contains the list of new packfiles to be applied into the destination repository.
+    newFiles []string
+    // deletedFiles contains the list of packfiles that should be removed from the destination repository. The
+    // repacking command runs in the snapshot repository for a significant amount of time. While it runs, other
+    // requests containing new objects can land and be applied before the housekeeping task finishes. So, we keep
+    // a snapshot of the packfile structure from before and after running the task. When the manager applies the
+    // task, it targets those known files only. Other packfiles can still coexist alongside the resulting repacked
+    // packfile(s).
+    deletedFiles []string
+}
+
 // ReferenceUpdates contains references to update. Reference name is used as the key and the value
 // is the expected old tip and the desired new tip.
 type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate
@@ -624,6 +648,16 @@ func (txn *Transaction) PackRefs() {
     }
 }
 
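For orientation, here is a minimal sketch of how an outside scheduler might drive the transaction API added in this diff. The `RepackObjectsConfig` struct and strategy constants are the ones referenced by the diff itself; how the `*Transaction` is obtained and committed is elided, and the `scheduleRepack` helper is hypothetical, not part of the change.

    // Hypothetical helper: not part of this diff. It only exercises the
    // Transaction.Repack API introduced below.
    func scheduleRepack(txn *Transaction, fullRepack bool) {
        config := housekeeping.RepackObjectsConfig{
            // Geometric repacking merges packfiles along a geometric
            // progression without pruning any objects.
            Strategy: housekeeping.RepackObjectsStrategyGeometric,
        }
        if fullRepack {
            // FullWithCruft is the only strategy that prunes unreachable
            // objects, and therefore the only one that needs the conflict
            // verification added in verifyRepacking further down.
            config.Strategy = housekeeping.RepackObjectsStrategyFullWithCruft
        }
        // Record the task on the transaction; the manager executes it in the
        // prepare/verify/apply phases implemented in this file.
        txn.Repack(config)
    }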
+// Repack sets the repacking housekeeping task as a part of the transaction.
+func (txn *Transaction) Repack(config housekeeping.RepackObjectsConfig) {
+    if txn.runHousekeeping == nil {
+        txn.runHousekeeping = &runHousekeeping{}
+    }
+    txn.runHousekeeping.repack = &runRepack{
+        config: config,
+    }
+}
+
 // IncludeObject includes the given object and its dependencies in the transaction's logged pack file even
 // if the object is unreachable from the references.
 func (txn *Transaction) IncludeObject(oid git.ObjectID) {
@@ -1148,6 +1182,9 @@ func (mgr *TransactionManager) prepareHousekeeping(ctx context.Context, transact
     if err := mgr.preparePackRefs(ctx, transaction); err != nil {
         return err
     }
+    if err := mgr.prepareRepacking(ctx, transaction); err != nil {
+        return err
+    }
     return nil
 }
@@ -1198,7 +1235,7 @@ func (mgr *TransactionManager) preparePackRefs(ctx context.Context, transaction
         return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String())
     }
 
-    if err := os.Chmod(filepath.Join(filepath.Join(repoPath, "packed-refs")), perm.SharedReadOnlyFile); err != nil {
+    if err := os.Chmod(filepath.Join(repoPath, "packed-refs"), perm.SharedReadOnlyFile); err != nil {
         return fmt.Errorf("modifying permission of packed-refs file: %w", err)
     }
@@ -1228,6 +1265,205 @@ func (mgr *TransactionManager) preparePackRefs(ctx context.Context, transaction
     return nil
 }
 
+// prepareRepacking runs the git-repack(1) command against the snapshot repository using the desired repacking
+// strategy. Each strategy has a different cost and effect corresponding to its scheduling frequency.
+// - IncrementalWithUnreachable: pack all loose objects into one packfile. This strategy is a no-op because all new
+// objects, regardless of their reachability status, are packed by default by the manager.
+// - Geometric: merge all packs together with geometric repacking. This is expensive or cheap depending on which packs
+// get merged. No need for a connectivity check.
+// - FullWithUnreachable: merge all packs into one but keep unreachable objects. This is more expensive, but we don't
+// take connectivity into account. This strategy is essential for object pools. As we cannot prune objects in a pool,
+// packing them into one single packfile boosts its performance.
+// - FullWithCruft: merge all packs into one and prune unreachable objects. It is the most effective, yet most costly,
+// strategy. We cannot run this type of task frequently on a large repository. This strategy is handled as a full
+// repacking without cruft because we don't need object expiry.
+// Before the command runs, we capture a snapshot of the existing packfiles. After the command finishes, we re-capture
+// the list and extract the list of to-be-updated packfiles. This practice prevents the repacking task from deleting
+// packfiles of other concurrent updates at the applying phase.
+func (mgr *TransactionManager) prepareRepacking(ctx context.Context, transaction *Transaction) error {
+    if transaction.runHousekeeping.repack == nil {
+        return nil
+    }
+
+    var err error
+    repack := transaction.runHousekeeping.repack
+
+    // Build a working repository pointing to the snapshot repository. The housekeeping task can access the
+    // repository without the need for quarantine.
+    workingRepository := mgr.repositoryFactory.Build(transaction.snapshot.relativePath(transaction.relativePath))
+    repoPath := mgr.getAbsolutePath(workingRepository.GetRelativePath())
+
+    if repack.isFullRepack, err = housekeeping.ValidateRepacking(repack.config); err != nil {
+        return fmt.Errorf("validating repacking: %w", err)
+    }
+
+    if repack.config.Strategy == housekeeping.RepackObjectsStrategyIncrementalWithUnreachable {
+        // Once the transaction manager has been applied and at least one complete repack has occurred, there
+        // should be no loose unreachable objects remaining in the repository. When the transaction manager
+        // processes a change, it consolidates all unreachable objects and objects about to become reachable
+        // into a new packfile, which is then placed in the repository. As a result, unreachable objects may
+        // still exist but are confined to packfiles. These will eventually be cleaned up during a full repack.
+        // In the interim, geometric repacking is utilized to optimize the structure of packfiles for faster
+        // access. Therefore, this operation is effectively a no-op. However, we maintain it for the sake of
+        // backward compatibility with the existing housekeeping scheduler.
+        return nil
+    }
+
+    // Capture the list of packfiles and their baggage files before repacking.
+    beforeFiles, err := mgr.collectPackfiles(ctx, repoPath)
+    if err != nil {
+        return fmt.Errorf("collecting existing packfiles: %w", err)
+    }
+
+    switch repack.config.Strategy {
+    case housekeeping.RepackObjectsStrategyGeometric:
+        // Geometric repacking rearranges the list of packfiles according to a geometric progression. This
+        // process does not consider object reachability. Since all unreachable objects remain within small
+        // packfiles, they become included in the newly created packfiles. Geometric repacking does not prune
+        // any objects.
+        if err := housekeeping.PerformGeometricRepacking(ctx, workingRepository, repack.config); err != nil {
+            return fmt.Errorf("perform geometric repacking: %w", err)
+        }
+    case housekeeping.RepackObjectsStrategyFullWithUnreachable:
+        // This strategy merges all packfiles into a single packfile, simultaneously removing any loose objects
+        // if present. Unreachable objects are then appended to the end of this unified packfile. Although the
+        // `git-repack(1)` command does not offer an option to specifically pack loose unreachable objects, this
+        // is not an issue because the transaction manager already ensures that unreachable objects are
+        // contained within packfiles. Therefore, this strategy effectively consolidates all packfiles into a
+        // single one. Adopting this strategy is crucial for alternates, as it ensures that we can manage
+        // objects within an object pool without the capability to prune them.
+        if err := housekeeping.PerformFullRepackingWithUnreachable(ctx, workingRepository, repack.config); err != nil {
+            return err
+        }
+    case housekeeping.RepackObjectsStrategyFullWithCruft:
+        // Both of the above strategies don't prune unreachable objects. They re-organize the objects between
+        // packfiles. In traditional housekeeping, the manager gets rid of unreachable objects via full
+        // repacking with cruft. It pushes all unreachable objects to a cruft packfile and keeps track of each
+        // object's mtime. All unreachable objects exceeding a grace period are cleaned up. The grace period is
+        // to ensure the housekeeping doesn't accidentally delete a to-be-reachable object, for example when GC
+        // runs while a concurrent push is being processed.
+        // The transaction manager handles concurrent requests very differently from the original Git way. Each
+        // request runs on a snapshot repository and the results are collected in the form of packfiles. Those
+        // packfiles contain the resulting reachable and unreachable objects. As a result, we don't need to take
+        // object expiry nor cruft packs into account. This operation triggers a normal full repack without
+        // cruft packing.
+        // Afterward, packed unreachable objects are removed. During the migration to the transaction system,
+        // there might be some loose unreachable objects. They will eventually be packed via either of the above
+        // tasks.
+        if err := housekeeping.PerformRepack(ctx, workingRepository, repack.config,
+            // Do a full repack.
+            git.Flag{Name: "-a"},
+            // Don't include objects that are part of an alternate.
+            git.Flag{Name: "-l"},
+            // Delete loose objects made redundant by this repack. This option is essential during the
+            // migration to the new transaction system. Afterward, it can be removed.
+            git.Flag{Name: "-d"},
+        ); err != nil {
+            return err
+        }
+
+        if err := housekeeping.WriteCommitGraph(ctx, workingRepository, housekeeping.WriteCommitGraphConfig{ReplaceChain: true}); err != nil {
+            return fmt.Errorf("re-writing commit graph: %w", err)
+        }
+    }
+
+    // Re-capture the list of packfiles and their baggage files after repacking.
+    afterFiles, err := mgr.collectPackfiles(ctx, repoPath)
+    if err != nil {
+        return fmt.Errorf("collecting new packfiles: %w", err)
+    }
+
+    for file := range beforeFiles {
+        // We delete a file only if it's missing from the after set.
+        if _, exist := afterFiles[file]; !exist {
+            repack.deletedFiles = append(repack.deletedFiles, file)
+        }
+    }
+
+    for file := range afterFiles {
+        // Similarly, we only need to link packfiles that did not exist before.
+        if _, exist := beforeFiles[file]; !exist {
+            repack.newFiles = append(repack.newFiles, file)
+        }
+    }
+
+    // Copy the new files into the WAL directory of this log entry.
+    for _, file := range repack.newFiles {
+        if err := os.Link(
+            filepath.Join(repoPath, "objects", "pack", file),
+            filepath.Join(transaction.walFilesPath(), file),
+        ); err != nil {
+            return fmt.Errorf("copying packfiles to WAL directory: %w", err)
+        }
+    }
+
+    // Copy the whole commit graphs over.
+    if repack.config.Strategy == housekeeping.RepackObjectsStrategyFullWithCruft && (len(repack.newFiles) > 0 || len(repack.deletedFiles) > 0) {
+        commitGraphsDir := filepath.Join(repoPath, "objects", "info", "commit-graphs")
+        if graphEntries, err := os.ReadDir(commitGraphsDir); err != nil {
+            if !errors.Is(err, os.ErrNotExist) {
+                return err
+            }
+        } else if len(graphEntries) > 0 {
+            walGraphsDir := filepath.Join(transaction.walFilesPath(), "commit-graphs")
+            if err := os.MkdirAll(walGraphsDir, perm.PrivateDir); err != nil {
+                return fmt.Errorf("creating commit-graphs dir in WAL dir: %w", err)
+            }
+            for _, entry := range graphEntries {
+                if err := os.Link(
+                    filepath.Join(commitGraphsDir, entry.Name()),
+                    filepath.Join(walGraphsDir, entry.Name()),
+                ); err != nil {
+                    return fmt.Errorf("linking commit-graphs entry to WAL dir: %w", err)
+                }
+            }
+        }
+    }
+
+    if err := safe.NewSyncer().Sync(transaction.walFilesPath()); err != nil {
+        return fmt.Errorf("sync: %w", err)
+    }
+
+    return nil
+}
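The before/after bookkeeping above is a plain set difference. The following standalone restatement (the helper name is ours, not the diff's) may make the invariant easier to see: a file present only in `after` is new and gets linked into the WAL directory, while a file present only in `before` was consumed by the repack and is deleted at apply time.

    // diffPackfiles is a hypothetical restatement of prepareRepacking's
    // bookkeeping: it partitions the packfile sets captured before and after
    // the repack into newly created and consumed files.
    func diffPackfiles(before, after map[string]struct{}) (newFiles, deletedFiles []string) {
        for file := range before {
            if _, ok := after[file]; !ok {
                deletedFiles = append(deletedFiles, file)
            }
        }
        for file := range after {
            if _, ok := before[file]; !ok {
                newFiles = append(newFiles, file)
            }
        }
        return newFiles, deletedFiles
    }

Packfiles created by concurrent transactions while the repack was running land in neither list, which is exactly why they survive the apply phase untouched.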
+// packfileExtensions contains the packfile extension and the extensions of its auxiliary files. Files matching them
+// are collected after running the repacking command.
+var packfileExtensions = map[string]struct{}{
+    "multi-pack-index": {},
+    ".pack":            {},
+    ".idx":             {},
+    ".rev":             {},
+    ".mtimes":          {},
+    ".bitmap":          {},
+}
+
+// collectPackfiles collects the list of packfiles and their auxiliary files.
+func (mgr *TransactionManager) collectPackfiles(ctx context.Context, repoPath string) (map[string]struct{}, error) {
+    files, err := os.ReadDir(filepath.Join(repoPath, "objects", "pack"))
+    if err != nil {
+        // The repository has not been packed before. Nothing to collect.
+        if errors.Is(err, os.ErrNotExist) {
+            return nil, nil
+        }
+        return nil, fmt.Errorf("reading objects/pack dir: %w", err)
+    }
+
+    // Filter packfiles and relevant files.
+    collectedFiles := make(map[string]struct{})
+    for _, file := range files {
+        // The objects/pack directory should not include any subdirectories. We can simply ignore them.
+        if file.IsDir() {
+            continue
+        }
+        for extension := range packfileExtensions {
+            if strings.HasSuffix(file.Name(), extension) {
+                collectedFiles[file.Name()] = struct{}{}
+            }
+        }
+    }
+
+    return collectedFiles, nil
+}
+
 // unwrapExpectedError unwraps expected errors that may occur and returns them directly to the caller.
 func unwrapExpectedError(err error) error {
     // The manager controls its own execution context and it is canceled only when Stop is called.
@@ -1830,8 +2066,14 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti
         return nil, fmt.Errorf("verifying pack refs: %w", err)
     }
 
+    repackEntry, err := mgr.verifyRepacking(mgr.ctx, transaction)
+    if err != nil {
+        return nil, fmt.Errorf("verifying repacking: %w", err)
+    }
+
     return &gitalypb.LogEntry_Housekeeping{
         PackRefs: packRefsEntry,
+        Repack:   repackEntry,
     }, nil
 }
@@ -1892,6 +2134,150 @@ func (mgr *TransactionManager) verifyPackRefs(ctx context.Context, transaction *
     }, nil
 }
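The verification that follows drives `git cat-file --batch-check -Z` over a pipe. As a primer, this is roughly what that request/response cycle looks like when driven directly with os/exec instead of Gitaly's command wrappers. `objectExists` is a hypothetical stand-in for illustration only, and `-Z` (NUL-terminated input and output) requires a reasonably recent Git.

    package gitcheck

    import (
        "bufio"
        "fmt"
        "os/exec"
        "strings"
    )

    // objectExists asks a `git cat-file --batch-check -Z` process whether oid
    // resolves in the repository at repoPath. A pruned or unknown object is
    // reported as "<oid> missing", which is the condition verifyRepacking
    // below translates into errRepackConflictPrunedObject.
    func objectExists(repoPath, oid string) (bool, error) {
        cmd := exec.Command("git", "-C", repoPath, "cat-file", "--batch-check", "-Z")
        stdin, err := cmd.StdinPipe()
        if err != nil {
            return false, err
        }
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            return false, err
        }
        if err := cmd.Start(); err != nil {
            return false, err
        }
        defer func() {
            // Closing stdin lets cat-file exit; Wait reaps the process.
            stdin.Close()
            cmd.Wait()
        }()

        // Requests and responses are NUL-terminated because of -Z.
        if _, err := fmt.Fprintf(stdin, "%s\x00", oid); err != nil {
            return false, err
        }
        answer, err := bufio.NewReader(stdout).ReadString('\x00')
        if err != nil {
            return false, err
        }
        return !strings.HasSuffix(strings.TrimSuffix(answer, "\x00"), " missing"), nil
    }

In the diff itself the long-running process is kept open for the whole set of reference tips and flushed once at the end, which avoids one process spawn per object.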
+// verifyRepacking confirms that the repacking process can integrate a new collection of packfiles. Repacking should
+// generally be risk-free as it systematically arranges packfiles and removes old, unused objects. The standard
+// housekeeping procedure includes a grace period, in days, to protect objects that might be in use by simultaneous
+// operations. However, the transaction manager deals with concurrent requests more effectively. By reviewing the
+// list of transactions committed since the repacking began, we can identify potential conflicts involving concurrent
+// processes.
+// We need to watch out for two types of conflicts:
+// 1. Overlapping transactions could reference objects that have been pruned. Exempting these pruned objects from the
+// repacking process isn't straightforward. The repacking task must confirm that these referenced objects are still
+// accessible in the repacked repository. This is done using the `git-cat-file --batch-check` command. Simultaneously,
+// it's possible for these transactions to refer to new objects created by other concurrent transactions. So, we need
+// to set up a working directory and apply the changed packfiles before checking.
+// 2. There might be transactions in progress that rely on dependencies that have been pruned. This type of conflict
+// can't be fully checked until Gitaly runs Git v2.43, as detailed in the GitLab epic
+// (https://gitlab.com/groups/gitlab-org/-/epics/11242).
+// It's important to note that this verification is specific to the `RepackObjectsStrategyFullWithCruft` strategy.
+// Other strategies focus solely on reorganizing packfiles and do not remove objects.
+func (mgr *TransactionManager) verifyRepacking(ctx context.Context, transaction *Transaction) (_ *gitalypb.LogEntry_Housekeeping_Repack, finalErr error) {
+    repack := transaction.runHousekeeping.repack
+    if repack == nil {
+        return nil, nil
+    }
+    // Other strategies re-organize packfiles without pruning unreachable objects. There is no need to run the
+    // following expensive verification for them.
+    if repack.config.Strategy != housekeeping.RepackObjectsStrategyFullWithCruft {
+        return &gitalypb.LogEntry_Housekeeping_Repack{
+            NewFiles:     repack.newFiles,
+            DeletedFiles: repack.deletedFiles,
+        }, nil
+    }
+
+    // Set up a working repository including the snapshot repository and all changes of concurrent transactions.
+    relativePath := transaction.snapshotRepository.GetRelativePath()
+    snapshot, err := newSnapshot(ctx,
+        mgr.storagePath,
+        filepath.Join(transaction.stagingDirectory, "staging"),
+        []string{relativePath},
+    )
+    if err != nil {
+        return nil, fmt.Errorf("new snapshot: %w", err)
+    }
+
+    workingRepository := mgr.repositoryFactory.Build(snapshot.relativePath(relativePath))
+    workingRepositoryPath, err := workingRepository.Path()
+    if err != nil {
+        return nil, fmt.Errorf("getting working repository path: %w", err)
+    }
+
+    objectHash, err := workingRepository.ObjectHash(ctx)
+    if err != nil {
+        return nil, fmt.Errorf("detecting object hash: %w", err)
+    }
+
+    referenceTips := map[string]struct{}{}
+    elm := mgr.committedEntries.Front()
+    for elm != nil {
+        committed := elm.Value.(*committedEntry)
+        if committed.lsn > transaction.snapshotLSN && committed.entry.RelativePath == transaction.relativePath {
+            // Collect reference tips. All of them should exist either in the resulting packfile or in new
+            // packfiles committed concurrently while the repacking was running.
+            for _, txn := range committed.entry.GetReferenceTransactions() {
+                for _, change := range txn.GetChanges() {
+                    oid := change.GetNewOid()
+                    if !objectHash.IsZeroOID(git.ObjectID(oid)) {
+                        referenceTips[string(oid)] = struct{}{}
+                    }
+                }
+            }
+
+            // Link the committed changes into the working repository, unless their packfile is among the
+            // deleted files.
+            packPrefix := committed.entry.GetPackPrefix()
+            if packPrefix != "" {
+                shouldApply := true
+                for _, deletedFile := range repack.deletedFiles {
+                    if packPrefix == deletedFile {
+                        shouldApply = false
+                        break
+                    }
+                }
+                if shouldApply {
+                    if err := mgr.applyPackFileToRepository(ctx, committed.lsn, committed.entry, workingRepositoryPath); err != nil {
+                        return nil, fmt.Errorf("applying packfiles to working repository: %w", err)
+                    }
+                }
+            }
+        }
+        elm = elm.Next()
+    }
+
+    if len(referenceTips) == 0 {
+        return &gitalypb.LogEntry_Housekeeping_Repack{
+            NewFiles:     repack.newFiles,
+            DeletedFiles: repack.deletedFiles,
+        }, nil
+    }
+
+    // Although we have a wrapper for caching `git-cat-file` processes, this command targets the snapshot
+    // repository that the repacking task depends on. This task might run for minutes to hours, and it's unlikely
+    // there is any concurrent operation sharing this process, so there is no need to cache it. An alternative
+    // solution is to read and parse the index files of the working directory. That approach avoids spawning
+    // `git-cat-file(1)`; in contrast, it requires storing the list of objects in memory.
+    var stderr bytes.Buffer
+    ctx, cancel := context.WithCancel(ctx)
+    catFileCmd, err := workingRepository.Exec(ctx, git.Command{
+        Name: "cat-file",
+        Flags: []git.Option{
+            git.Flag{Name: "--batch-check"},
+            git.Flag{Name: "-Z"},
+        },
+    }, git.WithSetupStdin(), git.WithSetupStdout(), git.WithStderr(&stderr))
+    if err != nil {
+        cancel()
+        return nil, structerr.New("spawning cat-file command: %w", err).WithMetadata("stderr", stderr.String())
+    }
+    defer func() {
+        cancel()
+        if err := catFileCmd.Wait(); finalErr == nil && err != nil && !errors.Is(err, context.Canceled) {
+            finalErr = structerr.New("waiting for cat-file command: %w", err).WithMetadata("stderr", stderr.String())
        }
+    }()
+
+    stdout := bufio.NewReader(catFileCmd)
+    for oid := range referenceTips {
+        // Verify that the reference tip is still reachable in the resulting packfile(s).
+        if _, err := catFileCmd.Write([]byte(fmt.Sprintf("%s\000", oid))); err != nil {
+            return nil, fmt.Errorf("writing cat-file command: %w", err)
+        }
+        if _, err := catfile.ParseObjectInfo(objectHash, stdout, true); err != nil {
+            if errors.As(err, &catfile.NotFoundError{}) {
+                return nil, errRepackConflictPrunedObject
+            }
+            return nil, fmt.Errorf("reading object info: %w", err)
+        }
+    }
+    if _, err := catFileCmd.Write([]byte("flush\000")); err != nil {
+        return nil, fmt.Errorf("writing flush: %w", err)
+    }
+
+    return &gitalypb.LogEntry_Housekeeping_Repack{
+        NewFiles:     repack.newFiles,
+        DeletedFiles: repack.deletedFiles,
+    }, nil
+}
+
 // applyDefaultBranchUpdate applies the default branch update to the repository from the log entry.
 func (mgr *TransactionManager) applyDefaultBranchUpdate(ctx context.Context, logEntry *gitalypb.LogEntry) error {
     if logEntry.DefaultBranchUpdate == nil {
@@ -2311,7 +2697,11 @@ func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, logE
 // has an associated pack file. This is done by hard linking the pack and index from the
 // log into the repository's object directory.
 func (mgr *TransactionManager) applyPackFile(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
-    packDirectory := filepath.Join(mgr.getAbsolutePath(logEntry.RelativePath), "objects", "pack")
+    return mgr.applyPackFileToRepository(ctx, lsn, logEntry, mgr.getAbsolutePath(logEntry.RelativePath))
+}
+
+func (mgr *TransactionManager) applyPackFileToRepository(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry, repoPath string) error {
+    packDirectory := filepath.Join(repoPath, "objects", "pack")
     for _, fileExtension := range []string{
         ".pack",
         ".idx",
@@ -2378,81 +2768,173 @@ func (mgr *TransactionManager) applyHousekeeping(ctx context.Context, lsn LSN, l
     if logEntry.Housekeeping == nil {
         return nil
     }
+
+    if err := mgr.applyPackRefs(ctx, lsn, logEntry); err != nil {
+        return fmt.Errorf("applying pack refs: %w", err)
+    }
+
+    if err := mgr.applyRepacking(ctx, lsn, logEntry); err != nil {
+        return fmt.Errorf("applying repacking: %w", err)
+    }
+
+    return nil
+}
+
+func (mgr *TransactionManager) applyPackRefs(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
+    if logEntry.Housekeeping.PackRefs == nil {
+        return nil
+    }
+
     repositoryPath := mgr.getAbsolutePath(logEntry.RelativePath)
-    if logEntry.Housekeeping.PackRefs != nil {
-        // Remove packed-refs lock. While we shouldn't be producing any new stale locks, it makes sense to keep
-        // this for historic state until we're certain none of the repositories contain stale locks anymore.
-        // This clean-up is not needed afterward.
-        if err := mgr.removePackedRefsLocks(ctx, repositoryPath); err != nil {
-            return fmt.Errorf("applying pack-refs: %w", err)
-        }
+    // Remove packed-refs lock. While we shouldn't be producing any new stale locks, it makes sense to keep
+    // this for historic state until we're certain none of the repositories contain stale locks anymore.
+    // This clean-up is not needed afterward.
+    if err := mgr.removePackedRefsLocks(ctx, repositoryPath); err != nil {
+        return fmt.Errorf("applying pack-refs: %w", err)
+    }
 
-        packedRefsPath := filepath.Join(repositoryPath, "packed-refs")
-        // Replace the packed-refs file.
-        if err := os.Remove(packedRefsPath); err != nil {
-            if !errors.Is(err, os.ErrNotExist) {
-                return fmt.Errorf("removing existing pack-refs: %w", err)
-            }
-        }
-        if err := os.Link(
-            filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "packed-refs"),
-            packedRefsPath,
-        ); err != nil {
-            return fmt.Errorf("linking new packed-refs: %w", err)
-        }
+    packedRefsPath := filepath.Join(repositoryPath, "packed-refs")
+    // Replace the packed-refs file.
+    if err := os.Remove(packedRefsPath); err != nil {
+        if !errors.Is(err, os.ErrNotExist) {
+            return fmt.Errorf("removing existing pack-refs: %w", err)
+        }
+    }
+    if err := os.Link(
+        filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "packed-refs"),
+        packedRefsPath,
+    ); err != nil {
+        return fmt.Errorf("linking new packed-refs: %w", err)
+    }
 
-        modifiedDirs := map[string]struct{}{}
-        // Prune loose references. The log entry carries the list of fully qualified references to prune.
-        for _, ref := range logEntry.Housekeeping.PackRefs.PrunedRefs {
-            path := filepath.Join(repositoryPath, string(ref))
-            if err := os.Remove(path); err != nil {
-                if !errors.Is(err, os.ErrNotExist) {
-                    return structerr.New("pruning loose reference: %w", err).WithMetadata("ref", path)
-                }
-            }
-            modifiedDirs[filepath.Dir(path)] = struct{}{}
-        }
+    modifiedDirs := map[string]struct{}{}
+    // Prune loose references. The log entry carries the list of fully qualified references to prune.
+    for _, ref := range logEntry.Housekeeping.PackRefs.PrunedRefs {
+        path := filepath.Join(repositoryPath, string(ref))
+        if err := os.Remove(path); err != nil {
+            if !errors.Is(err, os.ErrNotExist) {
+                return structerr.New("pruning loose reference: %w", err).WithMetadata("ref", path)
+            }
+        }
+        modifiedDirs[filepath.Dir(path)] = struct{}{}
+    }
 
-        syncer := safe.NewSyncer()
-        // Traverse all modified dirs back to the root "refs" dir of the repository. Remove any empty directory
-        // along the way. This prevents leaving empty dirs around after a loose ref is pruned. The `git-pack-refs`
-        // command does dir removal for us, but only in the staging repository during the preparation stage.
-        // In the actual repository, we need to do it ourselves.
-        rootRefDir := filepath.Join(repositoryPath, "refs")
-        for dir := range modifiedDirs {
-            for dir != rootRefDir {
-                if isEmpty, err := isDirEmpty(dir); err != nil {
-                    // If a dir does not exist, it probably means the directory was already deleted by a
-                    // previous interrupted attempt at applying the log entry. We simply ignore the error
-                    // and move up the directory hierarchy.
-                    if errors.Is(err, fs.ErrNotExist) {
-                        dir = filepath.Dir(dir)
-                        continue
-                    } else {
-                        return fmt.Errorf("checking empty ref dir: %w", err)
-                    }
-                } else if !isEmpty {
-                    break
-                }
-                if err := os.Remove(dir); err != nil {
-                    return fmt.Errorf("removing empty ref dir: %w", err)
-                }
-                dir = filepath.Dir(dir)
-            }
-            // If there is any empty dir along the way, it's removed and the dir pointer moves up until the
-            // dir is not empty or the root dir is reached. That one should be fsynced to flush the dir
-            // removal. If there is no empty dir, it stays at the dir of the pruned refs, which also needs a
-            // flush.
-            if err := syncer.Sync(dir); err != nil {
-                return fmt.Errorf("sync dir: %w", err)
-            }
-        }
-
-        // Sync the root of the repository to flush the packed-refs replacement.
-        if err := syncer.SyncParent(packedRefsPath); err != nil {
-            return fmt.Errorf("sync parent: %w", err)
-        }
-    }
+    syncer := safe.NewSyncer()
+    // Traverse all modified dirs back to the root "refs" dir of the repository. Remove any empty directory
+    // along the way. This prevents leaving empty dirs around after a loose ref is pruned. The `git-pack-refs`
+    // command does dir removal for us, but only in the staging repository during the preparation stage. In the
+    // actual repository, we need to do it ourselves.
+    rootRefDir := filepath.Join(repositoryPath, "refs")
+    for dir := range modifiedDirs {
+        for dir != rootRefDir {
+            if isEmpty, err := isDirEmpty(dir); err != nil {
+                // If a dir does not exist, it probably means the directory was already deleted by a
+                // previous interrupted attempt at applying the log entry. We simply ignore the error
+                // and move up the directory hierarchy.
+                if errors.Is(err, fs.ErrNotExist) {
+                    dir = filepath.Dir(dir)
+                    continue
+                } else {
+                    return fmt.Errorf("checking empty ref dir: %w", err)
+                }
+            } else if !isEmpty {
+                break
+            }
+            if err := os.Remove(dir); err != nil {
+                return fmt.Errorf("removing empty ref dir: %w", err)
+            }
+            dir = filepath.Dir(dir)
+        }
+        // If there is any empty dir along the way, it's removed and the dir pointer moves up until the dir
+        // is not empty or the root dir is reached. That one should be fsynced to flush the dir removal.
+        // If there is no empty dir, it stays at the dir of the pruned refs, which also needs a flush.
+        if err := syncer.Sync(dir); err != nil {
+            return fmt.Errorf("sync dir: %w", err)
+        }
+    }
+
+    // Sync the root of the repository to flush the packed-refs replacement.
+    if err := syncer.SyncParent(packedRefsPath); err != nil {
+        return fmt.Errorf("sync parent: %w", err)
+    }
+    return nil
+}
+
+// applyRepacking applies the new packfile set and removes known pruned packfiles. New packfiles created by
+// concurrent changes are kept intact.
+func (mgr *TransactionManager) applyRepacking(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
+    if logEntry.Housekeeping.Repack == nil {
+        return nil
+    }
+
+    repack := logEntry.Housekeeping.Repack
+    repoPath := mgr.getAbsolutePath(logEntry.RelativePath)
+
+    for _, file := range repack.NewFiles {
+        if err := os.Link(
+            filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), file),
+            filepath.Join(repoPath, "objects", "pack", file),
+        ); err != nil {
+            // A new resulting packfile might already exist if the log entry is re-applied after a crash.
+            if !errors.Is(err, os.ErrExist) {
+                return fmt.Errorf("linking new packfile: %w", err)
+            }
+        }
+    }
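Note how the linking loop above tolerates `os.ErrExist` and the deletion loop that follows tolerates `os.ErrNotExist`, so that re-applying the same log entry after a crash is harmless. Distilled into hypothetical helpers (not part of the diff, assuming the usual errors/fmt/os imports), the pattern is:

    import (
        "errors"
        "fmt"
        "os"
    )

    // linkIfMissing hard-links src to dst, treating an already-existing dst
    // as success so the operation can be replayed.
    func linkIfMissing(src, dst string) error {
        if err := os.Link(src, dst); err != nil && !errors.Is(err, os.ErrExist) {
            return fmt.Errorf("linking %q: %w", dst, err)
        }
        return nil
    }

    // removeIfPresent removes path, treating an already-missing path as success.
    func removeIfPresent(path string) error {
        if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) {
            return fmt.Errorf("removing %q: %w", path, err)
        }
        return nil
    }

Idempotency of this kind is what allows the write-ahead log to be replayed from an arbitrary point without tracking which parts of an entry were already applied.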
+    // Clean up the old commit graphs and apply the new ones.
+    walGraphsDir := filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "commit-graphs")
+    if graphEntries, err := os.ReadDir(walGraphsDir); err != nil {
+        if !errors.Is(err, os.ErrNotExist) {
+            return err
+        }
+    } else if len(graphEntries) > 0 {
+        commitGraphsDir := filepath.Join(repoPath, "objects", "info", "commit-graphs")
+        if err := os.RemoveAll(commitGraphsDir); err != nil {
+            return fmt.Errorf("resetting commit-graphs dir: %w", err)
+        }
+        if err := os.MkdirAll(commitGraphsDir, perm.PrivateDir); err != nil {
+            return fmt.Errorf("creating commit-graphs dir: %w", err)
+        }
+        for _, entry := range graphEntries {
+            if err := os.Link(
+                filepath.Join(walGraphsDir, entry.Name()),
+                filepath.Join(commitGraphsDir, entry.Name()),
+            ); err != nil {
+                return fmt.Errorf("linking commit-graphs entry: %w", err)
+            }
+        }
+    }
+
+    for _, file := range repack.DeletedFiles {
+        if err := os.Remove(filepath.Join(repoPath, "objects", "pack", file)); err != nil {
+            // The repacking task is the only operation that touches an existing on-disk packfile. Other
+            // operations only create new packfiles. However, after a crash, a pre-existing packfile might
+            // already have been cleaned up by a prior attempt at applying this log entry.
+            if !errors.Is(err, os.ErrNotExist) {
+                return fmt.Errorf("clean up repacked packfile: %w", err)
+            }
+        }
+    }
+
+    // During the migration to the new transaction system, loose objects might still exist here and there. So,
+    // this task needs to clean up redundant loose objects. After the target repository runs repacking for the
+    // first time, there shouldn't be any further loose objects; all of them exist in packfiles. Afterward, this
+    // command will exit instantly. We can remove this run after the transaction system is fully applied.
+    repo := mgr.repositoryFactory.Build(logEntry.RelativePath)
+    var stderr bytes.Buffer
+    if err := repo.ExecAndWait(ctx, git.Command{
+        Name:  "prune-packed",
+        Flags: []git.Option{git.Flag{Name: "--quiet"}},
+    }, git.WithStderr(&stderr)); err != nil {
+        return structerr.New("exec prune-packed: %w", err).WithMetadata("stderr", stderr.String())
+    }
+
+    if err := safe.NewSyncer().Sync(filepath.Join(repoPath, "objects")); err != nil {
+        return fmt.Errorf("sync recursive: %w", err)
+    }
+
+    return nil
+}
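Finally, a standalone restatement of the empty-directory walk that `applyPackRefs` above performs after pruning loose references. `pruneEmptyRefDirs` is a hypothetical name, and `os.ReadDir` stands in for the manager's `isDirEmpty` helper; fsyncing of the final directory is left out for brevity.

    import (
        "errors"
        "fmt"
        "io/fs"
        "os"
        "path/filepath"
    )

    // pruneEmptyRefDirs removes now-empty directories, walking upward from dir
    // until a non-empty directory or the refs root is reached.
    func pruneEmptyRefDirs(dir, rootRefDir string) error {
        for dir != rootRefDir {
            entries, err := os.ReadDir(dir)
            if err != nil {
                if errors.Is(err, fs.ErrNotExist) {
                    // Already removed by an earlier, interrupted apply attempt.
                    dir = filepath.Dir(dir)
                    continue
                }
                return fmt.Errorf("checking ref dir: %w", err)
            }
            if len(entries) > 0 {
                // Stop at the first non-empty directory.
                return nil
            }
            if err := os.Remove(dir); err != nil {
                return fmt.Errorf("removing empty ref dir: %w", err)
            }
            dir = filepath.Dir(dir)
        }
        return nil
    }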