diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-04-12 15:30:10 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-04-12 15:30:10 +0300 |
commit | 483eed9d7a6fe945854940e6b724135d98340292 (patch) | |
tree | 2ceba73382c008a3f14b2e3f7f66fe22918bb22a | |
parent | e029e3b56eacf23943a987207e8b490f45598630 (diff) | |
parent | 107a5db4e696c799bb7109ff2525652249983a1c (diff) |
Merge branch 'smh-hook-index' into 'master'
Enable snapshot reads of hooks
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5584
Merged-by: Sami Hiltunen <shiltunen@gitlab.com>
Approved-by: James Fargher <proglottis@gmail.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: James Fargher <proglottis@gmail.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r-- | internal/gitaly/transaction_manager.go | 106 | ||||
-rw-r--r-- | internal/gitaly/transaction_manager_test.go | 95 |
2 files changed, 188 insertions, 13 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index e5e0d6d87..8bac23ca5 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -109,6 +109,9 @@ type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate type Snapshot struct { // ReadIndex is the index of the log entry this Transaction is reading the data at. ReadIndex LogIndex + // HookIndex is index of the hooks on the disk that are included in this Transactions's snapshot + // and were the latest on the read index. + HookIndex LogIndex } // Transaction is a unit-of-work that contains reference changes to perform on the repository. @@ -143,8 +146,11 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) } mgr.mutex.RLock() - readIndex := mgr.appendedLogIndex - readReady := mgr.applyNotifications[readIndex] + snapshot := Snapshot{ + ReadIndex: mgr.appendedLogIndex, + HookIndex: mgr.hookIndex, + } + readReady := mgr.applyNotifications[snapshot.ReadIndex] mgr.mutex.RUnlock() if readReady == nil { // The snapshot log entry is already applied if there is no notification channel for it. @@ -160,10 +166,8 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) return nil, ErrTransactionProcessingStopped case <-readReady: return &Transaction{ - commit: mgr.commit, - snapshot: Snapshot{ - ReadIndex: readIndex, - }, + commit: mgr.commit, + snapshot: snapshot, }, nil } } @@ -287,6 +291,8 @@ type TransactionManager struct { appendedLogIndex LogIndex // appliedLogIndex holds the index of the last log entry applied to the repository appliedLogIndex LogIndex + // hookIndex stores the log index of the latest committed hooks in the repository. + hookIndex LogIndex } // repository is the localrepo interface used by TransactionManager. @@ -370,7 +376,7 @@ func (mgr *TransactionManager) Run() (returnedErr error) { defer close(mgr.runDone) defer mgr.Stop() - if err := mgr.initialize(); err != nil { + if err := mgr.initialize(mgr.ctx); err != nil { return fmt.Errorf("initialize: %w", err) } @@ -419,7 +425,7 @@ func (mgr *TransactionManager) Stop() { mgr.stop() } // initialize initializes the TransactionManager's state from the database. It loads the appendend and the applied // indexes and initializes the notification channels that synchronize transaction beginning with log entry applying. -func (mgr *TransactionManager) initialize() error { +func (mgr *TransactionManager) initialize(ctx context.Context) error { defer close(mgr.initialized) var appliedLogIndex gitalypb.LogIndex @@ -455,6 +461,12 @@ func (mgr *TransactionManager) initialize() error { return fmt.Errorf("determine appended log index: %w", err) } + var err error + mgr.hookIndex, err = mgr.determineHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex) + if err != nil { + return fmt.Errorf("determine hook index: %w", err) + } + // Each unapplied log entry should have a notification channel that gets closed when it is applied. // Create these channels here for the log entries. for i := mgr.appliedLogIndex + 1; i <= mgr.appendedLogIndex; i++ { @@ -464,6 +476,56 @@ func (mgr *TransactionManager) initialize() error { return nil } +// determineHookIndex determines the latest hooks in the repository. +// +// 1. First we iterate through the unapplied log in reverse order. The first log entry that +// contains custom hooks must have the latest hooks since it is the latest log entry. +// 2. If we don't find any custom hooks in the log, the latest hooks could have been applied +// to the repository already and the log entry pruned away. Look at the hooks on the disk +// to see which are the latest. +// 3. If we found no hooks in the log nor in the repository, there are no hooks configured. +func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) { + for i := appendedIndex; appliedIndex < i; i-- { + logEntry, err := mgr.readLogEntry(i) + if err != nil { + return 0, fmt.Errorf("read log entry: %w", err) + } + + if logEntry.CustomHooksUpdate != nil { + return i, nil + } + } + + repoPath, err := mgr.repository.Path() + if err != nil { + return 0, fmt.Errorf("repository path: %w", err) + } + + hookDirs, err := os.ReadDir(filepath.Join(repoPath, "wal", "hooks")) + if err != nil { + // If the directory doesn't exist, then there are no hooks yet. + if !errors.Is(err, fs.ErrNotExist) { + return 0, fmt.Errorf("read hook directories: %w", err) + } + + return 0, nil + } + + var hookIndex LogIndex + for _, dir := range hookDirs { + rawIndex, err := strconv.ParseUint(dir.Name(), 10, 64) + if err != nil { + return 0, fmt.Errorf("parse hook index: %w", err) + } + + if index := LogIndex(rawIndex); hookIndex < index { + hookIndex = index + } + } + + return hookIndex, err +} + // verifyReferences verifies that the references in the transaction apply on top of the already accepted // reference changes. The old tips in the transaction are verified against the current actual tips. // It returns the write-ahead log entry for the transaction if it was successfully verified. @@ -629,6 +691,9 @@ func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error mgr.mutex.Lock() mgr.appendedLogIndex = nextLogIndex + if logEntry.CustomHooksUpdate != nil { + mgr.hookIndex = nextLogIndex + } mgr.applyNotifications[nextLogIndex] = make(chan struct{}) mgr.mutex.Unlock() @@ -637,10 +702,8 @@ func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error // applyLogEntry reads a log entry at the given index and applies it to the repository. func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error { - var logEntry gitalypb.LogEntry - key := keyLogEntry(getRepositoryID(mgr.repository), logIndex) - - if err := mgr.readKey(key, &logEntry); err != nil { + logEntry, err := mgr.readLogEntry(logIndex) + if err != nil { return fmt.Errorf("read log entry: %w", err) } @@ -665,7 +728,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("set applied log index: %w", err) } - if err := mgr.deleteKey(key); err != nil { + if err := mgr.deleteLogEntry(logIndex); err != nil { return fmt.Errorf("deleting log entry: %w", err) } @@ -744,6 +807,23 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo return nil } +// deleteLogEntry deletes the log entry at the given index from the log. +func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error { + return mgr.deleteKey(keyLogEntry(getRepositoryID(mgr.repository), index)) +} + +// readLogEntry returns the log entry from the given position in the log. +func (mgr *TransactionManager) readLogEntry(index LogIndex) (*gitalypb.LogEntry, error) { + var logEntry gitalypb.LogEntry + key := keyLogEntry(getRepositoryID(mgr.repository), index) + + if err := mgr.readKey(key, &logEntry); err != nil { + return nil, fmt.Errorf("read key: %w", err) + } + + return &logEntry, nil +} + // storeLogEntry stores the log entry in the repository's write-ahead log at the given index. func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error { return mgr.setKey(keyLogEntry(getRepositoryID(mgr.repository), index), entry) diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index e1ec64c7e..8b6a97fb7 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -911,6 +911,7 @@ func TestTransactionManager(t *testing.T) { TransactionID: 2, ExpectedSnapshot: Snapshot{ ReadIndex: 1, + HookIndex: 1, }, }, Commit{ @@ -977,6 +978,75 @@ func TestTransactionManager(t *testing.T) { }, }, { + desc: "hook index is correctly determined from log and disk", + steps: steps{ + StartManager{ + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + HookIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + CustomHooksUpdate: &CustomHooksUpdate{}, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + HookIndex: 2, + }, + }, + StopManager{}, + StartManager{}, + Begin{ + TransactionID: 4, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + HookIndex: 2, + }, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(), + }, + Hooks: testhelper.DirectoryState{ + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + "/wal/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + }, + }, + }, + { desc: "continues processing after reference verification failure", steps: steps{ StartManager{}, @@ -1591,11 +1661,15 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID}, }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, }, Begin{ TransactionID: 3, ExpectedSnapshot: Snapshot{ ReadIndex: 1, + HookIndex: 1, }, }, Commit{ @@ -1608,6 +1682,7 @@ func TestTransactionManager(t *testing.T) { TransactionID: 4, ExpectedSnapshot: Snapshot{ ReadIndex: 2, + HookIndex: 1, }, }, Rollback{ @@ -1617,6 +1692,7 @@ func TestTransactionManager(t *testing.T) { TransactionID: 5, ExpectedSnapshot: Snapshot{ ReadIndex: 2, + HookIndex: 1, }, }, Commit{ @@ -1624,6 +1700,14 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID}, }, + CustomHooksUpdate: &CustomHooksUpdate{}, + }, + Begin{ + TransactionID: 6, + ExpectedSnapshot: Snapshot{ + ReadIndex: 3, + HookIndex: 3, + }, }, }, expectedState: StateAssertion{ @@ -1634,6 +1718,17 @@ func TestTransactionManager(t *testing.T) { Database: DatabaseState{ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(3).toProto(), }, + Hooks: testhelper.DirectoryState{ + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + "/wal/hooks/3": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + }, }, }, } |