diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-03-15 11:04:24 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-03-15 11:04:24 +0300 |
commit | 824fa90ab9d0df5601fa9067bcb43eb1002e9151 (patch) | |
tree | d132721bddf0bea865e1fbc14b12d2bcb10218e4 | |
parent | 7ac86407d32afd258cb4eb63be0454ded71f404f (diff) |
major simplification of transaction managerpo_tx_man_simple
-rw-r--r-- | internal/praefect/transaction/manager.go | 23 | ||||
-rw-r--r-- | internal/praefect/transaction/mock_test.go | 79 | ||||
-rw-r--r-- | internal/praefect/transaction/shard.go | 37 | ||||
-rw-r--r-- | internal/praefect/transaction/transaction.go | 134 | ||||
-rw-r--r-- | internal/praefect/transaction/transaction_test.go | 140 |
5 files changed, 218 insertions, 195 deletions
diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go index 12a92d610..7fe38442e 100644 --- a/internal/praefect/transaction/manager.go +++ b/internal/praefect/transaction/manager.go @@ -32,7 +32,6 @@ type Manager struct { mu sync.Mutex shards map[string]*Shard // shards keyed by project - verifier Verifier coordinator Coordinator replman ReplicationManager } @@ -40,10 +39,9 @@ type Manager struct { // NewManafer returns a manager for coordinating transaction between shards // that are fetched from the coordinator. Any inconsistencies found will be // reported to the provided replication manager. -func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { +func NewManager(c Coordinator, r ReplicationManager) *Manager { return &Manager{ shards: map[string]*Shard{}, - verifier: v, coordinator: c, replman: r, } @@ -54,12 +52,18 @@ func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { // author to mark all relevant assets being modified during a mutating // transaction to ensure they are locked and protected from other closures // modifying the same. -func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { +func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(Tx) error) error { shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } + // Prevent other clients from modifying the same shard until finished. + // TODO: serialize access at the reference scope: + // https://gitlab.com/gitlab-org/gitaly/issues/1530 + shard.lock.Lock() + defer shard.lock.Unlock() + omits := make(map[string]struct{}) // TODO: some smart caching needs to be done to eliminate this check. We @@ -81,8 +85,7 @@ func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) return err } - tx := newTransaction(shard, good) - defer tx.unlockAll() + tx := newTx(shard, good, txCatMutator) err = fn(tx) if err != nil { @@ -119,12 +122,15 @@ func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degra return reported, eg.Wait() } -func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { +func (m *Manager) Access(ctx context.Context, repo Repository, fn func(Tx) error) error { shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } + shard.lock.RLock() + defer shard.lock.RUnlock() + // TODO: some smart caching needs to be done to eliminate this check. We // already check the shard consistency at the end of each transaction so // we shouldn't need to do it twice except for the first time we fetch a @@ -139,8 +145,7 @@ func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) return err } - tx := newTransaction(shard, good) - defer tx.unlockAll() + tx := newTx(shard, good, txCatAccessor) err = fn(tx) if err != nil { diff --git a/internal/praefect/transaction/mock_test.go b/internal/praefect/transaction/mock_test.go new file mode 100644 index 000000000..2aab6fcd3 --- /dev/null +++ b/internal/praefect/transaction/mock_test.go @@ -0,0 +1,79 @@ +package transaction_test + +import ( + "context" + "fmt" + "sync" + "testing" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction" +) + +type mockCoordinator struct { + shards map[transaction.Repository]*transaction.Shard +} + +func (mc mockCoordinator) FetchShard(_ context.Context, repo transaction.Repository) (*transaction.Shard, error) { + s, ok := mc.shards[repo] + if !ok { + return nil, fmt.Errorf("shard doesn't exist for repo %+v", repo) + } + + return s, nil +} + +type mockReplMan struct{} + +func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil } + +type mockNode struct { + sync.RWMutex + + tb testing.TB + rpc *transaction.RPC + + // set the following values in test cases + storage string + checksums map[transaction.Repository][]string +} + +func (mn *mockNode) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) { + mn.Lock() + defer mn.Unlock() + + checksums, ok := mn.checksums[repo] + if !ok { + panic( + fmt.Sprintf( + "test setup problem: missing checksums in mock node %s for repo %+v", + mn.storage, repo, + ), + ) + } + + if len(checksums) < 1 { + panic( + fmt.Sprintf( + "test setup problem: not enough checksums for mock node %s for repo %+v", + mn.storage, repo, + ), + ) + } + + cs := checksums[0] + mn.checksums[repo] = checksums[1:len(checksums)] + + mn.tb.Logf("mock node %s returning checksum %s for repo %s", mn.storage, cs, repo) + + return []byte(cs), nil +} + +func (mn *mockNode) ForwardRPC(_ context.Context, rpc *transaction.RPC) error { + mn.Lock() + mn.rpc = rpc + mn.Unlock() + + return nil +} + +func (mn *mockNode) Storage() string { return mn.storage } diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go index af6ec2c29..a583c61a7 100644 --- a/internal/praefect/transaction/shard.go +++ b/internal/praefect/transaction/shard.go @@ -8,38 +8,27 @@ import ( "golang.org/x/sync/errgroup" ) -// Shard represents a set of Gitaly replicas for repository. Each shard has a +// Shard represents a set of Gitaly replicas for a repository. Each shard has a // designated primary and maintains locks to resources that can cause contention // between clients writing to the same repo. type Shard struct { - repo Repository - - primary string // the designated primary node name - + lock *sync.RWMutex // all mutations require a write lock + repo Repository // the repo this replica backs + primary string // the storage location of the primary node storageReplicas map[string]Node // maps storage location to a replica +} - refLocks struct { - *sync.RWMutex - m map[string]*sync.RWMutex // maps ref name to a lock +func NewShard(r Repository, primary string, replicas []Node) *Shard { + sreps := make(map[string]Node) + for _, r := range replicas { + sreps[r.Storage()] = r } - // used to check shard for inconsistencies - verifier Verifier -} - -func NewShard(r Repository, primary string, replicas []Node, v Verifier) *Shard { return &Shard{ repo: r, primary: primary, - storageReplicas: make(map[string]Node), - refLocks: struct { - *sync.RWMutex - m map[string]*sync.RWMutex - }{ - RWMutex: new(sync.RWMutex), - m: make(map[string]*sync.RWMutex), - }, - verifier: v, + storageReplicas: sreps, + lock: new(sync.RWMutex), } } @@ -60,7 +49,7 @@ func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, b eg, eCtx := errgroup.WithContext(ctx) - for storage, _ := range s.storageReplicas { + for storage, node := range s.storageReplicas { _, ok := omits[storage] if ok { continue @@ -69,7 +58,7 @@ func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, b storage := storage // rescope iterator vars eg.Go(func() error { - cs, err := s.verifier.CheckSum(eCtx, s.repo) + cs, err := node.CheckSum(eCtx, s.repo) if err != nil { return err } diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go index 2ddff2180..36bf0c00a 100644 --- a/internal/praefect/transaction/transaction.go +++ b/internal/praefect/transaction/transaction.go @@ -2,6 +2,7 @@ package transaction import ( "context" + "errors" "sync" ) @@ -25,28 +26,19 @@ type RPC struct { type Node interface { Storage() string // storage location the node hosts ForwardRPC(ctx context.Context, rpc *RPC) error - CheckSum(context.Context, Repository) ([]byte, error) - WriteRef(ctx context.Context, ref, value string) error } -// MutateTx represents the resources available during a mutator transaction -type MutateTx interface { - AccessTx - // LockRef acquires a write lock on a reference - LockRef(context.Context, string) error - Primary() Node -} +type txCategory int -// AccessTx represents the resources available during an accessor transaction -type AccessTx interface { - // Replicas returns all replicas without distinguishing the primary - Replicas() map[string]Node - // RLockRef acquires a read lock on a reference - RLockRef(context.Context, string) error -} +const ( + txCatAccessor = iota + txCatMutator +) + +type Tx struct { + category txCategory -type transaction struct { shard *Shard replicas map[string]Node // only nodes verified to be consistent @@ -61,13 +53,14 @@ type transaction struct { refRollbacks map[string][]byte } -func newTransaction(shard *Shard, good []Node) transaction { +func newTx(shard *Shard, good []Node, txCat txCategory) Tx { replicas := map[string]Node{} for _, node := range good { replicas[node.Storage()] = node } - return transaction{ + return Tx{ + category: txCat, shard: shard, replicas: replicas, unlocks: struct { @@ -80,110 +73,23 @@ func newTransaction(shard *Shard, good []Node) transaction { } } -// LockRef attempts to acquire a write lock on the reference name provided. -// If the context is cancelled first, the lock acquisition will be -// aborted. -func (t transaction) LockRef(ctx context.Context, ref string) error { - lockQ := make(chan *sync.RWMutex) - - go func() { - t.shard.refLocks.RLock() - l, ok := t.shard.refLocks.m[ref] - t.shard.refLocks.RUnlock() - - if !ok { - l = new(sync.RWMutex) - t.shard.refLocks.Lock() - t.shard.refLocks.m[ref] = l - t.shard.refLocks.Unlock() - } - - l.Lock() - lockQ <- l - }() - - // ensure lockQ is consumed in all code paths so that goroutine doesn't - // stray - select { - - case <-ctx.Done(): - l := <-lockQ - l.Unlock() - return ctx.Err() - - case l := <-lockQ: - t.unlocks.Lock() - t.unlocks.m[ref] = func() { l.Unlock() } - t.unlocks.Unlock() - - return nil - } -} - -// unlockAll will unlock all acquired locks at the end of a transaction -func (t transaction) unlockAll() { - // unlock all refs - t.unlocks.RLock() - for _, unlock := range t.unlocks.m { - unlock() - } - t.unlocks.RUnlock() -} - -func (t transaction) rollback(ctx context.Context) error { +func (t Tx) rollback(ctx context.Context) error { // for ref, value := range t.refRollbacks { // // } return nil } -func (t transaction) Replicas() map[string]Node { +func (t Tx) Replicas() map[string]Node { return t.replicas } -func (t transaction) Primary() Node { - return t.replicas[t.shard.primary] -} - -// RLockRef attempts to acquire a read lock on the reference name provided -// (ref). If the context is cancelled first, the lock acquisition will be -// aborted. -func (t transaction) RLockRef(ctx context.Context, ref string) error { - lockQ := make(chan *sync.RWMutex) - - go func() { - t.shard.refLocks.RLock() - l, ok := t.shard.refLocks.m[ref] - t.shard.refLocks.RUnlock() - - if !ok { - l = new(sync.RWMutex) - t.shard.refLocks.Lock() - t.shard.refLocks.m[ref] = l - t.shard.refLocks.Unlock() - } - - l.RLock() - lockQ <- l - }() - - // ensure lockQ is consumed in all code paths so that goroutine doesn't - // stray - select { - - case <-ctx.Done(): - // unlock before aborting - l := <-lockQ - l.RUnlock() - - return ctx.Err() - - case l := <-lockQ: - t.unlocks.Lock() - t.unlocks.m[ref] = func() { l.RUnlock() } - t.unlocks.Unlock() - - return nil +var ErrAccessorNotPermitted = errors.New("a mutator operation was attempted by an accessor") +func (t Tx) Primary() (Node, error) { + if t.category != txCatMutator { + return nil, ErrAccessorNotPermitted } + + return t.replicas[t.shard.primary], nil } diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go index 34666f807..ccbbae99d 100644 --- a/internal/praefect/transaction/transaction_test.go +++ b/internal/praefect/transaction/transaction_test.go @@ -3,69 +3,113 @@ package transaction_test import ( "context" "testing" + "time" "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction" + + "github.com/stretchr/testify/require" ) -func TestReplMan(t *testing.T) { - const ( - projA = "project-A" - stor1 = "storage-1" - stor2 = "storage-2" - stor3 = "storage-3" - ) +const ( + projA = "project-A" + stor1 = "storage-1" + stor2 = "storage-2" + stor3 = "storage-3" +) - mv := &mockVerifier{ - checksums: map[string]map[string][][]byte{ - projA: map[string][][]byte{ - stor1: {[]byte{1}}, - stor2: {[]byte{1}}, - stor3: {[]byte{1}}, - }, - }, +type testCase struct { + name string + shards map[transaction.Repository]*transaction.Shard + txList []struct { + mutator bool + repo transaction.Repository + txFn func(transaction.Tx) error } - - // A transaction manager needs to have the ability to verify the state of - // replicas, so it needs a Verifier. - rm := transaction.NewManager(mv, mockCoordinator{}, mockReplMan{}) - rm.Access(context.Background(), transaction.Repository{}, func(transaction.AccessTx) error { - return nil - }) + expectErr error } -type mockCoordinator struct{} +var ( + repo1 = transaction.Repository{ + ProjectHash: projA, + StorageLoc: stor1, + } -func (_ mockCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) { - return nil, nil -} + testCases = []func(testing.TB) testCase{ + func(t testing.TB) testCase { + var ( + node1 = &mockNode{ + tb: t, + storage: stor1, + checksums: map[transaction.Repository][]string{ + repo1: []string{"1"}, + }, + } + ) -type mockReplMan struct{} + return testCase{ + name: "one node shard: no-op access tx", + shards: map[transaction.Repository]*transaction.Shard{ + repo1: transaction.NewShard( + repo1, + node1.storage, + []transaction.Node{node1}, + ), + }, + txList: []struct { + mutator bool + repo transaction.Repository + txFn func(transaction.Tx) error + }{ + { + repo: repo1, + txFn: func(_ transaction.Tx) error { + t.Log("this is a no-op transaction") + // checksum should get consumed during consistency + // check + require.Len(t, node1.checksums[repo1], 0) -func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil } + return nil + }, + }, + }, + } + }, + } +) -type mockVerifier struct { - // checksums contains ordered checksums keyed by project and then storage - checksums map[string]map[string][][]byte -} +func TestManager(t *testing.T) { + for _, ttFn := range testCases { + tt := ttFn(t) + t.Run(tt.name, func(t *testing.T) { + var ( + mc = mockCoordinator{tt.shards} + mrm = mockReplMan{} + rm = transaction.NewManager(mc, mrm) + ) -func (mv *mockVerifier) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) { - storages, ok := mv.checksums[repo.ProjectHash] - if !ok { - panic("no project " + repo.ProjectHash) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() - sums, ok := storages[repo.StorageLoc] - if !ok { - panic("no storage " + repo.StorageLoc) - } + var err error + for _, tx := range tt.txList { - if len(sums) < 1 { - panic("no more checksums for " + repo.ProjectHash) - } + if tx.mutator { + err = rm.Mutate(ctx, tx.repo, tx.txFn) + } else { + err = rm.Access(ctx, tx.repo, tx.txFn) + } - // pop first checksum off list - var sum []byte - sum, mv.checksums[repo.ProjectHash][repo.StorageLoc] = sums[len(sums)-1], sums[:len(sums)-1] + if err != nil { + break + } + } - return sum, nil + if tt.expectErr != nil { + require.Error(t, err) + require.EqualError(t, err, tt.expectErr.Error()) + } else { + require.NoError(t, err) + } + }) + } } |