diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-03-13 21:03:31 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-03-13 21:03:34 +0300 |
commit | d9ff755e526298a03042d236c578b359802af459 (patch) | |
tree | 62bb06c46733bf313ec635388809ffdfbde68826 | |
parent | 3561b41098f34d364ccc544e4d54e9f8d1b953aa (diff) |
move transaction to dedicated package
Transaction will be easy to isolate and work as a WIP
if we keep it in a dedicated package. Also, this
means using interfaces heavily to avoid relying on
future features (coordinator, replication). Coordinator
is now an interface to abstract the requirements of
transactions.
-rw-r--r-- | internal/praefect/transaction/transaction.go (renamed from internal/praefect/transaction.go) | 39 | ||||
-rw-r--r-- | internal/praefect/transaction/transaction_test.go | 64 |
2 files changed, 83 insertions, 20 deletions
diff --git a/internal/praefect/transaction.go b/internal/praefect/transaction/transaction.go index 27ba57ba6..9bc49bc24 100644 --- a/internal/praefect/transaction.go +++ b/internal/praefect/transaction/transaction.go @@ -1,5 +1,5 @@ -/*Package praefect provides transaction management functionality to coordinate -one-to-many clients attempting to modify the shards concurrently. +/*Package transaction provides transaction management functionality to +coordinate one-to-many clients attempting to modify the shards concurrently. While Git is distributed in nature, there are some repository wide data points that can conflict between replicas if something goes wrong. This includes @@ -7,7 +7,7 @@ references, which is why the transaction manager provides an API that allows an RPC transaction to read/write lock the references being accessed to prevent contention. */ -package praefect +package transaction import ( "context" @@ -30,16 +30,22 @@ type Verifier interface { CheckSum(context.Context, Repository) ([]byte, error) } +// Coordinator allows the transaction manager to look up the shard for a repo +// at the beginning of each transaction +type Coordinator interface { + FetchShard(ctx context.Context, repo Repository) (*Shard, error) +} + // ReplicationManager provides services to handle degraded nodes type ReplicationManager interface { NotifyDegradation(context.Context, Repository, Node) error } -// TransactionManager tracks the progress of RPCs being applied to multiple +// Manager tracks the progress of RPCs being applied to multiple // downstream servers that make up a shard. It prevents conflicts that may arise // from contention between multiple clients trying to modify the same // references. -type TransactionManager struct { +type Manager struct { mu sync.Mutex shards map[string]*Shard // shards keyed by project @@ -48,8 +54,8 @@ type TransactionManager struct { replman ReplicationManager } -func NewTransactionManager(v Verifier, c Coordinator, r ReplicationManager) *TransactionManager { - return &TransactionManager{ +func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { + return &Manager{ shards: map[string]*Shard{}, verifier: v, coordinator: c, @@ -238,8 +244,8 @@ func (t transaction) RLockRef(ctx context.Context, ref string) error { // author to mark all relevant assets being modified during a mutating // transaction to ensure they are locked and protected from other closures // modifying the same. -func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { - shard, err := tm.coordinator.FetchShard(ctx, repo) +func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { + shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } @@ -259,7 +265,7 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu node := node // rescope iterator var for goroutine closure eg.Go(func() error { - return tm.replman.NotifyDegradation(eCtx, repo, node) + return m.replman.NotifyDegradation(eCtx, repo, node) }) } @@ -269,7 +275,6 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu } defer tx.unlockAll() - // run the transaction function err = fn(tx) if err != nil { return err @@ -278,8 +283,8 @@ func (tm *TransactionManager) Mutate(ctx context.Context, repo Repository, fn fu return nil } -func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { - shard, err := tm.coordinator.FetchShard(ctx, repo) +func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { + shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } @@ -299,7 +304,7 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu node := node // rescope iterator var for goroutine closure eg.Go(func() error { - return tm.replman.NotifyDegradation(eCtx, repo, node) + return m.replman.NotifyDegradation(eCtx, repo, node) }) } @@ -309,7 +314,6 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu } defer tx.unlockAll() - // run the transaction function err = fn(tx) if err != nil { return err @@ -317,8 +321,3 @@ func (tm *TransactionManager) Access(ctx context.Context, repo Repository, fn fu return nil } - -func (c *Coordinator) FetchShard(ctx context.Context, repo Repository) (*Shard, error) { - // TODO: move this to coordinator.go - return nil, nil -} diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go new file mode 100644 index 000000000..ca7064e41 --- /dev/null +++ b/internal/praefect/transaction/transaction_test.go @@ -0,0 +1,64 @@ +package transaction_test + +import ( + "context" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/praefect" +) + +func TestReplMan(t *testing.T) { + const ( + projA = "project-A" + stor1 = "storage-1" + stor2 = "storage-2" + stor3 = "storage-3" + ) + + mv := &mockVerifier{ + checksums: map[string]map[string][][]byte{ + projA: map[string][][]byte{ + stor1: {[]byte{1}}, + stor2: {[]byte{1}}, + stor3: {[]byte{1}}, + }, + }, + } + + // A replication manager needs to have the ability to verify the state of + // replicas, so it needs a Verifier. + rm := praefect.NewReplicationManager(mv) + + // replication managers are typically used within the context of a request + // when a mutator RPC is received. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + +} + +type mockVerifier struct { + // checksums contains ordered checksums keyed by project and then storage + checksums map[string]map[string][][]byte +} + +func (mv *mockVerifier) CheckSum(_ context.Context, project, storage string) ([]byte, error) { + storages, ok := mv.checksums[project] + if !ok { + panic("no project " + project) + } + + sums, ok := storages[storage] + if !ok { + panic("no storage " + storage) + } + + if len(sums) < 1 { + panic("no more checksums for " + project) + } + + // pop first checksum off list + var sum []byte + sum, mv.checksums[project][storage] = sums[len(sums)-1], sums[:len(sums)-1] + + return sum, nil +} |