Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-08-16 15:38:24 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-08-18 15:42:53 +0300
commiteb41d6499db8add83e2541465522b338d90661f5 (patch)
treecd00da5c556b292b8e0ae3dcaf98ec922874dbe2
parent0c4699284cdf6a385bfd4383776b35bd1c3a8f0e (diff)
Store WAL state outside of the repository
The TransactionManager currently stores WAL-related state within the repository itself. This is problematic because if the repository is removed, the write-ahead log is removed with it. Partitions must also support multiple repositories in the future, so the WAL storage shouldn't be tied to any single repository. This commit moves the state out of the repository into a separate directory in the storage. The state directory is derived by hashing the relative path of a repository. This is done to balance the state directories of partitions across multiple subdirectories, similarly to what we do with hashed storage. The relative path is used as the final component temporarily until we have partition IDs available. indexFileDirectoryEntry was updated to pass the object format directly as an option. Previously it derived the correct format from the repository, but this is no longer convenient as the asserted pack files are no longer in the repository. Interestingly enough, 'git verify-pack' has had an '--object-format' option all along, which is not documented on its man page.
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go34
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go67
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_test.go17
3 files changed, 76 insertions, 42 deletions
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index 63a2765ee..204e8eaca 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -2,11 +2,14 @@ package storagemgr
import (
"context"
+ "crypto/sha256"
+ "encoding/hex"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
+ "strings"
"sync"
"github.com/dgraph-io/badger/v4"
@@ -227,6 +230,16 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
return nil, structerr.NewInvalidArgument("validate relative path: %w", err)
}
+ relativeStateDir := deriveStateDirectory(relativePath)
+ absoluteStateDir := filepath.Join(storageMgr.path, relativeStateDir)
+ if err := os.MkdirAll(filepath.Dir(absoluteStateDir), perm.PrivateDir); err != nil {
+ return nil, fmt.Errorf("create state directory hierarchy: %w", err)
+ }
+
+ if err := safe.NewSyncer().SyncHierarchy(storageMgr.path, filepath.Dir(relativeStateDir)); err != nil {
+ return nil, fmt.Errorf("sync state directory hierarchy: %w", err)
+ }
+
for {
storageMgr.mu.Lock()
if storageMgr.closed {
@@ -247,7 +260,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
return nil, fmt.Errorf("create staging directory: %w", err)
}
- mgr := NewTransactionManager(storageMgr.database, storageMgr.path, relativePath, stagingDir, pm.commandFactory, pm.housekeepingManager, storageMgr.repoFactory)
+ mgr := NewTransactionManager(storageMgr.database, storageMgr.path, relativePath, absoluteStateDir, stagingDir, pm.commandFactory, pm.housekeepingManager, storageMgr.repoFactory)
ptn.transactionManager = mgr
@@ -320,6 +333,25 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
}
}
+// deriveStateDirectory returns the relative directory partitions/<hash[0:2]>/<hash[2:4]>/<flattened relativePath>
+// for storing a partition's state, where hash is the hex-encoded SHA-256 digest of relativePath.
+func deriveStateDirectory(relativePath string) string {
+ hasher := sha256.New()
+ hasher.Write([]byte(relativePath))
+ hash := hex.EncodeToString(hasher.Sum(nil))
+
+ return filepath.Join(
+ "partitions",
+ // The first two byte pairs of the hex digest form two directory levels that spread
+ // the state directories into smaller subdirectories to keep directory sizes reasonable.
+ hash[0:2],
+ hash[2:4],
+ // Remove the OS path separators so the relative path becomes a single final component.
+ // NOTE(review): "a/bc" and "ab/c" flatten to the same name; confirm the hash levels suffice.
+ strings.ReplaceAll(relativePath, string(os.PathSeparator), ""),
+ )
+}
+
// Close closes transaction processing for all storages and waits for closing completion.
func (pm *PartitionManager) Close() {
var activeStorages sync.WaitGroup
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 93c319a99..3fdba41e4 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -496,6 +496,9 @@ type TransactionManager struct {
// being admitted. This is differentiated from ctx.Done in order to enable testing that Run correctly
// releases awaiters when the transactions processing is stopped.
closed chan struct{}
+ // stateDirectory is an absolute path to a directory where the TransactionManager stores the state related to its
+ // write-ahead log.
+ stateDirectory string
// stagingDirectory is a path to a directory where this TransactionManager should stage the files of the transactions
// before it logs them. The TransactionManager cleans up the files during runtime but stale files may be
// left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when
@@ -556,6 +559,7 @@ func NewTransactionManager(
db *badger.DB,
storagePath,
relativePath,
+ stateDir,
stagingDir string,
cmdFactory git.CommandFactory,
housekeepingManager housekeeping.Manager,
@@ -577,6 +581,7 @@ func NewTransactionManager(
openTransactions: list.New(),
initialized: make(chan struct{}),
applyNotifications: make(map[LogIndex]chan struct{}),
+ stateDirectory: stateDir,
stagingDirectory: stagingDir,
housekeepingManager: housekeepingManager,
awaitingTransactions: make(map[LogIndex]resultChannel),
@@ -982,8 +987,8 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
}
if mgr.repositoryExists {
- if err := mgr.createDirectories(); err != nil {
- return fmt.Errorf("create directories: %w", err)
+ if err := mgr.createStateDirectory(); err != nil {
+ return fmt.Errorf("create state directory: %w", err)
}
}
@@ -1070,7 +1075,7 @@ func (mgr *TransactionManager) determineCustomHookIndex(ctx context.Context, app
}
}
- hookDirs, err := os.ReadDir(filepath.Join(mgr.repositoryPath, "wal", "hooks"))
+ hookDirs, err := os.ReadDir(filepath.Join(mgr.stateDirectory, "wal", "hooks"))
if err != nil {
return 0, fmt.Errorf("read hook directories: %w", err)
}
@@ -1090,28 +1095,28 @@ func (mgr *TransactionManager) determineCustomHookIndex(ctx context.Context, app
return hookIndex, err
}
-// createDirectories creates the directories that are expected to exist
-// in the repository for storing the state. Initializing them simplifies
-// rest of the code as it doesn't need handling for when they don't.
-func (mgr *TransactionManager) createDirectories() error {
- for _, relativePath := range []string{
- "wal/hooks",
- "wal/packs",
- } {
- directory := filepath.Join(mgr.repositoryPath, relativePath)
- if _, err := os.Stat(directory); err != nil {
- if !errors.Is(err, fs.ErrNotExist) {
- return fmt.Errorf("stat directory: %w", err)
- }
+func (mgr *TransactionManager) createStateDirectory() error {
+ if err := os.Mkdir(mgr.stateDirectory, perm.PrivateDir); err != nil {
+ if !errors.Is(err, fs.ErrExist) {
+ return fmt.Errorf("mkdir state directory: %w", err)
+ }
+ }
- if err := os.MkdirAll(directory, fs.ModePerm); err != nil {
- return fmt.Errorf("mkdir: %w", err)
- }
+ if err := os.MkdirAll(filepath.Join(mgr.stateDirectory, "wal", "packs"), fs.ModePerm); err != nil {
+ return fmt.Errorf("mkdir packs: %w", err)
+ }
- if err := safe.NewSyncer().SyncHierarchy(mgr.repositoryPath, relativePath); err != nil {
- return fmt.Errorf("sync: %w", err)
- }
- }
+ if err := os.MkdirAll(filepath.Join(mgr.stateDirectory, "wal", "hooks"), fs.ModePerm); err != nil {
+ return fmt.Errorf("mkdir hooks: %w", err)
+ }
+
+ syncer := safe.NewSyncer()
+ if err := syncer.SyncRecursive(mgr.stateDirectory); err != nil {
+ return fmt.Errorf("sync: %w", err)
+ }
+
+ if err := syncer.SyncParent(mgr.stateDirectory); err != nil {
+ return fmt.Errorf("sync parent: %w", err)
}
return nil
@@ -1123,7 +1128,7 @@ func (mgr *TransactionManager) createDirectories() error {
func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appendedIndex LogIndex) error {
// Log entries are appended one by one to the log. If a write is interrupted, the only possible stale
// pack would be for the next log index. Remove the pack if it exists.
- possibleStaleFilesPath := walFilesPathForLogIndex(mgr.repositoryPath, appendedIndex+1)
+ possibleStaleFilesPath := walFilesPathForLogIndex(mgr.stateDirectory, appendedIndex+1)
if _, err := os.Stat(possibleStaleFilesPath); err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove: %w", err)
@@ -1150,7 +1155,7 @@ func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appended
func (mgr *TransactionManager) storeWALFiles(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
removeFiles := func() error { return nil }
- destinationPath := walFilesPathForLogIndex(mgr.repositoryPath, index)
+ destinationPath := walFilesPathForLogIndex(mgr.stateDirectory, index)
if err := os.Rename(
transaction.walFilesPath(),
destinationPath,
@@ -1175,8 +1180,8 @@ func (mgr *TransactionManager) storeWALFiles(ctx context.Context, index LogIndex
}
// walFilesPathForLogIndex returns an absolute path to a given log entry's WAL files.
-func walFilesPathForLogIndex(repoPath string, index LogIndex) string {
- return filepath.Join(repoPath, "wal", "packs", index.String())
+func walFilesPathForLogIndex(stateDir string, index LogIndex) string {
+ return filepath.Join(stateDir, "wal", "packs", index.String())
}
// packFilePath returns a log entry's pack file's absolute path in the wal files directory.
@@ -1540,7 +1545,7 @@ func (mgr *TransactionManager) applyPackFile(ctx context.Context, packPrefix str
".rev",
} {
if err := os.Link(
- filepath.Join(walFilesPathForLogIndex(mgr.repositoryPath, logIndex), "objects"+fileExtension),
+ filepath.Join(walFilesPathForLogIndex(mgr.stateDirectory, logIndex), "objects"+fileExtension),
filepath.Join(packDirectory, packPrefix+fileExtension),
); err != nil {
if !errors.Is(err, fs.ErrExist) {
@@ -1572,7 +1577,7 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo
return nil
}
- targetDirectory := customHookPathForLogIndex(mgr.repositoryPath, logIndex)
+ targetDirectory := customHookPathForLogIndex(mgr.stateDirectory, logIndex)
if err := os.Mkdir(targetDirectory, fs.ModePerm); err != nil {
// The target directory may exist if we previously tried to extract the
// custom hooks there. TAR overwrites existing files and the custom hooks
@@ -1637,8 +1642,8 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo
// customHookPathForLogIndex returns the filesystem paths where the custom hooks
// for the given log index are stored.
-func customHookPathForLogIndex(repositoryPath string, logIndex LogIndex) string {
- return filepath.Join(repositoryPath, "wal", "hooks", logIndex.String())
+func customHookPathForLogIndex(stateDir string, logIndex LogIndex) string {
+ return filepath.Join(stateDir, "wal", "hooks", logIndex.String())
}
// deleteLogEntry deletes the log entry at the given index from the log.
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
index c109b3180..d33620e85 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
@@ -118,12 +118,7 @@ func indexFileDirectoryEntry(cfg config.Cfg) testhelper.DirectoryEntry {
tb.Helper()
// Verify the index is valid.
- //
- // -C filepath.Dir ensures the command runs in the tested repository, not in the developer's
- // Gitaly repository. Otherwise the hash algorithm in use would be derived from there which
- // would break tests.
- gittest.Exec(tb, cfg, "-C", filepath.Dir(path), "verify-pack", "-v", path)
-
+ gittest.Exec(tb, cfg, "verify-pack", "--object-format="+gittest.DefaultObjectHash.Format, "-v", path)
// As we already verified the index is valid, we don't care about the actual contents.
return nil
},
@@ -3603,6 +3598,8 @@ func TestTransactionManager(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, database)
+ stateDir := filepath.Join(setup.Config.Storages[0].Path, "state")
+
stagingDir := t.TempDir()
storagePath := setup.Config.Storages[0].Path
@@ -3613,7 +3610,7 @@ func TestTransactionManager(t *testing.T) {
// managerRunning tracks whether the manager is running or closed.
managerRunning bool
// transactionManager is the current TransactionManager instance.
- transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
+ transactionManager = NewTransactionManager(database, storagePath, relativePath, stateDir, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
// managerErr is used for synchronizing manager closing and returning
// the error from Run.
managerErr chan error
@@ -3660,7 +3657,7 @@ func TestTransactionManager(t *testing.T) {
require.NoError(t, os.RemoveAll(stagingDir))
require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir))
- transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
+ transactionManager = NewTransactionManager(database, storagePath, relativePath, stateDir, stagingDir, setup.CommandFactory, housekeepingManager, storageScopedFactory)
installHooks(t, transactionManager, database, hooks{
beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry,
beforeStoreLogEntry: step.Hooks.BeforeAppendLogEntry,
@@ -3870,7 +3867,7 @@ func TestTransactionManager(t *testing.T) {
}
}
- testhelper.RequireDirectoryState(t, repoPath, "wal", expectedDirectory)
+ testhelper.RequireDirectoryState(t, stateDir, "wal", expectedDirectory)
}
entries, err := os.ReadDir(stagingDir)
@@ -4043,7 +4040,7 @@ func BenchmarkTransactionManager(b *testing.B) {
commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents())
commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1))
- manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), cmdFactory, housekeepingManager, repositoryFactory)
+ manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), b.TempDir(), cmdFactory, housekeepingManager, repositoryFactory)
managers = append(managers, manager)