gitlab.com/gitlab-org/gitaly.git
author    Paul Okstad <pokstad@gitlab.com>  2019-03-15 11:04:24 +0300
committer Paul Okstad <pokstad@gitlab.com>  2019-03-15 11:04:24 +0300
commit    824fa90ab9d0df5601fa9067bcb43eb1002e9151 (patch)
tree      d132721bddf0bea865e1fbc14b12d2bcb10218e4
parent    7ac86407d32afd258cb4eb63be0454ded71f404f (diff)

major simplification of transaction manager (po_tx_man_simple)
-rw-r--r--  internal/praefect/transaction/manager.go            23
-rw-r--r--  internal/praefect/transaction/mock_test.go           79
-rw-r--r--  internal/praefect/transaction/shard.go               37
-rw-r--r--  internal/praefect/transaction/transaction.go        134
-rw-r--r--  internal/praefect/transaction/transaction_test.go   140
5 files changed, 218 insertions, 195 deletions
diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go
index 12a92d610..7fe38442e 100644
--- a/internal/praefect/transaction/manager.go
+++ b/internal/praefect/transaction/manager.go
@@ -32,7 +32,6 @@ type Manager struct {
mu sync.Mutex
shards map[string]*Shard // shards keyed by project
- verifier Verifier
coordinator Coordinator
replman ReplicationManager
}
@@ -40,10 +39,9 @@ type Manager struct {
// NewManager returns a manager for coordinating transactions between shards
// that are fetched from the coordinator. Any inconsistencies found will be
// reported to the provided replication manager.
-func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
+func NewManager(c Coordinator, r ReplicationManager) *Manager {
return &Manager{
shards: map[string]*Shard{},
- verifier: v,
coordinator: c,
replman: r,
}
@@ -54,12 +52,18 @@ func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
// author to mark all relevant assets being modified during a mutating
// transaction to ensure they are locked and protected from other closures
// modifying the same.
-func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(Tx) error) error {
shard, err := m.coordinator.FetchShard(ctx, repo)
if err != nil {
return err
}
+ // Prevent other clients from modifying the same shard until finished.
+ // TODO: serialize access at the reference scope:
+ // https://gitlab.com/gitlab-org/gitaly/issues/1530
+ shard.lock.Lock()
+ defer shard.lock.Unlock()
+
omits := make(map[string]struct{})
// TODO: some smart caching needs to be done to eliminate this check. We
@@ -81,8 +85,7 @@ func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx)
return err
}
- tx := newTransaction(shard, good)
- defer tx.unlockAll()
+ tx := newTx(shard, good, txCatMutator)
err = fn(tx)
if err != nil {
@@ -119,12 +122,15 @@ func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degra
return reported, eg.Wait()
}
-func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
+func (m *Manager) Access(ctx context.Context, repo Repository, fn func(Tx) error) error {
shard, err := m.coordinator.FetchShard(ctx, repo)
if err != nil {
return err
}
+ shard.lock.RLock()
+ defer shard.lock.RUnlock()
+
// TODO: some smart caching needs to be done to eliminate this check. We
// already check the shard consistency at the end of each transaction so
// we shouldn't need to do it twice except for the first time we fetch a
@@ -139,8 +145,7 @@ func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx)
return err
}
- tx := newTransaction(shard, good)
- defer tx.unlockAll()
+ tx := newTx(shard, good, txCatAccessor)
err = fn(tx)
if err != nil {
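
With the Verifier parameter gone and both closure types collapsed into the concrete Tx, the call pattern changes as sketched below. This is a minimal sketch, assuming any values satisfying the package's Coordinator and ReplicationManager interfaces; the package name, runAccess helper, and Repository field values are illustrative, not part of this commit.

package example

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// runAccess sketches the post-change API shape; c and r may be any
// Coordinator and ReplicationManager implementations.
func runAccess(ctx context.Context, c transaction.Coordinator, r transaction.ReplicationManager) error {
	m := transaction.NewManager(c, r) // no Verifier argument anymore

	repo := transaction.Repository{ProjectHash: "project-A", StorageLoc: "storage-1"}

	// The shard is read-locked for the duration of the closure, so
	// concurrent accessors proceed while mutators wait.
	return m.Access(ctx, repo, func(tx transaction.Tx) error {
		for storage := range tx.Replicas() {
			_ = storage // fan reads out to replicas verified consistent
		}
		return nil
	})
}
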
diff --git a/internal/praefect/transaction/mock_test.go b/internal/praefect/transaction/mock_test.go
new file mode 100644
index 000000000..2aab6fcd3
--- /dev/null
+++ b/internal/praefect/transaction/mock_test.go
@@ -0,0 +1,79 @@
+package transaction_test
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
+)
+
+type mockCoordinator struct {
+ shards map[transaction.Repository]*transaction.Shard
+}
+
+func (mc mockCoordinator) FetchShard(_ context.Context, repo transaction.Repository) (*transaction.Shard, error) {
+ s, ok := mc.shards[repo]
+ if !ok {
+ return nil, fmt.Errorf("shard doesn't exist for repo %+v", repo)
+ }
+
+ return s, nil
+}
+
+type mockReplMan struct{}
+
+func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil }
+
+type mockNode struct {
+ sync.RWMutex
+
+ tb testing.TB
+ rpc *transaction.RPC
+
+ // set the following values in test cases
+ storage string
+ checksums map[transaction.Repository][]string
+}
+
+func (mn *mockNode) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) {
+ mn.Lock()
+ defer mn.Unlock()
+
+ checksums, ok := mn.checksums[repo]
+ if !ok {
+ panic(
+ fmt.Sprintf(
+ "test setup problem: missing checksums in mock node %s for repo %+v",
+ mn.storage, repo,
+ ),
+ )
+ }
+
+ if len(checksums) < 1 {
+ panic(
+ fmt.Sprintf(
+ "test setup problem: not enough checksums for mock node %s for repo %+v",
+ mn.storage, repo,
+ ),
+ )
+ }
+
+ cs := checksums[0]
+ mn.checksums[repo] = checksums[1:len(checksums)]
+
+ mn.tb.Logf("mock node %s returning checksum %s for repo %s", mn.storage, cs, repo)
+
+ return []byte(cs), nil
+}
+
+func (mn *mockNode) ForwardRPC(_ context.Context, rpc *transaction.RPC) error {
+ mn.Lock()
+ mn.rpc = rpc
+ mn.Unlock()
+
+ return nil
+}
+
+func (mn *mockNode) Storage() string { return mn.storage }
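
Each CheckSum call above pops one queued value, so a test encodes the number of consistency checks it expects via the queue length. A hypothetical companion test, reusing repo1 and stor1 from transaction_test.go (the test name is an assumption, not in this commit):

package transaction_test

import (
	"context"
	"testing"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// TestMockNodeChecksumQueue sketches the FIFO behavior of mockNode:
// each CheckSum call consumes one queued value.
func TestMockNodeChecksumQueue(t *testing.T) {
	node1 := &mockNode{
		tb:      t,
		storage: stor1,
		checksums: map[transaction.Repository][]string{
			repo1: {"1", "1"}, // two queued values permit two calls
		},
	}

	cs, err := node1.CheckSum(context.Background(), repo1)
	if err != nil || string(cs) != "1" {
		t.Fatalf("unexpected checksum %q (err: %v)", cs, err)
	}
	// one value remains queued; after a second call consumes it, a
	// third call would panic with a "test setup problem" message
}
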
diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go
index af6ec2c29..a583c61a7 100644
--- a/internal/praefect/transaction/shard.go
+++ b/internal/praefect/transaction/shard.go
@@ -8,38 +8,27 @@ import (
"golang.org/x/sync/errgroup"
)
-// Shard represents a set of Gitaly replicas for repository. Each shard has a
+// Shard represents a set of Gitaly replicas for a repository. Each shard has a
// designated primary and maintains locks to resources that can cause contention
// between clients writing to the same repo.
type Shard struct {
- repo Repository
-
- primary string // the designated primary node name
-
+ lock *sync.RWMutex // all mutations require a write lock
+ repo Repository // the repo this replica backs
+ primary string // the storage location of the primary node
storageReplicas map[string]Node // maps storage location to a replica
+}
- refLocks struct {
- *sync.RWMutex
- m map[string]*sync.RWMutex // maps ref name to a lock
+func NewShard(r Repository, primary string, replicas []Node) *Shard {
+ sreps := make(map[string]Node)
+ for _, r := range replicas {
+ sreps[r.Storage()] = r
}
- // used to check shard for inconsistencies
- verifier Verifier
-}
-
-func NewShard(r Repository, primary string, replicas []Node, v Verifier) *Shard {
return &Shard{
repo: r,
primary: primary,
- storageReplicas: make(map[string]Node),
- refLocks: struct {
- *sync.RWMutex
- m map[string]*sync.RWMutex
- }{
- RWMutex: new(sync.RWMutex),
- m: make(map[string]*sync.RWMutex),
- },
- verifier: v,
+ storageReplicas: sreps,
+ lock: new(sync.RWMutex),
}
}
@@ -60,7 +49,7 @@ func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, b
eg, eCtx := errgroup.WithContext(ctx)
- for storage, _ := range s.storageReplicas {
+ for storage, node := range s.storageReplicas {
_, ok := omits[storage]
if ok {
continue
@@ -69,7 +58,7 @@ func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, b
storage, node := storage, node // rescope iterator vars for the closure
eg.Go(func() error {
- cs, err := s.verifier.CheckSum(eCtx, s.repo)
+ cs, err := node.CheckSum(eCtx, s.repo)
if err != nil {
return err
}
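
NewShard now builds the replica map itself and no longer takes a Verifier; validate checksums each node directly via its own CheckSum. A minimal construction sketch under the new signature — the helper name and arguments are hypothetical, and the Node values may be any implementations (e.g. the test package's mockNode):

package example

import (
	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// newExampleShard is a hypothetical helper showing the new constructor.
func newExampleShard(node1, node2 transaction.Node) *transaction.Shard {
	return transaction.NewShard(
		transaction.Repository{ProjectHash: "project-A", StorageLoc: "storage-1"},
		node1.Storage(),                  // designated primary's storage location
		[]transaction.Node{node1, node2}, // keyed internally by Storage()
	)
}
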
diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go
index 2ddff2180..36bf0c00a 100644
--- a/internal/praefect/transaction/transaction.go
+++ b/internal/praefect/transaction/transaction.go
@@ -2,6 +2,7 @@ package transaction
import (
"context"
+ "errors"
"sync"
)
@@ -25,28 +26,19 @@ type RPC struct {
type Node interface {
Storage() string // storage location the node hosts
ForwardRPC(ctx context.Context, rpc *RPC) error
-
CheckSum(context.Context, Repository) ([]byte, error)
- WriteRef(ctx context.Context, ref, value string) error
}
-// MutateTx represents the resources available during a mutator transaction
-type MutateTx interface {
- AccessTx
- // LockRef acquires a write lock on a reference
- LockRef(context.Context, string) error
- Primary() Node
-}
+type txCategory int
-// AccessTx represents the resources available during an accessor transaction
-type AccessTx interface {
- // Replicas returns all replicas without distinguishing the primary
- Replicas() map[string]Node
- // RLockRef acquires a read lock on a reference
- RLockRef(context.Context, string) error
-}
+const (
+ txCatAccessor = iota
+ txCatMutator
+)
+
+type Tx struct {
+ category txCategory
-type transaction struct {
shard *Shard
replicas map[string]Node // only nodes verified to be consistent
@@ -61,13 +53,14 @@ type transaction struct {
refRollbacks map[string][]byte
}
-func newTransaction(shard *Shard, good []Node) transaction {
+func newTx(shard *Shard, good []Node, txCat txCategory) Tx {
replicas := map[string]Node{}
for _, node := range good {
replicas[node.Storage()] = node
}
- return transaction{
+ return Tx{
+ category: txCat,
shard: shard,
replicas: replicas,
unlocks: struct {
@@ -80,110 +73,23 @@ func newTransaction(shard *Shard, good []Node) transaction {
}
}
-// LockRef attempts to acquire a write lock on the reference name provided.
-// If the context is cancelled first, the lock acquisition will be
-// aborted.
-func (t transaction) LockRef(ctx context.Context, ref string) error {
- lockQ := make(chan *sync.RWMutex)
-
- go func() {
- t.shard.refLocks.RLock()
- l, ok := t.shard.refLocks.m[ref]
- t.shard.refLocks.RUnlock()
-
- if !ok {
- l = new(sync.RWMutex)
- t.shard.refLocks.Lock()
- t.shard.refLocks.m[ref] = l
- t.shard.refLocks.Unlock()
- }
-
- l.Lock()
- lockQ <- l
- }()
-
- // ensure lockQ is consumed in all code paths so that goroutine doesn't
- // stray
- select {
-
- case <-ctx.Done():
- l := <-lockQ
- l.Unlock()
- return ctx.Err()
-
- case l := <-lockQ:
- t.unlocks.Lock()
- t.unlocks.m[ref] = func() { l.Unlock() }
- t.unlocks.Unlock()
-
- return nil
- }
-}
-
-// unlockAll will unlock all acquired locks at the end of a transaction
-func (t transaction) unlockAll() {
- // unlock all refs
- t.unlocks.RLock()
- for _, unlock := range t.unlocks.m {
- unlock()
- }
- t.unlocks.RUnlock()
-}
-
-func (t transaction) rollback(ctx context.Context) error {
+func (t Tx) rollback(ctx context.Context) error {
// for ref, value := range t.refRollbacks {
//
// }
return nil
}
-func (t transaction) Replicas() map[string]Node {
+func (t Tx) Replicas() map[string]Node {
return t.replicas
}
-func (t transaction) Primary() Node {
- return t.replicas[t.shard.primary]
-}
-
-// RLockRef attempts to acquire a read lock on the reference name provided
-// (ref). If the context is cancelled first, the lock acquisition will be
-// aborted.
-func (t transaction) RLockRef(ctx context.Context, ref string) error {
- lockQ := make(chan *sync.RWMutex)
-
- go func() {
- t.shard.refLocks.RLock()
- l, ok := t.shard.refLocks.m[ref]
- t.shard.refLocks.RUnlock()
-
- if !ok {
- l = new(sync.RWMutex)
- t.shard.refLocks.Lock()
- t.shard.refLocks.m[ref] = l
- t.shard.refLocks.Unlock()
- }
-
- l.RLock()
- lockQ <- l
- }()
-
- // ensure lockQ is consumed in all code paths so that goroutine doesn't
- // stray
- select {
-
- case <-ctx.Done():
- // unlock before aborting
- l := <-lockQ
- l.RUnlock()
-
- return ctx.Err()
-
- case l := <-lockQ:
- t.unlocks.Lock()
- t.unlocks.m[ref] = func() { l.RUnlock() }
- t.unlocks.Unlock()
-
- return nil
+var ErrAccessorNotPermitted = errors.New("a mutator operation was attempted by an accessor")
+func (t Tx) Primary() (Node, error) {
+ if t.category != txCatMutator {
+ return nil, ErrAccessorNotPermitted
}
+
+ return t.replicas[t.shard.primary], nil
}
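
Primary() is now fenced at runtime by the transaction category rather than at compile time by the MutateTx interface. A sketch of the mutator-side call; the helper name is hypothetical and the empty RPC value is a placeholder, since this commit does not show the RPC fields:

package example

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// forwardToPrimary sketches a mutator closure that addresses the primary.
func forwardToPrimary(ctx context.Context, m *transaction.Manager, repo transaction.Repository) error {
	return m.Mutate(ctx, repo, func(tx transaction.Tx) error {
		// inside an Access closure this would return ErrAccessorNotPermitted
		primary, err := tx.Primary()
		if err != nil {
			return err
		}
		return primary.ForwardRPC(ctx, &transaction.RPC{})
	})
}
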
diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go
index 34666f807..ccbbae99d 100644
--- a/internal/praefect/transaction/transaction_test.go
+++ b/internal/praefect/transaction/transaction_test.go
@@ -3,69 +3,113 @@ package transaction_test
import (
"context"
"testing"
+ "time"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
+
+ "github.com/stretchr/testify/require"
)
-func TestReplMan(t *testing.T) {
- const (
- projA = "project-A"
- stor1 = "storage-1"
- stor2 = "storage-2"
- stor3 = "storage-3"
- )
+const (
+ projA = "project-A"
+ stor1 = "storage-1"
+ stor2 = "storage-2"
+ stor3 = "storage-3"
+)
- mv := &mockVerifier{
- checksums: map[string]map[string][][]byte{
- projA: map[string][][]byte{
- stor1: {[]byte{1}},
- stor2: {[]byte{1}},
- stor3: {[]byte{1}},
- },
- },
+type testCase struct {
+ name string
+ shards map[transaction.Repository]*transaction.Shard
+ txList []struct {
+ mutator bool
+ repo transaction.Repository
+ txFn func(transaction.Tx) error
}
-
- // A transaction manager needs to have the ability to verify the state of
- // replicas, so it needs a Verifier.
- rm := transaction.NewManager(mv, mockCoordinator{}, mockReplMan{})
- rm.Access(context.Background(), transaction.Repository{}, func(transaction.AccessTx) error {
- return nil
- })
+ expectErr error
}
-type mockCoordinator struct{}
+var (
+ repo1 = transaction.Repository{
+ ProjectHash: projA,
+ StorageLoc: stor1,
+ }
-func (_ mockCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) {
- return nil, nil
-}
+ testCases = []func(testing.TB) testCase{
+ func(t testing.TB) testCase {
+ var (
+ node1 = &mockNode{
+ tb: t,
+ storage: stor1,
+ checksums: map[transaction.Repository][]string{
+ repo1: []string{"1"},
+ },
+ }
+ )
-type mockReplMan struct{}
+ return testCase{
+ name: "one node shard: no-op access tx",
+ shards: map[transaction.Repository]*transaction.Shard{
+ repo1: transaction.NewShard(
+ repo1,
+ node1.storage,
+ []transaction.Node{node1},
+ ),
+ },
+ txList: []struct {
+ mutator bool
+ repo transaction.Repository
+ txFn func(transaction.Tx) error
+ }{
+ {
+ repo: repo1,
+ txFn: func(_ transaction.Tx) error {
+ t.Log("this is a no-op transaction")
+ // checksum should get consumed during consistency
+ // check
+ require.Len(t, node1.checksums[repo1], 0)
-func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil }
+ return nil
+ },
+ },
+ },
+ }
+ },
+ }
+)
-type mockVerifier struct {
- // checksums contains ordered checksums keyed by project and then storage
- checksums map[string]map[string][][]byte
-}
+func TestManager(t *testing.T) {
+ for _, ttFn := range testCases {
+ tt := ttFn(t)
+ t.Run(tt.name, func(t *testing.T) {
+ var (
+ mc = mockCoordinator{tt.shards}
+ mrm = mockReplMan{}
+ rm = transaction.NewManager(mc, mrm)
+ )
-func (mv *mockVerifier) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) {
- storages, ok := mv.checksums[repo.ProjectHash]
- if !ok {
- panic("no project " + repo.ProjectHash)
- }
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
- sums, ok := storages[repo.StorageLoc]
- if !ok {
- panic("no storage " + repo.StorageLoc)
- }
+ var err error
+ for _, tx := range tt.txList {
- if len(sums) < 1 {
- panic("no more checksums for " + repo.ProjectHash)
- }
+ if tx.mutator {
+ err = rm.Mutate(ctx, tx.repo, tx.txFn)
+ } else {
+ err = rm.Access(ctx, tx.repo, tx.txFn)
+ }
- // pop first checksum off list
- var sum []byte
- sum, mv.checksums[repo.ProjectHash][repo.StorageLoc] = sums[len(sums)-1], sums[:len(sums)-1]
+ if err != nil {
+ break
+ }
+ }
- return sum, nil
+ if tt.expectErr != nil {
+ require.Error(t, err)
+ require.EqualError(t, err, tt.expectErr.Error())
+ } else {
+ require.NoError(t, err)
+ }
+ })
+ }
}
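
The table extends naturally to mutator cases. A hypothetical extra testCases entry might look like the following; the required checksum queue length is an assumption, since it depends on how many consistency checks Mutate actually performs:

	func(t testing.TB) testCase {
		node1 := &mockNode{
			tb:      t,
			storage: stor1,
			checksums: map[transaction.Repository][]string{
				repo1: []string{"1"}, // adjust queue length to the number of validations
			},
		}

		return testCase{
			name: "one node shard: no-op mutate tx",
			shards: map[transaction.Repository]*transaction.Shard{
				repo1: transaction.NewShard(repo1, node1.storage, []transaction.Node{node1}),
			},
			txList: []struct {
				mutator bool
				repo    transaction.Repository
				txFn    func(transaction.Tx) error
			}{
				{
					mutator: true,
					repo:    repo1,
					txFn: func(tx transaction.Tx) error {
						_, err := tx.Primary() // permitted: mutator category
						return err
					},
				},
			},
		}
	},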