Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-04-12 15:30:10 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-04-12 15:30:10 +0300
commit483eed9d7a6fe945854940e6b724135d98340292 (patch)
tree2ceba73382c008a3f14b2e3f7f66fe22918bb22a
parente029e3b56eacf23943a987207e8b490f45598630 (diff)
parent107a5db4e696c799bb7109ff2525652249983a1c (diff)
Merge branch 'smh-hook-index' into 'master'
Enable snapshot reads of hooks See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5584 Merged-by: Sami Hiltunen <shiltunen@gitlab.com> Approved-by: James Fargher <proglottis@gmail.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Reviewed-by: James Fargher <proglottis@gmail.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--internal/gitaly/transaction_manager.go106
-rw-r--r--internal/gitaly/transaction_manager_test.go95
2 files changed, 188 insertions, 13 deletions
diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go
index e5e0d6d87..8bac23ca5 100644
--- a/internal/gitaly/transaction_manager.go
+++ b/internal/gitaly/transaction_manager.go
@@ -109,6 +109,9 @@ type ReferenceUpdates map[git.ReferenceName]ReferenceUpdate
type Snapshot struct {
// ReadIndex is the index of the log entry this Transaction is reading the data at.
ReadIndex LogIndex
+ // HookIndex is index of the hooks on the disk that are included in this Transactions's snapshot
+ // and were the latest on the read index.
+ HookIndex LogIndex
}
// Transaction is a unit-of-work that contains reference changes to perform on the repository.
@@ -143,8 +146,11 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error)
}
mgr.mutex.RLock()
- readIndex := mgr.appendedLogIndex
- readReady := mgr.applyNotifications[readIndex]
+ snapshot := Snapshot{
+ ReadIndex: mgr.appendedLogIndex,
+ HookIndex: mgr.hookIndex,
+ }
+ readReady := mgr.applyNotifications[snapshot.ReadIndex]
mgr.mutex.RUnlock()
if readReady == nil {
// The snapshot log entry is already applied if there is no notification channel for it.
@@ -160,10 +166,8 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error)
return nil, ErrTransactionProcessingStopped
case <-readReady:
return &Transaction{
- commit: mgr.commit,
- snapshot: Snapshot{
- ReadIndex: readIndex,
- },
+ commit: mgr.commit,
+ snapshot: snapshot,
}, nil
}
}
@@ -287,6 +291,8 @@ type TransactionManager struct {
appendedLogIndex LogIndex
// appliedLogIndex holds the index of the last log entry applied to the repository
appliedLogIndex LogIndex
+ // hookIndex stores the log index of the latest committed hooks in the repository.
+ hookIndex LogIndex
}
// repository is the localrepo interface used by TransactionManager.
@@ -370,7 +376,7 @@ func (mgr *TransactionManager) Run() (returnedErr error) {
defer close(mgr.runDone)
defer mgr.Stop()
- if err := mgr.initialize(); err != nil {
+ if err := mgr.initialize(mgr.ctx); err != nil {
return fmt.Errorf("initialize: %w", err)
}
@@ -419,7 +425,7 @@ func (mgr *TransactionManager) Stop() { mgr.stop() }
// initialize initializes the TransactionManager's state from the database. It loads the appendend and the applied
// indexes and initializes the notification channels that synchronize transaction beginning with log entry applying.
-func (mgr *TransactionManager) initialize() error {
+func (mgr *TransactionManager) initialize(ctx context.Context) error {
defer close(mgr.initialized)
var appliedLogIndex gitalypb.LogIndex
@@ -455,6 +461,12 @@ func (mgr *TransactionManager) initialize() error {
return fmt.Errorf("determine appended log index: %w", err)
}
+ var err error
+ mgr.hookIndex, err = mgr.determineHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex)
+ if err != nil {
+ return fmt.Errorf("determine hook index: %w", err)
+ }
+
// Each unapplied log entry should have a notification channel that gets closed when it is applied.
// Create these channels here for the log entries.
for i := mgr.appliedLogIndex + 1; i <= mgr.appendedLogIndex; i++ {
@@ -464,6 +476,56 @@ func (mgr *TransactionManager) initialize() error {
return nil
}
+// determineHookIndex determines the latest hooks in the repository.
+//
+// 1. First we iterate through the unapplied log in reverse order. The first log entry that
+// contains custom hooks must have the latest hooks since it is the latest log entry.
+// 2. If we don't find any custom hooks in the log, the latest hooks could have been applied
+// to the repository already and the log entry pruned away. Look at the hooks on the disk
+// to see which are the latest.
+// 3. If we found no hooks in the log nor in the repository, there are no hooks configured.
+func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) {
+ for i := appendedIndex; appliedIndex < i; i-- {
+ logEntry, err := mgr.readLogEntry(i)
+ if err != nil {
+ return 0, fmt.Errorf("read log entry: %w", err)
+ }
+
+ if logEntry.CustomHooksUpdate != nil {
+ return i, nil
+ }
+ }
+
+ repoPath, err := mgr.repository.Path()
+ if err != nil {
+ return 0, fmt.Errorf("repository path: %w", err)
+ }
+
+ hookDirs, err := os.ReadDir(filepath.Join(repoPath, "wal", "hooks"))
+ if err != nil {
+ // If the directory doesn't exist, then there are no hooks yet.
+ if !errors.Is(err, fs.ErrNotExist) {
+ return 0, fmt.Errorf("read hook directories: %w", err)
+ }
+
+ return 0, nil
+ }
+
+ var hookIndex LogIndex
+ for _, dir := range hookDirs {
+ rawIndex, err := strconv.ParseUint(dir.Name(), 10, 64)
+ if err != nil {
+ return 0, fmt.Errorf("parse hook index: %w", err)
+ }
+
+ if index := LogIndex(rawIndex); hookIndex < index {
+ hookIndex = index
+ }
+ }
+
+ return hookIndex, err
+}
+
// verifyReferences verifies that the references in the transaction apply on top of the already accepted
// reference changes. The old tips in the transaction are verified against the current actual tips.
// It returns the write-ahead log entry for the transaction if it was successfully verified.
@@ -629,6 +691,9 @@ func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error
mgr.mutex.Lock()
mgr.appendedLogIndex = nextLogIndex
+ if logEntry.CustomHooksUpdate != nil {
+ mgr.hookIndex = nextLogIndex
+ }
mgr.applyNotifications[nextLogIndex] = make(chan struct{})
mgr.mutex.Unlock()
@@ -637,10 +702,8 @@ func (mgr *TransactionManager) appendLogEntry(logEntry *gitalypb.LogEntry) error
// applyLogEntry reads a log entry at the given index and applies it to the repository.
func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error {
- var logEntry gitalypb.LogEntry
- key := keyLogEntry(getRepositoryID(mgr.repository), logIndex)
-
- if err := mgr.readKey(key, &logEntry); err != nil {
+ logEntry, err := mgr.readLogEntry(logIndex)
+ if err != nil {
return fmt.Errorf("read log entry: %w", err)
}
@@ -665,7 +728,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
return fmt.Errorf("set applied log index: %w", err)
}
- if err := mgr.deleteKey(key); err != nil {
+ if err := mgr.deleteLogEntry(logIndex); err != nil {
return fmt.Errorf("deleting log entry: %w", err)
}
@@ -744,6 +807,23 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo
return nil
}
+// deleteLogEntry deletes the log entry at the given index from the log.
+func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error {
+ return mgr.deleteKey(keyLogEntry(getRepositoryID(mgr.repository), index))
+}
+
+// readLogEntry returns the log entry from the given position in the log.
+func (mgr *TransactionManager) readLogEntry(index LogIndex) (*gitalypb.LogEntry, error) {
+ var logEntry gitalypb.LogEntry
+ key := keyLogEntry(getRepositoryID(mgr.repository), index)
+
+ if err := mgr.readKey(key, &logEntry); err != nil {
+ return nil, fmt.Errorf("read key: %w", err)
+ }
+
+ return &logEntry, nil
+}
+
// storeLogEntry stores the log entry in the repository's write-ahead log at the given index.
func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error {
return mgr.setKey(keyLogEntry(getRepositoryID(mgr.repository), index), entry)
diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go
index e1ec64c7e..8b6a97fb7 100644
--- a/internal/gitaly/transaction_manager_test.go
+++ b/internal/gitaly/transaction_manager_test.go
@@ -911,6 +911,7 @@ func TestTransactionManager(t *testing.T) {
TransactionID: 2,
ExpectedSnapshot: Snapshot{
ReadIndex: 1,
+ HookIndex: 1,
},
},
Commit{
@@ -977,6 +978,75 @@ func TestTransactionManager(t *testing.T) {
},
},
{
+ desc: "hook index is correctly determined from log and disk",
+ steps: steps{
+ StartManager{
+ Hooks: testHooks{
+ BeforeApplyLogEntry: func(hookContext) {
+ panic(errSimulatedCrash)
+ },
+ },
+ ExpectedError: errSimulatedCrash,
+ },
+ Begin{
+ TransactionID: 1,
+ },
+ Commit{
+ TransactionID: 1,
+ CustomHooksUpdate: &CustomHooksUpdate{
+ CustomHooksTAR: validCustomHooks(t),
+ },
+ },
+ AssertManager{
+ ExpectedError: errSimulatedCrash,
+ },
+ StartManager{},
+ Begin{
+ TransactionID: 2,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 1,
+ HookIndex: 1,
+ },
+ },
+ Commit{
+ TransactionID: 2,
+ CustomHooksUpdate: &CustomHooksUpdate{},
+ },
+ Begin{
+ TransactionID: 3,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 2,
+ HookIndex: 2,
+ },
+ },
+ StopManager{},
+ StartManager{},
+ Begin{
+ TransactionID: 4,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 2,
+ HookIndex: 2,
+ },
+ },
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(2).toProto(),
+ },
+ Hooks: testhelper.DirectoryState{
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks/1/pre-receive": {
+ Mode: umask.Mask(fs.ModePerm),
+ Content: []byte("hook content"),
+ },
+ "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ "/wal/hooks/2": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ },
+ },
+ },
+ {
desc: "continues processing after reference verification failure",
steps: steps{
StartManager{},
@@ -1591,11 +1661,15 @@ func TestTransactionManager(t *testing.T) {
ReferenceUpdates: ReferenceUpdates{
"refs/heads/main": {OldOID: objectHash.ZeroOID, NewOID: rootCommitOID},
},
+ CustomHooksUpdate: &CustomHooksUpdate{
+ CustomHooksTAR: validCustomHooks(t),
+ },
},
Begin{
TransactionID: 3,
ExpectedSnapshot: Snapshot{
ReadIndex: 1,
+ HookIndex: 1,
},
},
Commit{
@@ -1608,6 +1682,7 @@ func TestTransactionManager(t *testing.T) {
TransactionID: 4,
ExpectedSnapshot: Snapshot{
ReadIndex: 2,
+ HookIndex: 1,
},
},
Rollback{
@@ -1617,6 +1692,7 @@ func TestTransactionManager(t *testing.T) {
TransactionID: 5,
ExpectedSnapshot: Snapshot{
ReadIndex: 2,
+ HookIndex: 1,
},
},
Commit{
@@ -1624,6 +1700,14 @@ func TestTransactionManager(t *testing.T) {
ReferenceUpdates: ReferenceUpdates{
"refs/heads/main": {OldOID: secondCommitOID, NewOID: thirdCommitOID},
},
+ CustomHooksUpdate: &CustomHooksUpdate{},
+ },
+ Begin{
+ TransactionID: 6,
+ ExpectedSnapshot: Snapshot{
+ ReadIndex: 3,
+ HookIndex: 3,
+ },
},
},
expectedState: StateAssertion{
@@ -1634,6 +1718,17 @@ func TestTransactionManager(t *testing.T) {
Database: DatabaseState{
string(keyAppliedLogIndex(getRepositoryID(repo))): LogIndex(3).toProto(),
},
+ Hooks: testhelper.DirectoryState{
+ "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ "/wal/hooks/1/pre-receive": {
+ Mode: umask.Mask(fs.ModePerm),
+ Content: []byte("hook content"),
+ },
+ "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)},
+ "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")},
+ "/wal/hooks/3": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)},
+ },
},
},
}