Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-10-11 20:45:05 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-10-12 17:54:03 +0300
commitc73aac16b2e66ff299f5a2674ec361a7c850fdc6 (patch)
tree5dc0e650d7a5d933857e65f96ab98f4dd79e0b14
parentb08390d7f8b23ef979d0f1bacbd9d24cc5253494 (diff)
Standardize on referring to log positions with LSN
LSN or log sequence number is the usual term used to refer to a given position in a log. Our code is currently using log index in most places. This commit standardizes the name to LSN by renaming each use of log index to LSN. This way we'll have easier time also writing documentation later that refers to LSNs.
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go198
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go22
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_test.go168
-rw-r--r--proto/go/gitalypb/log.pb.go51
-rw-r--r--proto/log.proto16
5 files changed, 227 insertions, 228 deletions
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 9283fa580..e39d1a106 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -74,17 +74,17 @@ func (err ReferenceVerificationError) Error() string {
return fmt.Sprintf("expected %q to point to %q but it pointed to %q", err.ReferenceName, err.ExpectedOID, err.ActualOID)
}
-// LogIndex points to a specific position in a repository's write-ahead log.
-type LogIndex uint64
+// LSN is a log sequence number that points to a specific position in the partition's write-ahead log.
+type LSN uint64
-// toProto returns the protobuf representation of LogIndex for serialization purposes.
-func (index LogIndex) toProto() *gitalypb.LogIndex {
- return &gitalypb.LogIndex{LogIndex: uint64(index)}
+// toProto returns the protobuf representation of LSN for serialization purposes.
+func (lsn LSN) toProto() *gitalypb.LSN {
+ return &gitalypb.LSN{Value: uint64(lsn)}
}
-// String returns a string representation of the LogIndex.
-func (index LogIndex) String() string {
- return strconv.FormatUint(uint64(index), 10)
+// String returns a string representation of the LSN.
+func (lsn LSN) String() string {
+ return strconv.FormatUint(uint64(lsn), 10)
}
// ReferenceUpdate describes the state of a reference's old and new tip in an update.
@@ -170,7 +170,7 @@ type Transaction struct {
// snapshotLSN is the log sequence number which this transaction is reading the repository's
// state at.
- snapshotLSN LogIndex
+ snapshotLSN LSN
skipVerificationFailures bool
initialReferenceValues map[git.ReferenceName]git.ObjectID
@@ -195,7 +195,7 @@ type TransactionOptions struct {
// Begin call. Begin blocks until the committed writes have been applied to the repository.
func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOptions) (_ *Transaction, returnedErr error) {
// Wait until the manager has been initialized so the notification channels
- // and the log indexes are loaded.
+ // and the LSNs are loaded.
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -210,7 +210,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts TransactionOption
txn := &Transaction{
readOnly: opts.ReadOnly,
commit: mgr.commit,
- snapshotLSN: mgr.appendedLogIndex,
+ snapshotLSN: mgr.appendedLSN,
finished: make(chan struct{}),
relativePath: mgr.relativePath,
}
@@ -456,7 +456,7 @@ func (txn *Transaction) finishUnadmitted() error {
}
// SnapshotLSN returns the LSN of the Transaction's read snapshot.
-func (txn *Transaction) SnapshotLSN() LogIndex {
+func (txn *Transaction) SnapshotLSN() LSN {
return txn.snapshotLSN
}
@@ -615,15 +615,15 @@ type snapshotLock struct {
// the repository on start up.
//
// TransactionManager maintains the write-ahead log in a key-value store. It maintains the following key spaces:
-// - `repository/<repository_id:string>/log/index/applied`
-// - This key stores the index of the log entry that has been applied to the repository. This allows for
-// determining how far a repository is in processing the log and which log entries need to be applied
-// after starting up. Repository starts from log index 0 if there are no log entries recorded to have
+// - `partition/<partition_id>/applied_lsn`
+// - This key stores the LSN of the log entry that has been applied to the repository. This allows for
+// determining how far a partition is in processing the log and which log entries need to be applied
+// after starting up. Partition starts from LSN 0 if there are no log entries recorded to have
// been applied.
//
-// - `repository/<repository_id:string>/log/entry/<log_index:uint64>`
-// - These keys hold the actual write-ahead log entries. A repository's first log entry starts at index 1
-// and the log index keeps monotonically increasing from there on without gaps. The write-ahead log
+// - `partition/<partition_id:string>/log/entry/<log_index:uint64>`
+// - These keys hold the actual write-ahead log entries. A partition's first log entry starts at LSN 1
+// and the LSN keeps monotonically increasing from there on without gaps. The write-ahead log
// entries are processed in ascending order.
//
// The values in the database are marshaled protocol buffer messages. Numbers in the keys are encoded as big
@@ -674,7 +674,7 @@ type TransactionManager struct {
// initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't,
// transactions will fail to begin.
initializationSuccessful bool
- // mutex guards access to snapshotLocks and appendedLogIndex. These fields are accessed by both
+ // mutex guards access to snapshotLocks and appendedLSN. These fields are accessed by both
// Run and Begin which are ran in different goroutines.
mutex sync.Mutex
@@ -682,19 +682,19 @@ type TransactionManager struct {
// process targets the main repository and creates locks in it.
stateLock sync.RWMutex
// snapshotLocks contains state used for synchronizing snapshotters with the log application.
- snapshotLocks map[LogIndex]*snapshotLock
+ snapshotLocks map[LSN]*snapshotLock
- // appendedLogIndex holds the index of the last log entry appended to the log.
- appendedLogIndex LogIndex
- // appliedLogIndex holds the index of the last log entry applied to the repository
- appliedLogIndex LogIndex
+ // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log.
+ appendedLSN LSN
+ // appliedLSN holds the LSN of the last log entry applied to the partition.
+ appliedLSN LSN
// housekeepingManager access to the housekeeping.Manager.
housekeepingManager housekeeping.Manager
// awaitingTransactions contains transactions waiting for their log entry to be applied to
- // the repository. It's keyed by the log index the transaction is waiting to be applied and the
+ // the partition. It's keyed by the LSN the transaction is waiting to be applied and the
// value is the resultChannel that is waiting the result.
- awaitingTransactions map[LogIndex]resultChannel
+ awaitingTransactions map[LSN]resultChannel
}
// NewTransactionManager returns a new TransactionManager for the given repository.
@@ -725,11 +725,11 @@ func NewTransactionManager(
db: newDatabaseAdapter(db),
admissionQueue: make(chan *Transaction),
initialized: make(chan struct{}),
- snapshotLocks: make(map[LogIndex]*snapshotLock),
+ snapshotLocks: make(map[LSN]*snapshotLock),
stateDirectory: stateDir,
stagingDirectory: stagingDir,
housekeepingManager: housekeepingManager,
- awaitingTransactions: make(map[LogIndex]resultChannel),
+ awaitingTransactions: make(map[LSN]resultChannel),
}
}
@@ -977,10 +977,10 @@ func (mgr *TransactionManager) Run() (returnedErr error) {
}
for {
- if mgr.appliedLogIndex < mgr.appendedLogIndex {
- logIndex := mgr.appliedLogIndex + 1
+ if mgr.appliedLSN < mgr.appendedLSN {
+ lsn := mgr.appliedLSN + 1
- if err := mgr.applyLogEntry(mgr.ctx, logIndex); err != nil {
+ if err := mgr.applyLogEntry(mgr.ctx, lsn); err != nil {
return fmt.Errorf("apply log entry: %w", err)
}
@@ -1056,11 +1056,11 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
}
}
- nextLogIndex := mgr.appendedLogIndex + 1
+ nextLSN := mgr.appendedLSN + 1
if transaction.packPrefix != "" {
logEntry.PackPrefix = transaction.packPrefix
- removeFiles, err := mgr.storeWALFiles(mgr.ctx, nextLogIndex, transaction)
+ removeFiles, err := mgr.storeWALFiles(mgr.ctx, nextLSN, transaction)
cleanUps = append(cleanUps, func() error {
// The transaction's files might have been moved successfully in to the log.
// If anything fails before the transaction is committed, the files must be removed as otherwise
@@ -1083,13 +1083,13 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) {
logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{}
}
- return mgr.appendLogEntry(nextLogIndex, logEntry)
+ return mgr.appendLogEntry(nextLSN, logEntry)
}(); err != nil {
transaction.result <- err
return nil
}
- mgr.awaitingTransactions[mgr.appendedLogIndex] = transaction.result
+ mgr.awaitingTransactions[mgr.appendedLSN] = transaction.result
return nil
}
@@ -1108,23 +1108,23 @@ func (mgr *TransactionManager) isClosing() bool {
}
// initialize initializes the TransactionManager's state from the database. It loads the appendend and the applied
-// indexes and initializes the notification channels that synchronize transaction beginning with log entry applying.
+// LSNs and initializes the notification channels that synchronize transaction beginning with log entry applying.
func (mgr *TransactionManager) initialize(ctx context.Context) error {
defer close(mgr.initialized)
- var appliedLogIndex gitalypb.LogIndex
- if err := mgr.readKey(keyAppliedLogIndex(mgr.partitionID), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
- return fmt.Errorf("read applied log index: %w", err)
+ var appliedLSN gitalypb.LSN
+ if err := mgr.readKey(keyAppliedLSN(mgr.partitionID), &appliedLSN); err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
+ return fmt.Errorf("read applied LSN: %w", err)
}
- mgr.appliedLogIndex = LogIndex(appliedLogIndex.LogIndex)
+ mgr.appliedLSN = LSN(appliedLSN.Value)
- // The index of the last appended log entry is determined from the indexes of the latest entry in the log and
+ // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and
// the latest applied log entry. If there is a log entry, it is the latest appended log entry. If there are no
- // log entries, the latest log entry must have been applied to the repository and pruned away, meaning the index
- // of the last appended log entry is the same as the index if the last applied log entry.
+ // log entries, the latest log entry must have been applied to the repository and pruned away, meaning the LSN
+ // of the last appended log entry is the same as the LSN if the last applied log entry.
//
- // As the log indexes in the keys are encoded in big endian, the latest log entry can be found by taking
+ // As the LSNs in the keys are encoded in big endian, the latest log entry can be found by taking
// the first key when iterating the log entry key space in reverse.
if err := mgr.db.View(func(txn databaseTransaction) error {
logPrefix := keyPrefixLogEntries(mgr.partitionID)
@@ -1132,35 +1132,35 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
iterator := txn.NewIterator(badger.IteratorOptions{Reverse: true, Prefix: logPrefix})
defer iterator.Close()
- mgr.appendedLogIndex = mgr.appliedLogIndex
+ mgr.appendedLSN = mgr.appliedLSN
// The iterator seeks to a key that is greater than or equal than seeked key. Since we are doing a reverse
// seek, we need to add 0xff to the prefix so the first iterated key is the latest log entry.
if iterator.Seek(append(logPrefix, 0xff)); iterator.Valid() {
- mgr.appendedLogIndex = LogIndex(binary.BigEndian.Uint64(bytes.TrimPrefix(iterator.Item().Key(), logPrefix)))
+ mgr.appendedLSN = LSN(binary.BigEndian.Uint64(bytes.TrimPrefix(iterator.Item().Key(), logPrefix)))
}
return nil
}); err != nil {
- return fmt.Errorf("determine appended log index: %w", err)
+ return fmt.Errorf("determine appended LSN: %w", err)
}
if err := mgr.createStateDirectory(); err != nil {
return fmt.Errorf("create state directory: %w", err)
}
- // Create a snapshot lock for the applied index as it is used for synchronizing
+ // Create a snapshot lock for the applied LSN as it is used for synchronizing
// the snapshotters with the log application.
- mgr.snapshotLocks[mgr.appliedLogIndex] = &snapshotLock{applied: make(chan struct{})}
- close(mgr.snapshotLocks[mgr.appliedLogIndex].applied)
+ mgr.snapshotLocks[mgr.appliedLSN] = &snapshotLock{applied: make(chan struct{})}
+ close(mgr.snapshotLocks[mgr.appliedLSN].applied)
// Each unapplied log entry should have a snapshot lock as they are created in normal
// operation when committing a log entry. Recover these entries.
- for i := mgr.appliedLogIndex + 1; i <= mgr.appendedLogIndex; i++ {
+ for i := mgr.appliedLSN + 1; i <= mgr.appendedLSN; i++ {
mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})}
}
- if err := mgr.removeStaleWALFiles(mgr.ctx, mgr.appendedLogIndex); err != nil {
+ if err := mgr.removeStaleWALFiles(mgr.ctx, mgr.appendedLSN); err != nil {
return fmt.Errorf("remove stale packs: %w", err)
}
@@ -1240,10 +1240,10 @@ func (mgr *TransactionManager) removePackedRefsLocks(ctx context.Context, reposi
// removeStaleWALFiles removes files from the log directory that have no associated log entry.
// Such files can be left around if transaction's files were moved in place successfully
// but the manager was interrupted before successfully persisting the log entry itself.
-func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appendedIndex LogIndex) error {
+func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appendedLSN LSN) error {
// Log entries are appended one by one to the log. If a write is interrupted, the only possible stale
- // pack would be for the next log index. Remove the pack if it exists.
- possibleStaleFilesPath := walFilesPathForLogIndex(mgr.stateDirectory, appendedIndex+1)
+ // pack would be for the next LSN. Remove the pack if it exists.
+ possibleStaleFilesPath := walFilesPathForLSN(mgr.stateDirectory, appendedLSN+1)
if _, err := os.Stat(possibleStaleFilesPath); err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("remove: %w", err)
@@ -1267,10 +1267,10 @@ func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, appended
// storeWALFiles moves the transaction's logged files from the staging directory to their destination in the log.
// It returns a function, even on errors, that must be called to clean up the files if committing the log entry
// fails.
-func (mgr *TransactionManager) storeWALFiles(ctx context.Context, index LogIndex, transaction *Transaction) (func() error, error) {
+func (mgr *TransactionManager) storeWALFiles(ctx context.Context, lsn LSN, transaction *Transaction) (func() error, error) {
removeFiles := func() error { return nil }
- destinationPath := walFilesPathForLogIndex(mgr.stateDirectory, index)
+ destinationPath := walFilesPathForLSN(mgr.stateDirectory, lsn)
if err := os.Rename(
transaction.walFilesPath(),
destinationPath,
@@ -1294,9 +1294,9 @@ func (mgr *TransactionManager) storeWALFiles(ctx context.Context, index LogIndex
return removeFiles, nil
}
-// walFilesPathForLogIndex returns an absolute path to a given log entry's WAL files.
-func walFilesPathForLogIndex(stateDir string, index LogIndex) string {
- return filepath.Join(stateDir, "wal", index.String())
+// walFilesPathForLSN returns an absolute path to a given log entry's WAL files.
+func walFilesPathForLSN(stateDir string, lsn LSN) string {
+ return filepath.Join(stateDir, "wal", lsn.String())
}
// packFilePath returns a log entry's pack file's absolute path in the wal files directory.
@@ -1496,22 +1496,22 @@ func (mgr *TransactionManager) prepareReferenceTransaction(ctx context.Context,
// appendLogEntry appends the transaction to the write-ahead log. References that failed verification are skipped and thus not
// logged nor applied later.
-func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *gitalypb.LogEntry) error {
- if err := mgr.storeLogEntry(nextLogIndex, logEntry); err != nil {
+func (mgr *TransactionManager) appendLogEntry(nextLSN LSN, logEntry *gitalypb.LogEntry) error {
+ if err := mgr.storeLogEntry(nextLSN, logEntry); err != nil {
return fmt.Errorf("set log entry: %w", err)
}
mgr.mutex.Lock()
- mgr.appendedLogIndex = nextLogIndex
- mgr.snapshotLocks[nextLogIndex] = &snapshotLock{applied: make(chan struct{})}
+ mgr.appendedLSN = nextLSN
+ mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})}
mgr.mutex.Unlock()
return nil
}
-// applyLogEntry reads a log entry at the given index and applies it to the repository.
-func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error {
- logEntry, err := mgr.readLogEntry(logIndex)
+// applyLogEntry reads a log entry at the given LSN and applies it to the repository.
+func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn LSN) error {
+ logEntry, err := mgr.readLogEntry(lsn)
if err != nil {
return fmt.Errorf("read log entry: %w", err)
}
@@ -1519,10 +1519,10 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
// Ensure all snapshotters have finished snapshotting the previous state before we apply
// the new state to the repository. No new snapshotters can arrive at this point. All
// new transactions would be waiting for the committed log entry we are about to apply.
- previousIndex := logIndex - 1
- mgr.snapshotLocks[previousIndex].activeSnapshotters.Wait()
+ previousLSN := lsn - 1
+ mgr.snapshotLocks[previousLSN].activeSnapshotters.Wait()
mgr.mutex.Lock()
- delete(mgr.snapshotLocks, previousIndex)
+ delete(mgr.snapshotLocks, previousLSN)
mgr.mutex.Unlock()
if logEntry.RepositoryDeletion != nil {
@@ -1534,7 +1534,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
}
} else {
if logEntry.PackPrefix != "" {
- if err := mgr.applyPackFile(ctx, logIndex, logEntry); err != nil {
+ if err := mgr.applyPackFile(ctx, lsn, logEntry); err != nil {
return fmt.Errorf("apply pack file: %w", err)
}
}
@@ -1552,26 +1552,26 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn
}
}
- if err := mgr.storeAppliedLogIndex(logIndex); err != nil {
- return fmt.Errorf("set applied log index: %w", err)
+ if err := mgr.storeAppliedLSN(lsn); err != nil {
+ return fmt.Errorf("set applied LSN: %w", err)
}
- if err := mgr.deleteLogEntry(logIndex); err != nil {
+ if err := mgr.deleteLogEntry(lsn); err != nil {
return fmt.Errorf("deleting log entry: %w", err)
}
- mgr.appliedLogIndex = logIndex
+ mgr.appliedLSN = lsn
// There is no awaiter for a transaction if the transaction manager is recovering
// transactions from the log after starting up.
- if resultChan, ok := mgr.awaitingTransactions[logIndex]; ok {
+ if resultChan, ok := mgr.awaitingTransactions[lsn]; ok {
resultChan <- nil
- delete(mgr.awaitingTransactions, logIndex)
+ delete(mgr.awaitingTransactions, lsn)
}
// Notify the transactions waiting for this log entry to be applied prior to take their
// snapshot.
- close(mgr.snapshotLocks[logIndex].applied)
+ close(mgr.snapshotLocks[lsn].applied)
return nil
}
@@ -1614,7 +1614,7 @@ func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, logE
// applyPackFile unpacks the objects from the pack file into the repository if the log entry
// has an associated pack file. This is done by hard linking the pack and index from the
// log into the repository's object directory.
-func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex, logEntry *gitalypb.LogEntry) error {
+func (mgr *TransactionManager) applyPackFile(ctx context.Context, lsn LSN, logEntry *gitalypb.LogEntry) error {
packDirectory := filepath.Join(mgr.getAbsolutePath(logEntry.RelativePath), "objects", "pack")
for _, fileExtension := range []string{
".pack",
@@ -1622,7 +1622,7 @@ func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIn
".rev",
} {
if err := os.Link(
- filepath.Join(walFilesPathForLogIndex(mgr.stateDirectory, logIndex), "objects"+fileExtension),
+ filepath.Join(walFilesPathForLSN(mgr.stateDirectory, lsn), "objects"+fileExtension),
filepath.Join(packDirectory, logEntry.PackPrefix+fileExtension),
); err != nil {
if !errors.Is(err, fs.ErrExist) {
@@ -1677,15 +1677,15 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logEntry *g
return nil
}
-// deleteLogEntry deletes the log entry at the given index from the log.
-func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error {
- return mgr.deleteKey(keyLogEntry(mgr.partitionID, index))
+// deleteLogEntry deletes the log entry at the given LSN from the log.
+func (mgr *TransactionManager) deleteLogEntry(lsn LSN) error {
+ return mgr.deleteKey(keyLogEntry(mgr.partitionID, lsn))
}
// readLogEntry returns the log entry from the given position in the log.
-func (mgr *TransactionManager) readLogEntry(index LogIndex) (*gitalypb.LogEntry, error) {
+func (mgr *TransactionManager) readLogEntry(lsn LSN) (*gitalypb.LogEntry, error) {
var logEntry gitalypb.LogEntry
- key := keyLogEntry(mgr.partitionID, index)
+ key := keyLogEntry(mgr.partitionID, lsn)
if err := mgr.readKey(key, &logEntry); err != nil {
return nil, fmt.Errorf("read key: %w", err)
@@ -1694,14 +1694,14 @@ func (mgr *TransactionManager) readLogEntry(index LogIndex) (*gitalypb.LogEntry,
return &logEntry, nil
}
-// storeLogEntry stores the log entry in the partition's write-ahead log at the given index.
-func (mgr *TransactionManager) storeLogEntry(index LogIndex, entry *gitalypb.LogEntry) error {
- return mgr.setKey(keyLogEntry(mgr.partitionID, index), entry)
+// storeLogEntry stores the log entry in the partition's write-ahead log at the given LSN.
+func (mgr *TransactionManager) storeLogEntry(lsn LSN, entry *gitalypb.LogEntry) error {
+ return mgr.setKey(keyLogEntry(mgr.partitionID, lsn), entry)
}
-// storeAppliedLogIndex stores the partition's applied log index in the database.
-func (mgr *TransactionManager) storeAppliedLogIndex(index LogIndex) error {
- return mgr.setKey(keyAppliedLogIndex(mgr.partitionID), index.toProto())
+// storeAppliedLSN stores the partition's applied LSN in the database.
+func (mgr *TransactionManager) storeAppliedLSN(lsn LSN) error {
+ return mgr.setKey(keyAppliedLSN(mgr.partitionID), lsn.toProto())
}
// setKey marshals and stores a given protocol buffer message into the database under the given key.
@@ -1745,15 +1745,15 @@ func (mgr *TransactionManager) deleteKey(key []byte) error {
})
}
-// keyAppliedLogIndex returns the database key storing a partition's last applied log entry's index.
-func keyAppliedLogIndex(ptnID partitionID) []byte {
- return []byte(fmt.Sprintf("partition/%s/log/index/applied", ptnID.MarshalBinary()))
+// keyAppliedLSN returns the database key storing a partition's last applied log entry's LSN.
+func keyAppliedLSN(ptnID partitionID) []byte {
+ return []byte(fmt.Sprintf("partition/%s/applied_lsn", ptnID.MarshalBinary()))
}
-// keyLogEntry returns the database key storing a partition's log entry at a given index.
-func keyLogEntry(ptnID partitionID, index LogIndex) []byte {
- marshaledIndex := make([]byte, binary.Size(index))
- binary.BigEndian.PutUint64(marshaledIndex, uint64(index))
+// keyLogEntry returns the database key storing a partition's log entry at a given LSN.
+func keyLogEntry(ptnID partitionID, lsn LSN) []byte {
+ marshaledIndex := make([]byte, binary.Size(lsn))
+ binary.BigEndian.PutUint64(marshaledIndex, uint64(lsn))
return []byte(fmt.Sprintf("%s%s", keyPrefixLogEntries(ptnID), marshaledIndex))
}
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go
index 788cdee2c..1c043d6a2 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go
@@ -35,10 +35,10 @@ type hooks struct {
beforeDeferredClose hookFunc
// beforeDeleteLogEntry is invoked before a log entry is deleted from the database.
beforeDeleteLogEntry hookFunc
- // beforeReadAppliedLogIndex is invoked before the applied log index is read from the database.
- beforeReadAppliedLogIndex hookFunc
- // beforeStoreAppliedLogIndex is invoked before a the applied log index is stored.
- beforeStoreAppliedLogIndex hookFunc
+ // beforeReadAppliedLSN is invoked before the applied LSN is read from the database.
+ beforeReadAppliedLSN hookFunc
+ // beforeStoreAppliedLSN is invoked before a the applied LSN is stored.
+ beforeStoreAppliedLSN hookFunc
}
// installHooks installs the configured hooks into the transactionManager.
@@ -109,8 +109,8 @@ type databaseTransactionHook struct {
}
var (
- regexLogEntry = regexp.MustCompile("partition/.+/log/entry/")
- regexLogIndex = regexp.MustCompile("partition/.+/log/index/applied")
+ regexLogEntry = regexp.MustCompile("partition/.+/log/entry/")
+ regexAppliedLSN = regexp.MustCompile("partition/.+/applied_lsn")
)
func (hook databaseTransactionHook) Get(key []byte) (*badger.Item, error) {
@@ -118,9 +118,9 @@ func (hook databaseTransactionHook) Get(key []byte) (*badger.Item, error) {
if hook.hooks.beforeReadLogEntry != nil {
hook.hooks.beforeReadLogEntry(hook.hookContext)
}
- } else if regexLogIndex.Match(key) {
- if hook.hooks.beforeReadAppliedLogIndex != nil {
- hook.hooks.beforeReadAppliedLogIndex(hook.hookContext)
+ } else if regexAppliedLSN.Match(key) {
+ if hook.hooks.beforeReadAppliedLSN != nil {
+ hook.hooks.beforeReadAppliedLSN(hook.hookContext)
}
}
@@ -146,8 +146,8 @@ type writeBatchHook struct {
}
func (hook writeBatchHook) Set(key []byte, value []byte) error {
- if regexLogIndex.Match(key) && hook.hooks.beforeStoreAppliedLogIndex != nil {
- hook.hooks.beforeStoreAppliedLogIndex(hook.hookContext)
+ if regexAppliedLSN.Match(key) && hook.hooks.beforeStoreAppliedLSN != nil {
+ hook.hooks.beforeStoreAppliedLSN(hook.hookContext)
}
if regexLogEntry.Match(key) && hook.hooks.beforeStoreLogEntry != nil {
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
index 0add472af..f6751d729 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
@@ -272,10 +272,10 @@ func TestTransactionManager(t *testing.T) {
BeforeAppendLogEntry hookFunc
// BeforeDeleteLogEntry is called before a log entry is deleted.
BeforeDeleteLogEntry hookFunc
- // beforeReadAppliedLogIndex is invoked before a the applied log index is read.
- BeforeReadAppliedLogIndex hookFunc
- // beforeStoreAppliedLogIndex is invoked before a the applied log index is stored.
- BeforeStoreAppliedLogIndex hookFunc
+ // BeforeReadAppliedLSN is invoked before the applied LSN is read.
+ BeforeReadAppliedLSN hookFunc
+ // BeforeStoreAppliedLSN is invoked before the applied LSN is stored.
+ BeforeStoreAppliedLSN hookFunc
// WaitForTransactionsWhenClosing waits for a in-flight to finish before returning
// from Run.
WaitForTransactionsWhenClosing bool
@@ -316,7 +316,7 @@ func TestTransactionManager(t *testing.T) {
// Context is the context to use for the Begin call.
Context context.Context
// ExpectedSnapshot is the expected LSN this transaction should read the repsoitory's state at.
- ExpectedSnapshotLSN LogIndex
+ ExpectedSnapshotLSN LSN
// ExpectedError is the error expected to be returned from the Begin call.
ExpectedError error
}
@@ -454,7 +454,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -477,7 +477,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -505,7 +505,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -556,7 +556,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
},
},
@@ -601,7 +601,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
},
},
@@ -635,7 +635,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -710,7 +710,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -768,7 +768,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -821,7 +821,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -866,7 +866,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -907,7 +907,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -943,7 +943,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -982,7 +982,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1028,7 +1028,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1084,7 +1084,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1120,7 +1120,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
},
},
@@ -1144,7 +1144,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
},
},
@@ -1181,7 +1181,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1225,7 +1225,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1268,7 +1268,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1311,7 +1311,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1354,7 +1354,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
},
},
@@ -1377,7 +1377,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1426,7 +1426,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
},
},
@@ -1450,7 +1450,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1495,7 +1495,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1540,7 +1540,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1590,7 +1590,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1624,7 +1624,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1655,7 +1655,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1688,7 +1688,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -1726,7 +1726,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
StartManager{
Hooks: testHooks{
- BeforeStoreAppliedLogIndex: func(hookContext) {
+ BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
},
@@ -1749,7 +1749,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -1819,7 +1819,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -1864,7 +1864,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1903,7 +1903,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1945,7 +1945,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -1960,7 +1960,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
StartManager{
Hooks: testHooks{
- BeforeStoreAppliedLogIndex: func(hookCtx hookContext) {
+ BeforeStoreAppliedLSN: func(hookCtx hookContext) {
panic(errSimulatedCrash)
},
},
@@ -1993,7 +1993,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2038,7 +2038,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2088,7 +2088,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2150,7 +2150,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2239,7 +2239,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2282,7 +2282,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2329,7 +2329,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2382,7 +2382,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2424,7 +2424,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2442,7 +2442,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
StartManager{
Hooks: testHooks{
- BeforeStoreAppliedLogIndex: func(hookCtx hookContext) {
+ BeforeStoreAppliedLSN: func(hookCtx hookContext) {
panic(errSimulatedCrash)
},
},
@@ -2479,7 +2479,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Repositories: RepositoryStates{
relativePath: {
@@ -2627,7 +2627,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(3).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(3).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -2677,7 +2677,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -2775,7 +2775,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(3).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(3).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -2824,7 +2824,7 @@ func TestTransactionManager(t *testing.T) {
Prune{},
StartManager{
Hooks: testHooks{
- BeforeStoreAppliedLogIndex: func(hookContext) {
+ BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
},
@@ -2848,7 +2848,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -2933,7 +2933,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -2979,7 +2979,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3009,7 +3009,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3054,7 +3054,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3137,7 +3137,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3212,7 +3212,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3245,7 +3245,7 @@ func TestTransactionManager(t *testing.T) {
//
// The Manager starts up and we expect the pack file to be gone at the end of the test.
ModifyRepository: func(_ testing.TB, _ config.Cfg, repoPath string) {
- packFilePath := packFilePath(walFilesPathForLogIndex(repoPath, 1))
+ packFilePath := packFilePath(walFilesPathForLSN(repoPath, 1))
require.NoError(t, os.MkdirAll(filepath.Dir(packFilePath), perm.PrivateDir))
require.NoError(t, os.WriteFile(
packFilePath,
@@ -3281,7 +3281,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3308,7 +3308,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3335,7 +3335,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3364,7 +3364,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3393,7 +3393,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3435,7 +3435,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3445,7 +3445,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
StartManager{
Hooks: testHooks{
- BeforeStoreAppliedLogIndex: func(hookContext) {
+ BeforeStoreAppliedLSN: func(hookContext) {
panic(errSimulatedCrash)
},
},
@@ -3477,7 +3477,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3533,7 +3533,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3579,7 +3579,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Repositories: RepositoryStates{},
},
@@ -3589,7 +3589,7 @@ func TestTransactionManager(t *testing.T) {
steps: steps{
StartManager{
Hooks: testHooks{
- BeforeReadAppliedLogIndex: func(hookContext) {
+ BeforeReadAppliedLSN: func(hookContext) {
// Raise a panic when the manager is about to read the applied log
// index when initializing. In reality this would crash the server but
// in tests it serves as a way to abort the initialization in correct
@@ -3634,7 +3634,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
},
},
@@ -3650,7 +3650,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
},
},
@@ -3829,7 +3829,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(1).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(1).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -3942,7 +3942,7 @@ func TestTransactionManager(t *testing.T) {
},
expectedState: StateAssertion{
Database: DatabaseState{
- string(keyAppliedLogIndex(partitionID)): LogIndex(2).toProto(),
+ string(keyAppliedLSN(partitionID)): LSN(2).toProto(),
},
Directory: testhelper.DirectoryState{
"/": {Mode: fs.ModeDir | perm.PrivateDir},
@@ -4129,9 +4129,9 @@ func TestTransactionManager(t *testing.T) {
inflightTransactions.Wait()
}
},
- beforeDeleteLogEntry: step.Hooks.BeforeDeleteLogEntry,
- beforeReadAppliedLogIndex: step.Hooks.BeforeReadAppliedLogIndex,
- beforeStoreAppliedLogIndex: step.Hooks.BeforeStoreAppliedLogIndex,
+ beforeDeleteLogEntry: step.Hooks.BeforeDeleteLogEntry,
+ beforeReadAppliedLSN: step.Hooks.BeforeReadAppliedLSN,
+ beforeStoreAppliedLSN: step.Hooks.BeforeStoreAppliedLSN,
})
go func() {
diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go
index eb49c09f9..ad27bf6a0 100644
--- a/proto/go/gitalypb/log.pb.go
+++ b/proto/go/gitalypb/log.pb.go
@@ -20,10 +20,10 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// LogEntry is a single entry in a repository's write-ahead log.
+// LogEntry is a single entry in a partition's write-ahead log.
//
// Schema for :
-// - `repository/<repository_id>/log/entry/<log_index>`.
+// - `partition/<partition_id>/log/entry/<log_index>`.
type LogEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -122,22 +122,22 @@ func (x *LogEntry) GetRepositoryDeletion() *LogEntry_RepositoryDeletion {
return nil
}
-// LogIndex serializes a log index. It's used for storing a repository's
-// applied log index in the database.
+// LSN serializes a log sequence number. It's used for storing a partition's
+// applied LSN in the database.
//
// Schema for:
-// - `repository/<repository_id>/log/index/applied`
-type LogIndex struct {
+// - `partition/<partition_id>/applied_lsn`
+type LSN struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- // log_index is an index pointing to a position in the log.
- LogIndex uint64 `protobuf:"varint,1,opt,name=log_index,json=logIndex,proto3" json:"log_index,omitempty"`
+ // value is an LSN pointing to a position in the log.
+ Value uint64 `protobuf:"varint,1,opt,name=value,proto3" json:"value,omitempty"`
}
-func (x *LogIndex) Reset() {
- *x = LogIndex{}
+func (x *LSN) Reset() {
+ *x = LSN{}
if protoimpl.UnsafeEnabled {
mi := &file_log_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -145,13 +145,13 @@ func (x *LogIndex) Reset() {
}
}
-func (x *LogIndex) String() string {
+func (x *LSN) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*LogIndex) ProtoMessage() {}
+func (*LSN) ProtoMessage() {}
-func (x *LogIndex) ProtoReflect() protoreflect.Message {
+func (x *LSN) ProtoReflect() protoreflect.Message {
mi := &file_log_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -163,14 +163,14 @@ func (x *LogIndex) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use LogIndex.ProtoReflect.Descriptor instead.
-func (*LogIndex) Descriptor() ([]byte, []int) {
+// Deprecated: Use LSN.ProtoReflect.Descriptor instead.
+func (*LSN) Descriptor() ([]byte, []int) {
return file_log_proto_rawDescGZIP(), []int{1}
}
-func (x *LogIndex) GetLogIndex() uint64 {
+func (x *LSN) GetValue() uint64 {
if x != nil {
- return x.LogIndex
+ return x.Value
}
return 0
}
@@ -421,13 +421,12 @@ var file_log_proto_rawDesc = []byte{
0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f,
0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70,
0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x22,
- 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x6c,
- 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08,
- 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c,
- 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72,
- 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f,
- 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06,
- 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x1b, 0x0a, 0x03, 0x4c, 0x53, 0x4e, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32,
+ 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61,
+ 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36,
+ 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
+ 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -445,7 +444,7 @@ func file_log_proto_rawDescGZIP() []byte {
var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_log_proto_goTypes = []interface{}{
(*LogEntry)(nil), // 0: gitaly.LogEntry
- (*LogIndex)(nil), // 1: gitaly.LogIndex
+ (*LSN)(nil), // 1: gitaly.LSN
(*LogEntry_ReferenceUpdate)(nil), // 2: gitaly.LogEntry.ReferenceUpdate
(*LogEntry_DefaultBranchUpdate)(nil), // 3: gitaly.LogEntry.DefaultBranchUpdate
(*LogEntry_CustomHooksUpdate)(nil), // 4: gitaly.LogEntry.CustomHooksUpdate
@@ -482,7 +481,7 @@ func file_log_proto_init() {
}
}
file_log_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*LogIndex); i {
+ switch v := v.(*LSN); i {
case 0:
return &v.state
case 1:
diff --git a/proto/log.proto b/proto/log.proto
index 278713735..b5ae5a575 100644
--- a/proto/log.proto
+++ b/proto/log.proto
@@ -4,10 +4,10 @@ package gitaly;
option go_package = "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb";
-// LogEntry is a single entry in a repository's write-ahead log.
+// LogEntry is a single entry in a partition's write-ahead log.
//
// Schema for :
-// - `repository/<repository_id>/log/entry/<log_index>`.
+// - `partition/<partition_id>/log/entry/<log_index>`.
message LogEntry {
// ReferenceUpdate models a single reference update.
message ReferenceUpdate {
@@ -58,12 +58,12 @@ message LogEntry {
RepositoryDeletion repository_deletion = 6;
}
-// LogIndex serializes a log index. It's used for storing a repository's
-// applied log index in the database.
+// LSN serializes a log sequence number. It's used for storing a partition's
+// applied LSN in the database.
//
// Schema for:
-// - `repository/<repository_id>/log/index/applied`
-message LogIndex {
- // log_index is an index pointing to a position in the log.
- uint64 log_index = 1;
+// - `partition/<partition_id>/applied_lsn`
+message LSN {
+ // value is an LSN pointing to a position in the log.
+ uint64 value = 1;
}