diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-04-16 14:25:54 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-05-10 17:38:56 +0300 |
commit | 05771b584a7631f28d573fb97618563ffa485918 (patch) | |
tree | e6a20afeb011d7be0db277eea3379ea2c9212d29 /internal/gitaly/transaction_manager.go | |
parent | ce86f569d5552be144e9109a17d92d25567df62a (diff) |
Pass storage and relative path directly to TransactionManager
TransactionManager is currently retrieving the repository path through
the localrepo.Repo. This won't work in the future anymore once we WAL
repository creations and deletions. Repo.Path() checks whether the
directory is a git repository, which won't be the case if the repository
was deleted or hasn't been created yet. We need the path when initializing
the manager so we can load its state correctly. We're taking the storage path
and relative path instead of a GitRepo as it works better in the context
of the TransactionManager:
1. The TransactionManager always works within a single storage, given it
operates on a single repository. The typical interface of using a Locator
to find the path of a repository is thus needlessly complex, and would force
error handling to check a non-existent storage is not accessed. Taking the path
directly doesn't require unnecessary error handling here, and forces the caller
to handle it.
2. For most cases, just taking the repository's absolute path would be enough
However, we also need an identifier to use in the database keys. Currently
the identifier is <storage>:<relative_path>. Each storage will be an independent
failure domain and will have their own database instance. Given that, a
relative path is enough to uniquely identify a repository within a storage.
We also don't have a better identifier available yet so we use a relative path.
After the RenameRepository is removed, the relative path will be stable and okay
to use as an identifier.
The PartitionManager is extended to take in the configured storages so it can pass
the required parameters when constructing the TransactionManagers.
Diffstat (limited to 'internal/gitaly/transaction_manager.go')
-rw-r--r-- | internal/gitaly/transaction_manager.go | 63 |
1 files changed, 15 insertions, 48 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 50ee7d819..8a5ffb34d 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -370,6 +370,10 @@ type TransactionManager struct { // repository is the repository this TransactionManager is acting on. repository repository + // repositoryPath is the path to the repository this TransactionManager is acting on. + repositoryPath string + // relativePath is the repository's relative path inside the storage. + relativePath string // db is the handle to the key-value store used for storing the write-ahead log related state. db database // admissionQueue is where the incoming writes are waiting to be admitted to the transaction @@ -401,7 +405,6 @@ type repository interface { git.RepositoryExecutor ResolveRevision(context.Context, git.Revision) (git.ObjectID, error) SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error - Path() (string, error) UnpackObjects(context.Context, io.Reader) error Quarantine(string) (*localrepo.Repo, error) WalkUnreachableObjects(context.Context, io.Reader, io.Writer) error @@ -409,7 +412,7 @@ type repository interface { } // NewTransactionManager returns a new TransactionManager for the given repository. -func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, stagingDir string, transactionFinalizer func()) *TransactionManager { +func NewTransactionManager(db *badger.DB, storagePath, relativePath string, repository *localrepo.Repo, stagingDir string, transactionFinalizer func()) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) return &TransactionManager{ ctx: ctx, @@ -417,6 +420,8 @@ func NewTransactionManager(db *badger.DB, repository *localrepo.Repo, stagingDir runDone: make(chan struct{}), stop: cancel, repository: repository, + repositoryPath: filepath.Join(storagePath, relativePath), + relativePath: relativePath, db: newDatabaseAdapter(db), admissionQueue: make(chan *Transaction), initialized: make(chan struct{}), @@ -742,12 +747,7 @@ func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedI } } - repoPath, err := mgr.repository.Path() - if err != nil { - return 0, fmt.Errorf("repository path: %w", err) - } - - hookDirs, err := os.ReadDir(filepath.Join(repoPath, "wal", "hooks")) + hookDirs, err := os.ReadDir(filepath.Join(mgr.repositoryPath, "wal", "hooks")) if err != nil { return 0, fmt.Errorf("read hook directories: %w", err) } @@ -771,16 +771,11 @@ func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedI // in the repository for storing the state. Initializing them simplifies // rest of the code as it doesn't need handling for when they don't. func (mgr *TransactionManager) createDirectories() error { - repoPath, err := mgr.repository.Path() - if err != nil { - return fmt.Errorf("repo path: %w", err) - } - for _, relativePath := range []string{ "wal/hooks", "wal/packs", } { - directory := filepath.Join(repoPath, relativePath) + directory := filepath.Join(mgr.repositoryPath, relativePath) if _, err := os.Stat(directory); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("stat directory: %w", err) @@ -790,7 +785,7 @@ func (mgr *TransactionManager) createDirectories() error { return fmt.Errorf("mkdir: %w", err) } - if err := safe.NewSyncer().SyncHierarchy(repoPath, relativePath); err != nil { + if err := safe.NewSyncer().SyncHierarchy(mgr.repositoryPath, relativePath); err != nil { return fmt.Errorf("sync: %w", err) } } @@ -805,11 +800,7 @@ func (mgr *TransactionManager) createDirectories() error { func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appendedIndex LogIndex) error { // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale // pack would be for the next log index. Remove the pack if it exists. - possibleStalePackPath, err := mgr.packFilePath(appendedIndex + 1) - if err != nil { - return fmt.Errorf("pack file path: %w", err) - } - + possibleStalePackPath := packFilePathForLogIndex(mgr.repositoryPath, appendedIndex+1) if err := os.Remove(possibleStalePackPath); err != nil { if !errors.Is(err, fs.ErrNotExist) { return fmt.Errorf("remove: %w", err) @@ -832,11 +823,7 @@ func (mgr *TransactionManager) removeStalePackFiles(ctx context.Context, appende func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) { removePack := func() error { return nil } - destinationPath, err := mgr.packFilePath(index) - if err != nil { - return removePack, fmt.Errorf("pack file path: %w", err) - } - + destinationPath := packFilePathForLogIndex(mgr.repositoryPath, index) if err := os.Rename( transaction.packFilePath(), destinationPath, @@ -860,16 +847,6 @@ func (mgr *TransactionManager) storePackFile(ctx context.Context, index LogIndex return removePack, nil } -// packFilePath returns the path where a given log entry's pack file would be stored. -func (mgr *TransactionManager) packFilePath(index LogIndex) (string, error) { - repoPath, err := mgr.repository.Path() - if err != nil { - return "", fmt.Errorf("repo path: %w", err) - } - - return packFilePathForLogIndex(repoPath, index), nil -} - // packFilePathForLogIndex returns a log entry's pack file's absolute path in a given // a repository path. func packFilePathForLogIndex(repoPath string, index LogIndex) string { @@ -1115,12 +1092,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn // applyPackFile unpacks the objects from the pack file into the repository if the log entry // has an associated pack file. func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error { - packFilePath, err := mgr.packFilePath(logIndex) - if err != nil { - return fmt.Errorf("pack file path: %w", err) - } - - packFile, err := os.Open(packFilePath) + packFile, err := os.Open(packFilePathForLogIndex(mgr.repositoryPath, logIndex)) if err != nil { return fmt.Errorf("open pack file: %w", err) } @@ -1137,15 +1109,10 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo return nil } - repoPath, err := mgr.repository.Path() - if err != nil { - return fmt.Errorf("repository path: %w", err) - } - syncer := safe.NewSyncer() hooksPath := filepath.Join("wal", "hooks") - targetDirectory := filepath.Join(repoPath, hooksPath, logIndex.String()) + targetDirectory := filepath.Join(mgr.repositoryPath, hooksPath, logIndex.String()) if err := os.Mkdir(targetDirectory, fs.ModePerm); err != nil { // The target directory may exist if we previously tried to extract the // hooks there. TAR overwrites existing files and the hooks files are @@ -1165,7 +1132,7 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo } // Sync the parent directory as well. - if err := syncer.Sync(filepath.Join(repoPath, hooksPath)); err != nil { + if err := syncer.Sync(filepath.Join(mgr.repositoryPath, hooksPath)); err != nil { return fmt.Errorf("sync hook directory: %w", err) } |