diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-06-02 12:25:18 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-06-02 12:25:18 +0300 |
commit | 27db5949d60beb5170467da8188f1599ff4760c1 (patch) | |
tree | 7476a137ad41a80d421dd2b7b3f908639fb1a15d /internal/gitaly/transaction_manager.go | |
parent | 81496efc0d26dba7799d1392c80b06bce943cc29 (diff) | |
parent | 3cee3b150adc7935d611dd98dfca4b2c2149c0f7 (diff) |
Merge branch 'smh-move-packs-in-place' into 'master'
Apply logged pack files to repository without copying
Closes #5046
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5846
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Diffstat (limited to 'internal/gitaly/transaction_manager.go')
-rw-r--r-- | internal/gitaly/transaction_manager.go | 195 |
1 files changed, 131 insertions, 64 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index ab493dd34..2bb757e30 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -11,6 +11,7 @@ import ( "io/fs" "os" "path/filepath" + "regexp" "sort" "strconv" "strings" @@ -153,9 +154,9 @@ type Transaction struct { // quarantineDirectory is the directory within the stagingDirectory where the new objects of the // transaction are quarantined. quarantineDirectory string - // includesPack is set if a pack file has been computed for the transaction and should be - // logged. - includesPack bool + // packPrefix contains the prefix (`pack-<digest>`) of the transaction's pack if the transaction + // had objects to log. + packPrefix string // stagingRepository is a repository that is used to stage the transaction. If there are quarantined // objects, it has the quarantine applied so the objects are available for verification and packing. stagingRepository *localrepo.Repo @@ -358,9 +359,10 @@ func (txn *Transaction) SetCustomHooks(customHooksTAR []byte) { txn.customHooksUpdate = &CustomHooksUpdate{CustomHooksTAR: customHooksTAR} } -// packFilePath returns the path to this transaction's pack file. -func (txn *Transaction) packFilePath() string { - return filepath.Join(txn.stagingDirectory, "transaction.pack") +// walFilesPath returns the path to the directory where this transaction is staging the files that will +// be logged alongside the transaction's log entry. +func (txn *Transaction) walFilesPath() string { + return filepath.Join(txn.stagingDirectory, "wal-files") } // TransactionManager is responsible for transaction management of a single repository. Each repository has @@ -587,6 +589,10 @@ func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, trans return nil } +// packPrefixRegexp matches the output of `git index-pack` where it +// prints the packs prefix in the format `pack <digest>`. +var packPrefixRegexp = regexp.MustCompile(`^pack\t([0-9a-f]+)\n$`) + // packObjects packs the objects included in the transaction into a single pack file that is ready // for logging. The pack file includes all unreachable objects that are about to be made reachable. func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error { @@ -614,8 +620,6 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil } - transaction.includesPack = true - objectsReader, objectsWriter := io.Pipe() group, ctx := errgroup.WithContext(ctx) @@ -632,30 +636,67 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra return nil }) + packReader, packWriter := io.Pipe() group.Go(func() (returnedErr error) { - defer func() { objectsReader.CloseWithError(returnedErr) }() + defer func() { + objectsReader.CloseWithError(returnedErr) + packWriter.CloseWithError(returnedErr) + }() - destinationFile, err := os.OpenFile( - transaction.packFilePath(), - os.O_WRONLY|os.O_CREATE|os.O_EXCL, - perm.PrivateFile, - ) - if err != nil { - return fmt.Errorf("open file: %w", err) + if err := transaction.stagingRepository.PackObjects(ctx, objectsReader, packWriter); err != nil { + return fmt.Errorf("pack objects: %w", err) } - defer destinationFile.Close() - if err := transaction.stagingRepository.PackObjects(ctx, objectsReader, destinationFile); err != nil { - return fmt.Errorf("pack objects: %w", err) + return nil + }) + + group.Go(func() (returnedErr error) { + defer packReader.CloseWithError(returnedErr) + + if err := os.Mkdir(transaction.walFilesPath(), perm.PrivateDir); err != nil { + return fmt.Errorf("create wal files directory: %w", err) + } + + // index-pack places the pack and the index into the repository's object directory. The + // staging repository is configured with a quarantine so we execute it there. + var stdout, stderr bytes.Buffer + if err := transaction.stagingRepository.ExecAndWait(ctx, git.Command{ + Name: "index-pack", + Flags: []git.Option{git.Flag{Name: "--stdin"}}, + }, git.WithStdin(packReader), git.WithStdout(&stdout), git.WithStderr(&stderr)); err != nil { + return structerr.New("index pack: %w", err).WithMetadata("stderr", stderr.String()) + } + + matches := packPrefixRegexp.FindStringSubmatch(stdout.String()) + if len(matches) != 2 { + return structerr.New("unexpected index-pack output").WithMetadata("stdout", stdout.String()) + } + + // Move the files from the quarantine to the wal-files directory so they'll get logged as part + // of the directory. + packPrefix := fmt.Sprintf("pack-%s", matches[1]) + for _, fileName := range []string{ + packPrefix + ".pack", + packPrefix + ".idx", + } { + if err := os.Rename( + filepath.Join(transaction.quarantineDirectory, "pack", fileName), + filepath.Join(transaction.walFilesPath(), fileName), + ); err != nil { + return fmt.Errorf("move file: %w", err) + } } - // Sync the contents of the pack so they are flushed to disk prior to the transaction - // being admitted for processing. - if err := destinationFile.Sync(); err != nil { - return fmt.Errorf("sync pack: %w", err) + // Sync the files and the directory entries so everything is flushed to the disk prior + // to moving on to committing the log entry. This way we only have to flush the directory + // move when we move the staged files into the log. + if err := safe.NewSyncer().SyncRecursive(transaction.walFilesPath()); err != nil { + return fmt.Errorf("sync recursive: %w", err) } - return destinationFile.Close() + transaction.packPrefix = packPrefix + + return nil }) return group.Wait() @@ -772,25 +813,25 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } nextLogIndex := mgr.appendedLogIndex + 1 - if transaction.includesPack { - logEntry.IncludesPack = true + if transaction.packPrefix != "" { + logEntry.PackPrefix = transaction.packPrefix - removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction) + removeFiles, err := mgr.storeWALFiles(mgr.ctx, nextLogIndex, transaction) cleanUps = append(cleanUps, func() error { - // The transaction's pack file might have been moved in to place at <repo>/wal/packs/<log_index>.pack. - // If anything fails before the transaction is committed, the pack file must be removed as otherwise - // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager - // will exit with an error. The pack file will be cleaned up on restart and no further processing is + // The transaction's files might have been moved successfully in to the log. + // If anything fails before the transaction is committed, the files must be removed as otherwise + // they would occupy the slot of the next log entry. If this can't be done, the TransactionManager + // will exit with an error. The files will be cleaned up on restart and no further processing is // allowed until that happens. if commitErr != nil { - return removePack() + return removeFiles() } return nil }) if err != nil { - return fmt.Errorf("store pack file: %w", err) + return fmt.Errorf("store wal files: %w", err) } } @@ -872,7 +913,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { mgr.applyNotifications[i] = make(chan struct{}) } - if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil { + if err := mgr.removeStaleWALFiles(mgr.ctx, mgr.appendedLogIndex); err != nil { return fmt.Errorf("remove stale packs: %w", err) } @@ -990,14 +1031,14 @@ func (mgr *TransactionManager) createDirectories() error { return nil } -// removeStalePackFiles removes pack files from the log directory that have no associated log entry. -// Such packs can be left around if a transaction's pack file was moved in place successfully +// removeStaleWALFiles removes files from the log directory that have no associated log entry. +// Such files can be left around if transaction's files were moved in place successfully // but the manager was interrupted before successfully persisting the log entry itself. -func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error { +func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appendedIndex LogIndex) error { // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale // pack would be for the next log index. Remove the pack if it exists. - possibleStalePackPath := packFilePathForLogIndex(mgr.repositoryPath, appendedIndex+1) - if err := os.Remove(possibleStalePackPath); err != nil { + possibleStaleFilesPath := walFilesPathForLogIndex(mgr.repositoryPath, appendedIndex+1) + if _, err := os.Stat(possibleStaleFilesPath); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("remove: %w", err) } @@ -1005,31 +1046,35 @@ func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appende return nil } + if err := os.RemoveAll(possibleStaleFilesPath); err != nil { + return fmt.Errorf("remove all: %w", err) + } + // Sync the parent directory to flush the file deletion. - if err := safe.NewSyncer().Sync(filepath.Dir(possibleStalePackPath)); err != nil { + if err := safe.NewSyncer().SyncParent(possibleStaleFilesPath); err != nil { return fmt.Errorf("sync: %w", err) } return nil } -// storePackFile moves the transaction's pack file from the object directory to its destination in the log. -// It returns a function, even on errors, that must be called to clean up the pack file committing the log entry +// storeWALFiles moves the transaction's logged files from the staging directory to their destination in the log. +// It returns a function, even on errors, that must be called to clean up the files if committing the log entry // fails. -func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) { - removePack := func() error { return nil } +func (mgr *TransactionManager) storeWALFiles(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) { + removeFiles := func() error { return nil } - destinationPath := packFilePathForLogIndex(mgr.repositoryPath, index) + destinationPath := walFilesPathForLogIndex(mgr.repositoryPath, index) if err := os.Rename( - transaction.packFilePath(), + transaction.walFilesPath(), destinationPath, ); err != nil { - return removePack, fmt.Errorf("move pack file: %w", err) + return removeFiles, fmt.Errorf("move wal files: %w", err) } - removePack = func() error { + removeFiles = func() error { if err := os.Remove(destinationPath); err != nil { - return fmt.Errorf("remove pack file: %w", err) + return fmt.Errorf("remove wal files: %w", err) } return nil @@ -1037,16 +1082,20 @@ func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex // Sync the parent directory. The pack's contents are synced when the pack file is computed. if err := safe.NewSyncer().Sync(filepath.Dir(destinationPath)); err != nil { - return removePack, fmt.Errorf("sync: %w", err) + return removeFiles, fmt.Errorf("sync: %w", err) } - return removePack, nil + return removeFiles, nil } -// packFilePathForLogIndex returns a log entry's pack file's absolute path in a given -// a repository path. -func packFilePathForLogIndex(repoPath string, index LogIndex) string { - return filepath.Join(repoPath, "wal", "packs", index.String()+".pack") +// walFilesPathForLogIndex returns an absolute path to a given log entry's WAL files. +func walFilesPathForLogIndex(repoPath string, index LogIndex) string { + return filepath.Join(repoPath, "wal", "packs", index.String()) +} + +// packFilePath returns a log entry's pack file's absolute path in the wal files directory. +func packFilePath(walFiles string) string { + return filepath.Join(walFiles, "transaction.pack") } // verifyReferences verifies that the references in the transaction apply on top of the already accepted @@ -1288,8 +1337,8 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("apply repository deletion: %w", err) } } else { - if logEntry.IncludesPack { - if err := mgr.applyPackFile(ctx, logIndex); err != nil { + if logEntry.PackPrefix != "" { + if err := mgr.applyPackFile(ctx, logEntry.PackPrefix, logIndex); err != nil { return fmt.Errorf("apply pack file: %w", err) } } @@ -1412,15 +1461,33 @@ func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, inde } // applyPackFile unpacks the objects from the pack file into the repository if the log entry -// has an associated pack file. -func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error { - packFile, err := os.Open(packFilePathForLogIndex(mgr.repositoryPath, logIndex)) - if err != nil { - return fmt.Errorf("open pack file: %w", err) +// has an associated pack file. This is done by hard linking the pack and index from the +// log into the repository's object directory. +func (mgr *TransactionManager) applyPackFile(ctx context.Context, packPrefix string, logIndex LogIndex) error { + packDirectory := filepath.Join(mgr.repositoryPath, "objects", "pack") + for _, fileName := range []string{ + packPrefix + ".pack", + packPrefix + ".idx", + } { + if err := os.Link( + filepath.Join(walFilesPathForLogIndex(mgr.repositoryPath, logIndex), fileName), + filepath.Join(packDirectory, fileName), + ); err != nil { + if !errors.Is(err, fs.ErrExist) { + return fmt.Errorf("link file: %w", err) + } + + // The file already existing means that we've already linked it in place or a repack + // has resulted in the exact same file. No need to do anything about it. + } } - defer packFile.Close() - return mgr.repository.UnpackObjects(ctx, packFile) + // Sync the new directory entries created. + if err := safe.NewSyncer().Sync(packDirectory); err != nil { + return fmt.Errorf("sync: %w", err) + } + + return nil } // applyCustomHooks applies the custom hooks to the repository from the log entry. The custom hooks are stored |