Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2024-01-17 12:54:52 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2024-01-24 06:28:59 +0300
commit2e86cc5cffbfca565602b689ef8efa894fdf8fa6 (patch)
tree7b912fd34b186728c39fb99e043bc22e8b63616f
parent8c7ef2ac66421fbdb70025a3ab1ed237e8affa7a (diff)
Optimize transaction committed log retentionqmnguyen0711/optimize-transaction-committed-log-retention
We introduced a mechanism to keep track of committed log entries after they are admitted by the manager. Those entries are used for conflict checks, verification, result merges, etc. They are organized as an in-memory linked list. Empty leading log entries are truncated once no remaining transaction is using them as its snapshot repositories. At the moment, that linked list stores the content of each log entry. Although each entry does not contain heavy data, entries might be kept for a significant amount of time. For example, a repacking task might run for hours before being applied. We need to retain all entries from the time the task starts until then. A busy repository can make the list accumulate many entries. Thus, memory is not the optimal place for it. Our database already contains all the log records. We can offload the entry content from memory and re-use the log entry content saved in the database. At this point, a log entry is removed right after it's applied. We need to let the manager remove a log entry only if it's lower than the low-water mark, which is the front of the committed entry list. If an entry is not deleted after being applied, it will eventually be deleted when the last referring transaction finishes.
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go144
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_test.go40
2 files changed, 129 insertions, 55 deletions
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index bd7653a04..c72024d0a 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -49,6 +49,9 @@ var (
ErrTransactionAlreadyRollbacked = errors.New("transaction already rollbacked")
// errInitializationFailed is returned when the TransactionManager failed to initialize successfully.
errInitializationFailed = errors.New("initializing transaction processing failed")
+ // errCommittedEntryGone is returned when the log entry of an LSN is gone from the database while it's still
+ // accessed by other transactions.
+ errCommittedEntryGone = errors.New("in-used committed entry is gone")
// errNotDirectory is returned when the repository's path doesn't point to a directory
errNotDirectory = errors.New("repository's path didn't point to a directory")
// errRelativePathNotSet is returned when a transaction is begun without providing a relative path
@@ -317,7 +320,9 @@ func (mgr *TransactionManager) Begin(ctx context.Context, relativePath string, s
if !txn.readOnly {
mgr.mutex.Lock()
defer mgr.mutex.Unlock()
- mgr.cleanCommittedEntry(entry)
+ if err := mgr.cleanCommittedEntry(entry); err != nil {
+ return fmt.Errorf("cleaning committed entry: %w", err)
+ }
}
return nil
@@ -666,8 +671,6 @@ type snapshotLock struct {
type committedEntry struct {
// lsn is the associated LSN of the entry
lsn LSN
- // entry is the pointer to the corresponding log entry.
- entry *gitalypb.LogEntry
// snapshotReaders accounts for the number of transaction readers of the snapshot.
snapshotReaders int
}
@@ -1807,18 +1810,16 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti
defer mgr.mutex.Unlock()
// Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN.
- elm := mgr.committedEntries.Front()
- for elm != nil {
- entry := elm.Value.(*committedEntry)
- if entry.lsn > transaction.snapshotLSN && entry.entry.RelativePath == transaction.relativePath {
- if entry.entry.GetHousekeeping() != nil {
- return nil, errHousekeepingConflictConcurrent
- }
- if entry.entry.GetRepositoryDeletion() != nil {
- return nil, errConflictRepositoryDeletion
- }
+ if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry) error {
+ if entry.GetHousekeeping() != nil {
+ return errHousekeepingConflictConcurrent
}
- elm = elm.Next()
+ if entry.GetRepositoryDeletion() != nil {
+ return errConflictRepositoryDeletion
+ }
+ return nil
+ }); err != nil {
+ return nil, err
}
packRefsEntry, err := mgr.verifyPackRefs(mgr.ctx, transaction)
@@ -1860,23 +1861,21 @@ func (mgr *TransactionManager) verifyPackRefs(ctx context.Context, transaction *
packRefs := transaction.runHousekeeping.packRefs
// Check for any concurrent ref deletion between this transaction's snapshot LSN to the end.
- elm := mgr.committedEntries.Front()
- for elm != nil {
- entry := elm.Value.(*committedEntry)
- if entry.lsn > transaction.snapshotLSN && entry.entry.RelativePath == transaction.relativePath {
- for _, refTransaction := range entry.entry.ReferenceTransactions {
- for _, change := range refTransaction.Changes {
- if objectHash.IsZeroOID(git.ObjectID(change.GetNewOid())) {
- // Oops, there is a reference deletion. Bail out.
- return nil, errPackRefsConflictRefDeletion
- }
- // Ref update. Remove the updated ref from the list of pruned refs so that the
- // new OID in loose reference shadows the outdated OID in packed-refs.
- delete(packRefs.PrunedRefs, git.ReferenceName(change.GetReferenceName()))
+ if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry) error {
+ for _, refTransaction := range entry.ReferenceTransactions {
+ for _, change := range refTransaction.Changes {
+ if objectHash.IsZeroOID(git.ObjectID(change.GetNewOid())) {
+ // Oops, there is a reference deletion. Bail out.
+ return errPackRefsConflictRefDeletion
}
+ // Ref update. Remove the updated ref from the list of pruned refs so that the
+ // new OID in loose reference shadows the outdated OID in packed-refs.
+ delete(packRefs.PrunedRefs, git.ReferenceName(change.GetReferenceName()))
}
}
- elm = elm.Next()
+ return nil
+ }); err != nil {
+ return nil, err
}
var prunedRefs [][]byte
@@ -1989,8 +1988,7 @@ func (mgr *TransactionManager) appendLogEntry(nextLSN LSN, logEntry *gitalypb.Lo
mgr.appendedLSN = nextLSN
mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})}
mgr.committedEntries.PushBack(&committedEntry{
- lsn: nextLSN,
- entry: logEntry,
+ lsn: nextLSN,
})
mgr.mutex.Unlock()
@@ -2056,12 +2054,23 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn LSN) error
return fmt.Errorf("set applied LSN: %w", err)
}
- if err := mgr.deleteLogEntry(lsn); err != nil {
- return fmt.Errorf("deleting log entry: %w", err)
- }
-
mgr.appliedLSN = lsn
+ // When this log entry is applied, if any log entries in front of it are still referred to, we cannot delete
+ // it. This condition prevents a "hole" in the list. A transaction referring to a log entry at the
+ // low-water mark might scan all subsequent log entries.
+ //
+ // ┌─ Can be removed ─┐ ┌─ Cannot be removed
+ // □ □ □ □ □ □ □ □ □ □ ■ ■ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ⧅ ⧅ ⧅ ⧅ ■
+ // └─ Low-water mark, still referred by another transaction
+ //
+ // Eventually, log entries at the low-water mark are removed when the last referring transaction finishes.
+ if lsn < mgr.lowWaterMark() {
+ if err := mgr.deleteLogEntry(lsn); err != nil {
+ return fmt.Errorf("deleting log entry: %w", err)
+ }
+ }
+
// There is no awaiter for a transaction if the transaction manager is recovering
// transactions from the log after starting up.
if resultChan, ok := mgr.awaitingTransactions[lsn]; ok {
@@ -2535,6 +2544,19 @@ func (mgr *TransactionManager) deleteKey(key []byte) error {
})
}
+// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than
+// this mark are removed.
+func (mgr *TransactionManager) lowWaterMark() LSN {
+ mgr.mutex.Lock()
+ defer mgr.mutex.Unlock()
+
+ elm := mgr.committedEntries.Front()
+ if elm == nil {
+ return mgr.appliedLSN + 1
+ }
+ return elm.Value.(*committedEntry).lsn
+}
+
// updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on.
func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committedEntry, error) {
// Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no new transactions
@@ -2549,11 +2571,6 @@ func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committed
entry := &committedEntry{
lsn: snapshotLSN,
snapshotReaders: 1,
- // The log entry is left nil. This doesn't matter as the conflict checking only
- // needs it when checking for conflicts with transactions committed after we took
- // our snapshot.
- //
- // This `committedEntry` only exists to record the `snapshotReaders` at this LSN.
}
mgr.committedEntries.PushBack(entry)
@@ -2561,9 +2578,34 @@ func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committed
return entry, nil
}
+// walkCommittedEntries walks all committed entries after input transaction's snapshot LSN. It loads the content of the
+// entry from disk and triggers the callback with entry content.
+func (mgr *TransactionManager) walkCommittedEntries(transaction *Transaction, callback func(*gitalypb.LogEntry) error) error {
+ elm := mgr.committedEntries.Front()
+ for elm != nil {
+ committed := elm.Value.(*committedEntry)
+ if committed.lsn > transaction.snapshotLSN {
+ entry, err := mgr.readLogEntry(committed.lsn)
+ if err != nil {
+ return errCommittedEntryGone
+ }
+ // Transaction manager works on the partition level, including a repository and all of its pool
+ // member repositories (if any). We need to filter log entries of the repository this
+ // transaction targets.
+ if entry.RelativePath == transaction.relativePath {
+ if err := callback(entry); err != nil {
+ return err
+ }
+ }
+ }
+ elm = elm.Next()
+ }
+ return nil
+}
+
// cleanCommittedEntry reduces the snapshot readers counter of the committed entry. It also removes entries with no more
// readers at the head of the list.
-func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) {
+func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) error {
entry.snapshotReaders--
elm := mgr.committedEntries.Front()
@@ -2573,11 +2615,31 @@ func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) {
// If the first entry had still some snapshot readers, that means
// our transaction was not the oldest reader. We can't remove any entries
// as they'll still be needed for conflict checking the older transactions.
- return
+ return nil
}
+
mgr.committedEntries.Remove(elm)
+ // When a transaction keeping a reference to the frontmost entry finishes, the transaction may have
+ // been either committed or canceled. If the transaction is committed, the manager blocks it until the
+ // referred log entry is applied. If the transaction is canceled, the referred log entry might not be
+ // applied yet. We cannot clean such entries up until the manager applies them.
+ if front.lsn <= mgr.appliedLSN {
+ select {
+ // If the manager is closing or completely closed, the DB is probably in a non-ready state. It's
+ // likely the result of a rare race condition. There isn't anything we can do about it now. In the
+ // future, we'll need to add a simple GC task to the manager to get rid of log entries <=
+ // appliedLSN at startup.
+ case <-mgr.closed:
+ case <-mgr.closing:
+ default:
+ if err := mgr.deleteLogEntry(front.lsn); err != nil {
+ return fmt.Errorf("removing log entry when not referred anymore: %w", err)
+ }
+ }
+ }
elm = mgr.committedEntries.Front()
}
+ return nil
}
// keyAppliedLSN returns the database key storing a partition's last applied log entry's LSN.
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
index 2e5e2b8b8..9fda100f9 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
@@ -1543,8 +1543,14 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio
}
}
+type expectedCommittedEntry struct {
+ lsn LSN
+ snapshotReaders int
+ entry *gitalypb.LogEntry
+}
+
func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []transactionTestCase {
- assertCommittedEntries := func(t *testing.T, expected []*committedEntry, actualList *list.List) {
+ assertCommittedEntries := func(t *testing.T, manager *TransactionManager, expected []*expectedCommittedEntry, actualList *list.List) {
require.Equal(t, len(expected), actualList.Len())
i := 0
@@ -1552,7 +1558,13 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
actual := elm.Value.(*committedEntry)
require.Equal(t, expected[i].lsn, actual.lsn)
require.Equal(t, expected[i].snapshotReaders, actual.snapshotReaders)
- testhelper.ProtoEqual(t, expected[i].entry, actual.entry)
+
+ if expected[i].entry != nil {
+ actualEntry, err := manager.readLogEntry(actual.lsn)
+ require.NoError(t, err)
+
+ testhelper.ProtoEqual(t, expected[i].entry, actualEntry)
+ }
i++
}
}
@@ -1579,7 +1591,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
steps: steps{
StartManager{},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries)
}),
},
},
@@ -1592,7 +1604,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
RelativePath: setup.RelativePath,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{
{
lsn: 0,
snapshotReaders: 1,
@@ -1606,7 +1618,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
},
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries)
}),
Begin{
TransactionID: 2,
@@ -1614,7 +1626,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
ExpectedSnapshotLSN: 1,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{
{
lsn: 1,
snapshotReaders: 1,
@@ -1628,7 +1640,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
},
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries)
}),
},
expectedState: StateAssertion{
@@ -1673,7 +1685,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
ExpectedSnapshotLSN: 1,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{
{
lsn: 1,
snapshotReaders: 2,
@@ -1687,7 +1699,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
},
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{
{
lsn: 1,
snapshotReaders: 1,
@@ -1704,7 +1716,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
ExpectedSnapshotLSN: 2,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{
{
lsn: 1,
snapshotReaders: 1,
@@ -1723,7 +1735,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
},
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{
{
lsn: 2,
entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID),
@@ -1739,7 +1751,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
TransactionID: 4,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries)
}),
},
expectedState: StateAssertion{
@@ -1773,7 +1785,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
TransactionID: 1,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries)
}),
Begin{
TransactionID: 2,
@@ -1784,7 +1796,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
TransactionID: 2,
},
AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
- assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries)
}),
},
expectedState: StateAssertion{