Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-12-04 08:59:25 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-12-22 14:22:10 +0300
commit90622c7f1f80692acb4763c5590c8482b637b983 (patch)
tree511c7f439a05d382dc14407ec9c4b63210665b31
parent081014df5deb8522ff7fa452eeb898d79ef5397d (diff)
Add keeparound entries to Transaction Manager
Some types of transactions, such as housekeeping, operate on a snapshot repository. There is a gap between the transaction doing its work and the time when it is committed. They need to verify if concurrent operations can cause conflict. This commit lets the transaction manager maintain a list of keeparound log entries. These log entries are still kept around even after they are applied. They are removed when no active readers are accessing the corresponding snapshots.
-rw-r--r--internal/gitaly/storage/storagemgr/testhelper_test.go5
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager.go87
-rw-r--r--internal/gitaly/storage/storagemgr/transaction_manager_test.go254
3 files changed, 345 insertions, 1 deletions
diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go
index 2f8cb9234..0618b1146 100644
--- a/internal/gitaly/storage/storagemgr/testhelper_test.go
+++ b/internal/gitaly/storage/storagemgr/testhelper_test.go
@@ -392,6 +392,9 @@ type StateAssertion struct {
Repositories RepositoryStates
}
+// AdhocAssertion allows a test to add some custom assertions apart from the built-in assertions above.
+type AdhocAssertion func(*testing.T, context.Context, *TransactionManager)
+
// steps defines execution steps in a test. Each test case can define multiple steps to exercise
// more complex behavior.
type steps []any
@@ -694,6 +697,8 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas
}),
)
}, step.Repositories)
+ case AdhocAssertion:
+ step(t, ctx, transactionManager)
default:
t.Fatalf("unhandled step type: %T", step)
}
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go
index 584cd56fa..bc42e05d6 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager.go
@@ -2,6 +2,7 @@ package storagemgr
import (
"bytes"
+ "container/list"
"context"
"encoding/binary"
"errors"
@@ -259,6 +260,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context, relativePath string, s
mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Add(1)
defer mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Done()
readReady := mgr.snapshotLocks[txn.snapshotLSN].applied
+
+ var entry *committedEntry
+ if !txn.readOnly {
+ var err error
+ entry, err = mgr.updateCommittedEntry(txn.snapshotLSN)
+ if err != nil {
+ return nil, err
+ }
+ }
+
mgr.mutex.Unlock()
txn.finish = func() error {
@@ -270,6 +281,12 @@ func (mgr *TransactionManager) Begin(ctx context.Context, relativePath string, s
}
}
+ if !txn.readOnly {
+ mgr.mutex.Lock()
+ defer mgr.mutex.Unlock()
+ mgr.cleanCommittedEntry(entry)
+ }
+
return nil
}
@@ -599,6 +616,17 @@ type snapshotLock struct {
activeSnapshotters sync.WaitGroup
}
+// committedEntry is a wrapper for a log entry. It is used to keep track of entries in which their snapshots are still
+// accessed by other transactions.
+type committedEntry struct {
+ // lsn is the associated LSN of the entry
+ lsn LSN
+ // entry is the pointer to the corresponding log entry.
+ entry *gitalypb.LogEntry
+ // snapshotReaders accounts for the number of transaction readers of the snapshot.
+ snapshotReaders int
+}
+
// TransactionManager is responsible for transaction management of a single repository. Each repository has
// a single TransactionManager; it is the repository's single-writer. It accepts writes one at a time from
// the admissionQueue. Each admitted write is processed in three steps:
@@ -683,7 +711,8 @@ type TransactionManager struct {
// Run and Begin which are ran in different goroutines.
mutex sync.Mutex
- // snapshotLocks contains state used for synchronizing snapshotters with the log application.
+ // snapshotLocks contains state used for synchronizing snapshotters with the log application. The
+ // lock is released after the corresponding log entry is applied.
snapshotLocks map[LSN]*snapshotLock
// appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log.
@@ -697,6 +726,12 @@ type TransactionManager struct {
// the partition. It's keyed by the LSN the transaction is waiting to be applied and the
// value is the resultChannel that is waiting the result.
awaitingTransactions map[LSN]resultChannel
+ // committedEntries keeps some latest appended log entries around. Some types of transactions, such as
+ // housekeeping, operate on snapshot repository. There is a gap between transaction doing its work and the time
+ // when it is committed. They need to verify if concurrent operations can cause conflict. These log entries are
+ // still kept around even after they are applied. They are removed when there are no active readers accessing
+ // the corresponding snapshots.
+ committedEntries *list.List
}
// NewTransactionManager returns a new TransactionManager for the given repository.
@@ -730,6 +765,7 @@ func NewTransactionManager(
stagingDirectory: stagingDir,
housekeepingManager: housekeepingManager,
awaitingTransactions: make(map[LSN]resultChannel),
+ committedEntries: list.New(),
}
}
@@ -1713,6 +1749,10 @@ func (mgr *TransactionManager) appendLogEntry(nextLSN LSN, logEntry *gitalypb.Lo
mgr.mutex.Lock()
mgr.appendedLSN = nextLSN
mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})}
+ mgr.committedEntries.PushBack(&committedEntry{
+ lsn: nextLSN,
+ entry: logEntry,
+ })
mgr.mutex.Unlock()
return nil
@@ -2154,6 +2194,51 @@ func (mgr *TransactionManager) deleteKey(key []byte) error {
})
}
+// updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on.
+func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committedEntry, error) {
+ // Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no new transactions
+ // can be committed or added. That should guarantee .Back() is always the latest transaction and the one we're
+ // using to base our snapshot on.
+ if elm := mgr.committedEntries.Back(); elm != nil {
+ entry := elm.Value.(*committedEntry)
+ entry.snapshotReaders++
+ return entry, nil
+ }
+
+ entry := &committedEntry{
+ lsn: snapshotLSN,
+ snapshotReaders: 1,
+ // The log entry is left nil. This doesn't matter as the conflict checking only
+ // needs it when checking for conflicts with transactions committed after we took
+ // our snapshot.
+ //
+ // This `committedEntry` only exists to record the `snapshotReaders` at this LSN.
+ }
+
+ mgr.committedEntries.PushBack(entry)
+
+ return entry, nil
+}
+
+// cleanCommittedEntry reduces the snapshot readers counter of the committed entry. It also removes entries with no more
+// readers at the head of the list.
+func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) {
+ entry.snapshotReaders--
+
+ elm := mgr.committedEntries.Front()
+ for elm != nil {
+ front := mgr.committedEntries.Front().Value.(*committedEntry)
+ if front.snapshotReaders > 0 {
+ // If the first entry had still some snapshot readers, that means
+ // our transaction was not the oldest reader. We can't remove any entries
+ // as they'll still be needed for conlict checking the older transactions.
+ return
+ }
+ mgr.committedEntries.Remove(elm)
+ elm = mgr.committedEntries.Front()
+ }
+}
+
// keyAppliedLSN returns the database key storing a partition's last applied log entry's LSN.
func keyAppliedLSN(ptnID partitionID) []byte {
return []byte(fmt.Sprintf("partition/%s/applied_lsn", ptnID.MarshalBinary()))
diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
index 2a2b4e41e..eed2ff37b 100644
--- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go
@@ -3,6 +3,7 @@ package storagemgr
import (
"archive/tar"
"bytes"
+ "container/list"
"context"
"encoding/hex"
"errors"
@@ -250,6 +251,7 @@ func TestTransactionManager(t *testing.T) {
var testCases []transactionTestCase
subTests := [][]transactionTestCase{
generateCommonTests(t, ctx, setup),
+ generateCommittedEntriesTests(t, setup),
generateInvalidReferencesTests(t, setup),
generateModifyReferencesTests(t, setup),
generateCreateRepositoryTests(t, setup),
@@ -1508,6 +1510,258 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio
}
}
+func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []transactionTestCase {
+ assertCommittedEntries := func(t *testing.T, expected []*committedEntry, actualList *list.List) {
+ require.Equal(t, len(expected), actualList.Len())
+
+ i := 0
+ for elm := actualList.Front(); elm != nil; elm = elm.Next() {
+ actual := elm.Value.(*committedEntry)
+ require.Equal(t, expected[i].lsn, actual.lsn)
+ require.Equal(t, expected[i].snapshotReaders, actual.snapshotReaders)
+ testhelper.ProtoEqual(t, expected[i].entry, actual.entry)
+ i++
+ }
+ }
+
+ refChangeLogEntry := func(ref string, oid git.ObjectID) *gitalypb.LogEntry {
+ return &gitalypb.LogEntry{
+ RelativePath: setup.RelativePath,
+ ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{
+ {
+ Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{
+ {
+ ReferenceName: []byte(ref),
+ NewOid: []byte(oid),
+ },
+ },
+ },
+ },
+ }
+ }
+
+ return []transactionTestCase{
+ {
+ desc: "manager has just initialized",
+ steps: steps{
+ StartManager{},
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ }),
+ },
+ },
+ {
+ desc: "a transaction has one reader",
+ steps: steps{
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ RelativePath: setup.RelativePath,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{
+ {
+ lsn: 0,
+ snapshotReaders: 1,
+ },
+ }, tm.committedEntries)
+ }),
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ }),
+ Begin{
+ TransactionID: 2,
+ RelativePath: setup.RelativePath,
+ ExpectedSnapshotLSN: 1,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{
+ {
+ lsn: 1,
+ snapshotReaders: 1,
+ },
+ }, tm.committedEntries)
+ }),
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ }),
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLSN(setup.PartitionID)): LSN(2).toProto(),
+ },
+ Repositories: RepositoryStates{
+ setup.RelativePath: {
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/branch-1", Target: string(setup.Commits.First.OID)},
+ {Name: "refs/heads/main", Target: string(setup.Commits.First.OID)},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "a transaction has multiple readers",
+ steps: steps{
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ RelativePath: setup.RelativePath,
+ },
+ Commit{
+ TransactionID: 1,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ Begin{
+ TransactionID: 2,
+ RelativePath: setup.RelativePath,
+ ExpectedSnapshotLSN: 1,
+ },
+ Begin{
+ TransactionID: 3,
+ RelativePath: setup.RelativePath,
+ ExpectedSnapshotLSN: 1,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{
+ {
+ lsn: 1,
+ snapshotReaders: 2,
+ },
+ }, tm.committedEntries)
+ }),
+ Commit{
+ TransactionID: 2,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{
+ {
+ lsn: 1,
+ snapshotReaders: 1,
+ },
+ {
+ lsn: 2,
+ entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID),
+ },
+ }, tm.committedEntries)
+ }),
+ Begin{
+ TransactionID: 4,
+ RelativePath: setup.RelativePath,
+ ExpectedSnapshotLSN: 2,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{
+ {
+ lsn: 1,
+ snapshotReaders: 1,
+ },
+ {
+ lsn: 2,
+ snapshotReaders: 1,
+ entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID),
+ },
+ }, tm.committedEntries)
+ }),
+ Commit{
+ TransactionID: 3,
+ ReferenceUpdates: ReferenceUpdates{
+ "refs/heads/branch-2": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID},
+ },
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{
+ {
+ lsn: 2,
+ entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID),
+ snapshotReaders: 1,
+ },
+ {
+ lsn: 3,
+ entry: refChangeLogEntry("refs/heads/branch-2", setup.Commits.First.OID),
+ },
+ }, tm.committedEntries)
+ }),
+ Rollback{
+ TransactionID: 4,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ }),
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{
+ string(keyAppliedLSN(setup.PartitionID)): LSN(3).toProto(),
+ },
+ Repositories: RepositoryStates{
+ setup.RelativePath: {
+ DefaultBranch: "refs/heads/main",
+ References: []git.Reference{
+ {Name: "refs/heads/branch-1", Target: string(setup.Commits.First.OID)},
+ {Name: "refs/heads/branch-2", Target: string(setup.Commits.First.OID)},
+ {Name: "refs/heads/main", Target: string(setup.Commits.First.OID)},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "committed read-only transaction are not kept",
+ steps: steps{
+ StartManager{},
+ Begin{
+ TransactionID: 1,
+ RelativePath: setup.RelativePath,
+ ReadOnly: true,
+ },
+ Commit{
+ TransactionID: 1,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ }),
+ Begin{
+ TransactionID: 2,
+ RelativePath: setup.RelativePath,
+ ReadOnly: true,
+ },
+ Commit{
+ TransactionID: 2,
+ },
+ AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) {
+ assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries)
+ }),
+ },
+ expectedState: StateAssertion{
+ Database: DatabaseState{},
+ Repositories: RepositoryStates{
+ setup.RelativePath: {
+ DefaultBranch: "refs/heads/main",
+ },
+ },
+ },
+ },
+ }
+}
+
// BenchmarkTransactionManager benchmarks the transaction throughput of the TransactionManager at various levels
// of concurrency and transaction sizes.
func BenchmarkTransactionManager(b *testing.B) {