Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Sami Hiltunen <shiltunen@gitlab.com> 2023-06-02 12:25:18 +0300
committer: Sami Hiltunen <shiltunen@gitlab.com> 2023-06-02 12:25:18 +0300
commit: 27db5949d60beb5170467da8188f1599ff4760c1 (patch)
tree: 7476a137ad41a80d421dd2b7b3f908639fb1a15d /internal/gitaly/transaction_manager.go
parent: 81496efc0d26dba7799d1392c80b06bce943cc29 (diff)
parent: 3cee3b150adc7935d611dd98dfca4b2c2149c0f7 (diff)
Merge branch 'smh-move-packs-in-place' into 'master'
Apply logged pack files to repository without copying

Closes #5046

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5846

Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: karthik nayak <knayak@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: karthik nayak <knayak@gitlab.com>
Diffstat (limited to 'internal/gitaly/transaction_manager.go')
-rw-r--r-- internal/gitaly/transaction_manager.go 195
1 files changed, 131 insertions, 64 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
index ab493dd34..2bb757e30 100644
--- a/internal/gitaly/transaction_manager.go
+++ b/internal/gitaly/transaction_manager.go
@@ -11,6 +11,7 @@ import (
"io/fs"
"os"
"path/filepath"
+ "regexp"
"sort"
"strconv"
"strings"
@@ -153,9 +154,9 @@ type Transaction struct {
// quarantineDirectory is the directory within the stagingDirectory where the new objects of the
// transaction are quarantined.
quarantineDirectory string
- // includesPack is set if a pack file has been computed for the transaction and should be
- // logged.
- includesPack bool
+ // packPrefix contains the prefix (`pack-<digest>`) of the transaction's pack if the transaction
+ // had objects to log.
+ packPrefix string
// stagingRepository is a repository that is used to stage the transaction. If there are quarantined
// objects, it has the quarantine applied so the objects are available for verification and packing.
stagingRepository *localrepo.Repo
@@ -358,9 +359,10 @@ func (txn *Transaction) SetCustomHooks(customHooksTAR []byte) {
txn.customHooksUpdate = &CustomHooksUpdate{CustomHooksTAR: customHooksTAR}
}
-// packFilePath returns the path to this transaction's pack file.
-func (txn *Transaction) packFilePath() string {
- return filepath.Join(txn.stagingDirectory, "transaction.pack")
+// walFilesPath returns the path to the directory where this transaction is staging the files that will
+// be logged alongside the transaction's log entry.
+func (txn *Transaction) walFilesPath() string {
+ return filepath.Join(txn.stagingDirectory, "wal-files")
}
// TransactionManager is responsible for transaction management of a single repository. Each repository has
@@ -587,6 +589,10 @@ func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, trans
return nil
}
+// packPrefixRegexp matches the output of `git index-pack` where it
+// prints the pack's prefix in the format `pack<TAB><digest>` followed by a newline.
+var packPrefixRegexp = regexp.MustCompile(`^pack\t([0-9a-f]+)\n$`)
+
// packObjects packs the objects included in the transaction into a single pack file that is ready
// for logging. The pack file includes all unreachable objects that are about to be made reachable.
func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Transaction) error {
@@ -614,8 +620,6 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra
return nil
}
- transaction.includesPack = true
-
objectsReader, objectsWriter := io.Pipe()
group, ctx := errgroup.WithContext(ctx)
@@ -632,30 +636,67 @@ func (mgr *TransactionManager) packObjects(ctx context.Context, transaction *Tra
return nil
})
+ packReader, packWriter := io.Pipe()
group.Go(func() (returnedErr error) {
- defer func() { objectsReader.CloseWithError(returnedErr) }()
+ defer func() {
+ objectsReader.CloseWithError(returnedErr)
+ packWriter.CloseWithError(returnedErr)
+ }()
- destinationFile, err := os.OpenFile(
- transaction.packFilePath(),
- os.O_WRONLY|os.O_CREATE|os.O_EXCL,
- perm.PrivateFile,
- )
- if err != nil {
- return fmt.Errorf("open file: %w", err)
+ if err := transaction.stagingRepository.PackObjects(ctx, objectsReader, packWriter); err != nil {
+ return fmt.Errorf("pack objects: %w", err)
}
- defer destinationFile.Close()
- if err := transaction.stagingRepository.PackObjects(ctx, objectsReader, destinationFile); err != nil {
- return fmt.Errorf("pack objects: %w", err)
+ return nil
+ })
+
+ group.Go(func() (returnedErr error) {
+ defer packReader.CloseWithError(returnedErr)
+
+ if err := os.Mkdir(transaction.walFilesPath(), perm.PrivateDir); err != nil {
+ return fmt.Errorf("create wal files directory: %w", err)
+ }
+
+ // index-pack places the pack and the index into the repository's object directory. The
+ // staging repository is configured with a quarantine so we execute it there.
+ var stdout, stderr bytes.Buffer
+ if err := transaction.stagingRepository.ExecAndWait(ctx, git.Command{
+ Name: "index-pack",
+ Flags: []git.Option{git.Flag{Name: "--stdin"}},
+ }, git.WithStdin(packReader), git.WithStdout(&stdout), git.WithStderr(&stderr)); err != nil {
+ return structerr.New("index pack: %w", err).WithMetadata("stderr", stderr.String())
+ }
+
+ matches := packPrefixRegexp.FindStringSubmatch(stdout.String())
+ if len(matches) != 2 {
+ return structerr.New("unexpected index-pack output").WithMetadata("stdout", stdout.String())
+ }
+
+ // Move the files from the quarantine to the wal-files directory so they'll get logged as part
+ // of the directory.
+ packPrefix := fmt.Sprintf("pack-%s", matches[1])
+ for _, fileName := range []string{
+ packPrefix + ".pack",
+ packPrefix + ".idx",
+ } {
+ if err := os.Rename(
+ filepath.Join(transaction.quarantineDirectory, "pack", fileName),
+ filepath.Join(transaction.walFilesPath(), fileName),
+ ); err != nil {
+ return fmt.Errorf("move file: %w", err)
+ }
}
- // Sync the contents of the pack so they are flushed to disk prior to the transaction
- // being admitted for processing.
- if err := destinationFile.Sync(); err != nil {
- return fmt.Errorf("sync pack: %w", err)
+ // Sync the files and the directory entries so everything is flushed to the disk prior
+ // to moving on to committing the log entry. This way we only have to flush the directory
+ // move when we move the staged files into the log.
+ if err := safe.NewSyncer().SyncRecursive(transaction.walFilesPath()); err != nil {
+ return fmt.Errorf("sync recursive: %w", err)
}
- return destinationFile.Close()
+ transaction.packPrefix = packPrefix
+
+ return nil
})
return group.Wait()
@@ -772,25 +813,25 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
}
nextLogIndex := mgr.appendedLogIndex + 1
- if transaction.includesPack {
- logEntry.IncludesPack = true
+ if transaction.packPrefix != "" {
+ logEntry.PackPrefix = transaction.packPrefix
- removePack, err := mgr.storePackFile(mgr.ctx, nextLogIndex, transaction)
+ removeFiles, err := mgr.storeWALFiles(mgr.ctx, nextLogIndex, transaction)
cleanUps = append(cleanUps, func() error {
- // The transaction's pack file might have been moved in to place at <repo>/wal/packs/<log_index>.pack.
- // If anything fails before the transaction is committed, the pack file must be removed as otherwise
- // it would occupy the pack file slot of the next log entry. If this can't be done, the TransactionManager
- // will exit with an error. The pack file will be cleaned up on restart and no further processing is
+ // The transaction's files might have been moved successfully into the log.
+ // If anything fails before the transaction is committed, the files must be removed as otherwise
+ // they would occupy the slot of the next log entry. If this can't be done, the TransactionManager
+ // will exit with an error. The files will be cleaned up on restart and no further processing is
// allowed until that happens.
if commitErr != nil {
- return removePack()
+ return removeFiles()
}
return nil
})
if err != nil {
- return fmt.Errorf("store pack file: %w", err)
+ return fmt.Errorf("store wal files: %w", err)
}
}
@@ -872,7 +913,7 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
mgr.applyNotifications[i] = make(chan struct{})
}
- if err := mgr.removeStalePackFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
+ if err := mgr.removeStaleWALFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
return fmt.Errorf("remove stale packs: %w", err)
}
@@ -990,14 +1031,14 @@ func (mgr *TransactionManager) createDirectories() error {
return nil
}
-// removeStalePackFiles removes pack files from the log directory that have no associated log entry.
-// Such packs can be left around if a transaction's pack file was moved in place successfully
+// removeStaleWALFiles removes files from the log directory that have no associated log entry.
+// Such files can be left around if a transaction's files were moved in place successfully
// but the manager was interrupted before successfully persisting the log entry itself.
-func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error {
+func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appendedIndex LogIndex) error {
// Log entries are appended one by one to the log. If a write is interrupted, the only possible stale
// pack would be for the next log index. Remove the pack if it exists.
- possibleStalePackPath := packFilePathForLogIndex(mgr.repositoryPath, appendedIndex+1)
- if err := os.Remove(possibleStalePackPath); err != nil {
+ possibleStaleFilesPath := walFilesPathForLogIndex(mgr.repositoryPath, appendedIndex+1)
+ if _, err := os.Stat(possibleStaleFilesPath); err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove: %w", err)
}
@@ -1005,31 +1046,35 @@ func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appende
return nil
}
+ if err := os.RemoveAll(possibleStaleFilesPath); err != nil {
+ return fmt.Errorf("remove all: %w", err)
+ }
+
// Sync the parent directory to flush the file deletion.
- if err := safe.NewSyncer().Sync(filepath.Dir(possibleStalePackPath)); err != nil {
+ if err := safe.NewSyncer().SyncParent(possibleStaleFilesPath); err != nil {
return fmt.Errorf("sync: %w", err)
}
return nil
}
-// storePackFile moves the transaction's pack file from the object directory to its destination in the log.
-// It returns a function, even on errors, that must be called to clean up the pack file committing the log entry
+// storeWALFiles moves the transaction's logged files from the staging directory to their destination in the log.
+// It returns a function, even on errors, that must be called to clean up the files if committing the log entry
// fails.
-func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
- removePack := func() error { return nil }
+func (mgr *TransactionManager) storeWALFiles(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
+ removeFiles := func() error { return nil }
- destinationPath := packFilePathForLogIndex(mgr.repositoryPath, index)
+ destinationPath := walFilesPathForLogIndex(mgr.repositoryPath, index)
if err := os.Rename(
- transaction.packFilePath(),
+ transaction.walFilesPath(),
destinationPath,
); err != nil {
- return removePack, fmt.Errorf("move pack file: %w", err)
+ return removeFiles, fmt.Errorf("move wal files: %w", err)
}
- removePack = func() error {
+ removeFiles = func() error {
if err := os.Remove(destinationPath); err != nil {
- return fmt.Errorf("remove pack file: %w", err)
+ return fmt.Errorf("remove wal files: %w", err)
}
return nil
@@ -1037,16 +1082,20 @@ func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex
// Sync the parent directory. The pack's contents are synced when the pack file is computed.
if err := safe.NewSyncer().Sync(filepath.Dir(destinationPath)); err != nil {
- return removePack, fmt.Errorf("sync: %w", err)
+ return removeFiles, fmt.Errorf("sync: %w", err)
}
- return removePack, nil
+ return removeFiles, nil
}
-// packFilePathForLogIndex returns a log entry's pack file's absolute path in a given
-// a repository path.
-func packFilePathForLogIndex(repoPath string, index LogIndex) string {
- return filepath.Join(repoPath, "wal", "packs", index.String()+".pack")
+// walFilesPathForLogIndex returns an absolute path to a given log entry's WAL files.
+func walFilesPathForLogIndex(repoPath string, index LogIndex) string {
+ return filepath.Join(repoPath, "wal", "packs", index.String())
+}
+
+// packFilePath returns a log entry's pack file's absolute path in the wal files directory.
+func packFilePath(walFiles string) string {
+ return filepath.Join(walFiles, "transaction.pack")
}
// verifyReferences verifies that the references in the transaction apply on top of the already accepted
@@ -1288,8 +1337,8 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return fmt.Errorf("apply repository deletion: %w", err)
}
} else {
- if logEntry.IncludesPack {
- if err := mgr.applyPackFile(ctx, logIndex); err != nil {
+ if logEntry.PackPrefix != "" {
+ if err := mgr.applyPackFile(ctx, logEntry.PackPrefix, logIndex); err != nil {
return fmt.Errorf("apply pack file: %w", err)
}
}
@@ -1412,15 +1461,33 @@ func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, inde
}
// applyPackFile unpacks the objects from the pack file into the repository if the log entry
-// has an associated pack file.
-func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error {
- packFile, err := os.Open(packFilePathForLogIndex(mgr.repositoryPath, logIndex))
- if err != nil {
- return fmt.Errorf("open pack file: %w", err)
+// has an associated pack file. This is done by hard linking the pack and index from the
+// log into the repository's object directory.
+func (mgr *TransactionManager) applyPackFile(ctx context.Context, packPrefix string, logIndex LogIndex) error {
+ packDirectory := filepath.Join(mgr.repositoryPath, "objects", "pack")
+ for _, fileName := range []string{
+ packPrefix + ".pack",
+ packPrefix + ".idx",
+ } {
+ if err := os.Link(
+ filepath.Join(walFilesPathForLogIndex(mgr.repositoryPath, logIndex), fileName),
+ filepath.Join(packDirectory, fileName),
+ ); err != nil {
+ if !errors.Is(err, fs.ErrExist) {
+ return fmt.Errorf("link file: %w", err)
+ }
+
+ // The file already existing means that we've already linked it in place or a repack
+ // has resulted in the exact same file. No need to do anything about it.
+ }
}
- defer packFile.Close()
- return mgr.repository.UnpackObjects(ctx, packFile)
+ // Sync the new directory entries created.
+ if err := safe.NewSyncer().Sync(packDirectory); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+
+ return nil
}
// applyCustomHooks applies the custom hooks to the repository from the log entry. The custom hooks are stored