gitlab.com/gitlab-org/gitaly.git
author    Paul Okstad <pokstad@gitlab.com>  2019-03-14 08:29:22 +0300
committer Paul Okstad <pokstad@gitlab.com>  2019-03-14 08:29:22 +0300
commit    7ac86407d32afd258cb4eb63be0454ded71f404f (patch)
tree      3b7b0b2893df873dc7b559998649a14291972665
parent    d9ff755e526298a03042d236c578b359802af459 (diff)

split up transaction file for maintainability (po_replication_manager)
-rw-r--r--  internal/praefect/transaction/doc.go                10
-rw-r--r--  internal/praefect/transaction/manager.go            151
-rw-r--r--  internal/praefect/transaction/shard.go              101
-rw-r--r--  internal/praefect/transaction/transaction.go        298
-rw-r--r--  internal/praefect/transaction/transaction_test.go    35
5 files changed, 365 insertions, 230 deletions
diff --git a/internal/praefect/transaction/doc.go b/internal/praefect/transaction/doc.go
new file mode 100644
index 000000000..a90a45668
--- /dev/null
+++ b/internal/praefect/transaction/doc.go
@@ -0,0 +1,10 @@
+/*Package transaction provides transaction management functionality to
+coordinate one-to-many clients attempting to modify the shards concurrently.
+
+While Git is distributed in nature, there are some repository-wide data points
+that can conflict between replicas if something goes wrong. This includes
+references, which is why the transaction manager provides an API that allows
+an RPC transaction to read/write lock the references being accessed to prevent
+contention.
+*/
+package transaction
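
To make the intended call pattern concrete, here is a rough, hypothetical sketch of how an RPC handler could drive a mutating transaction through this package. The verifier, coordinator, and replication manager values, the repository identifiers, and the ref value written are illustrative assumptions, not part of this change.

package main

import (
	"context"
	"log"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// runMutator sketches a mutating transaction: lock the ref being modified,
// then write it through the shard's primary. The dependencies are assumed to
// be wired up elsewhere.
func runMutator(ctx context.Context, v transaction.Verifier, c transaction.Coordinator, r transaction.ReplicationManager) error {
	m := transaction.NewManager(v, c, r)

	repo := transaction.Repository{
		ProjectHash: "3f786850e387550fdab836ed7e6dc881de23001b", // hypothetical project hash
		StorageLoc:  "default",                                  // hypothetical storage location
	}

	return m.Mutate(ctx, repo, func(tx transaction.MutateTx) error {
		// Take the write lock so concurrent mutators of this ref block until
		// the transaction ends and unlockAll releases it.
		if err := tx.LockRef(ctx, "refs/heads/master"); err != nil {
			return err
		}

		// Write through the primary; fanning the change out to the other
		// consistent replicas is beyond this sketch.
		return tx.Primary().WriteRef(ctx, "refs/heads/master", "0000000000000000000000000000000000000000")
	})
}

func main() {
	// Wiring of real Verifier/Coordinator/ReplicationManager implementations
	// is omitted here.
	log.Println("see runMutator for the usage sketch")
}
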
diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go
new file mode 100644
index 000000000..12a92d610
--- /dev/null
+++ b/internal/praefect/transaction/manager.go
@@ -0,0 +1,151 @@
+package transaction
+
+import (
+ "context"
+ "sync"
+
+ "golang.org/x/sync/errgroup"
+)
+
+// Verifier verifies the project repository state by performing a checksum
+type Verifier interface {
+ // CheckSum will return the checksum of all refs for a repo
+ CheckSum(context.Context, Repository) ([]byte, error)
+}
+
+// Coordinator allows the transaction manager to look up the shard for a repo
+// at the beginning of each transaction
+type Coordinator interface {
+ FetchShard(ctx context.Context, repo Repository) (*Shard, error)
+}
+
+// ReplicationManager provides services to handle degraded nodes
+type ReplicationManager interface {
+ NotifyDegradation(context.Context, Repository) error
+}
+
+// Manager tracks the progress of RPCs being applied to multiple
+// downstream servers that make up a shard. It prevents conflicts that may arise
+// from contention between multiple clients trying to modify the same
+// references.
+type Manager struct {
+ mu sync.Mutex
+ shards map[string]*Shard // shards keyed by project
+
+ verifier Verifier
+ coordinator Coordinator
+ replman ReplicationManager
+}
+
+// NewManager returns a manager for coordinating transactions between shards
+// that are fetched from the coordinator. Any inconsistencies found will be
+// reported to the provided replication manager.
+func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
+ return &Manager{
+ shards: map[string]*Shard{},
+ verifier: v,
+ coordinator: c,
+ replman: r,
+ }
+}
+
+// Mutate accepts a closure whose environment is a snapshot of the repository
+// before the transaction begins. It is the responsibility of the closure
+// author to lock all relevant assets being modified during a mutating
+// transaction so that they are protected from other closures modifying the
+// same resources.
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
+ shard, err := m.coordinator.FetchShard(ctx, repo)
+ if err != nil {
+ return err
+ }
+
+ omits := make(map[string]struct{})
+
+ // TODO: some smart caching needs to be done to eliminate this check. We
+ // already check the shard consistency at the end of each transaction so
+ // we shouldn't need to do it twice except for the first time we fetch a
+ // shard.
+ good, bad, err := shard.validate(ctx, omits)
+ if err != nil {
+ return err
+ }
+
+ // are a majority of the nodes good (consistent)?
+ if len(bad) >= len(good) {
+ return ErrShardInconsistent
+ }
+
+ omits, err = m.notifyDegradations(ctx, repo, bad)
+ if err != nil {
+ return err
+ }
+
+ tx := newTransaction(shard, good)
+ defer tx.unlockAll()
+
+ err = fn(tx)
+ if err != nil {
+ return err
+ }
+
+ // make sure all changes made to replicas are consistent
+ good, bad, err = shard.validate(ctx, omits)
+ if err != nil {
+ return err
+ }
+
+ _, err = m.notifyDegradations(ctx, repo, bad)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degradeds []Node) (map[string]struct{}, error) {
+ reported := make(map[string]struct{})
+
+ eg, eCtx := errgroup.WithContext(ctx)
+ for _, node := range degradeds {
+ node := node // rescope iterator var for goroutine closure
+ reported[node.Storage()] = struct{}{}
+
+ eg.Go(func() error {
+ return m.replman.NotifyDegradation(eCtx, repo)
+ })
+ }
+
+ return reported, eg.Wait()
+}
+
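+// Access runs a read-only transaction against the shard that hosts the given
+// repository. The closure receives an AccessTx scoped to the replicas that
+// passed the consistency check.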
+func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
+ shard, err := m.coordinator.FetchShard(ctx, repo)
+ if err != nil {
+ return err
+ }
+
+ // TODO: some smart caching needs to be done to eliminate this check. We
+ // already check the shard consistency at the end of each transaction so
+ // we shouldn't need to do it twice except for the first time we fetch a
+ // shard.
+ good, bad, err := shard.validate(ctx, map[string]struct{}{})
+ if err != nil {
+ return err
+ }
+
+ _, err = m.notifyDegradations(ctx, repo, bad)
+ if err != nil {
+ return err
+ }
+
+ tx := newTransaction(shard, good)
+ defer tx.unlockAll()
+
+ err = fn(tx)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
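
For local experimentation or tests, the Manager's three dependencies can be satisfied with small in-memory fakes. The types below are hypothetical and only sketch what a minimal wiring of NewManager might look like under those assumptions.

package transaction_test

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// fakeVerifier reports the same checksum for every call, so every replica
// appears consistent with the primary.
type fakeVerifier struct{}

func (fakeVerifier) CheckSum(context.Context, transaction.Repository) ([]byte, error) {
	return []byte("constant-checksum"), nil
}

// fakeCoordinator always hands back the single shard it was constructed with.
type fakeCoordinator struct{ shard *transaction.Shard }

func (c fakeCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) {
	return c.shard, nil
}

// fakeReplMan counts degradation notifications instead of scheduling repairs.
type fakeReplMan struct{ notifications int }

func (r *fakeReplMan) NotifyDegradation(context.Context, transaction.Repository) error {
	r.notifications++
	return nil
}

// newFakeManager wires the fakes into a Manager; a real deployment would pass
// implementations backed by Gitaly nodes and the praefect coordinator.
func newFakeManager(shard *transaction.Shard) *transaction.Manager {
	return transaction.NewManager(fakeVerifier{}, fakeCoordinator{shard: shard}, &fakeReplMan{})
}
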
diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go
new file mode 100644
index 000000000..af6ec2c29
--- /dev/null
+++ b/internal/praefect/transaction/shard.go
@@ -0,0 +1,101 @@
+package transaction
+
+import (
+ "context"
+ "errors"
+ "sync"
+
+ "golang.org/x/sync/errgroup"
+)
+
+// Shard represents a set of Gitaly replicas for a repository. Each shard has a
+// designated primary and maintains locks on resources that can cause contention
+// between clients writing to the same repo.
+type Shard struct {
+ repo Repository
+
+ primary string // the designated primary node name
+
+ storageReplicas map[string]Node // maps storage location to a replica
+
+ refLocks struct {
+ *sync.RWMutex
+ m map[string]*sync.RWMutex // maps ref name to a lock
+ }
+
+ // used to check shard for inconsistencies
+ verifier Verifier
+}
+
+func NewShard(r Repository, primary string, replicas []Node, v Verifier) *Shard {
+ storageReplicas := make(map[string]Node, len(replicas))
+ for _, replica := range replicas {
+ storageReplicas[replica.Storage()] = replica
+ }
+
+ return &Shard{
+ repo: r,
+ primary: primary,
+ storageReplicas: storageReplicas,
+ refLocks: struct {
+ *sync.RWMutex
+ m map[string]*sync.RWMutex
+ }{
+ RWMutex: new(sync.RWMutex),
+ m: make(map[string]*sync.RWMutex),
+ },
+ verifier: v,
+ }
+}
+
+// ErrShardInconsistent indicates a mutating operation is unable to be executed
+// due to lack of quorum of consistent nodes.
+var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state")
+
+// validate will concurrently fetch the checksum of each node in the shard and
+// compare it against the primary's. All replicas consistent with the primary
+// will be returned as "good", and the rest as "bad". If only a partial check
+// is needed, a set of Node storage keys can be provided to exclude those
+// nodes from the check.
+func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, bad []Node, err error) {
+ var (
+ mu sync.RWMutex
+ checksums = map[string][]byte{}
+ )
+
+ eg, eCtx := errgroup.WithContext(ctx)
+
+ for storage := range s.storageReplicas {
+ _, ok := omits[storage]
+ if ok {
+ continue
+ }
+
+ storage := storage // rescope iterator vars
+
+ eg.Go(func() error {
+ cs, err := s.verifier.CheckSum(eCtx, s.repo)
+ if err != nil {
+ return err
+ }
+
+ mu.Lock()
+ checksums[storage] = cs
+ mu.Unlock()
+
+ return nil
+ })
+
+ }
+
+ if err := eg.Wait(); err != nil {
+ return nil, nil, err
+ }
+
+ pCS := string(checksums[s.primary])
+ for storage, cs := range checksums {
+ n := s.storageReplicas[storage]
+ if string(cs) != pCS {
+ bad = append(bad, n)
+ continue
+ }
+ good = append(good, n)
+ }
+
+ return good, bad, nil
+}
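
NewShard expects Node implementations (the interface lives in transaction.go below). As a rough illustration, and assuming the real gRPC proxying to Gitaly lives elsewhere, an in-memory Node might look like the following; the type and its checksum scheme are hypothetical.

package transaction_test

import (
	"context"
	"crypto/sha1"
	"fmt"
	"sort"
	"sync"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// memoryNode is a hypothetical in-memory Node: refs live in a map and the
// checksum is derived from them. A real node would proxy to a Gitaly server.
type memoryNode struct {
	storage string

	mu   sync.Mutex
	refs map[string]string
}

// Compile-time check that memoryNode satisfies the Node interface.
var _ transaction.Node = (*memoryNode)(nil)

func (n *memoryNode) Storage() string { return n.storage }

func (n *memoryNode) ForwardRPC(ctx context.Context, rpc *transaction.RPC) error {
	return nil // RPC forwarding is out of scope for this sketch
}

func (n *memoryNode) WriteRef(ctx context.Context, ref, value string) error {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.refs[ref] = value
	return nil
}

// CheckSum hashes the refs in a stable order so identical ref states on two
// nodes produce identical checksums.
func (n *memoryNode) CheckSum(context.Context, transaction.Repository) ([]byte, error) {
	n.mu.Lock()
	defer n.mu.Unlock()

	names := make([]string, 0, len(n.refs))
	for ref := range n.refs {
		names = append(names, ref)
	}
	sort.Strings(names)

	h := sha1.New()
	for _, ref := range names {
		fmt.Fprintf(h, "%s %s\n", ref, n.refs[ref])
	}
	return h.Sum(nil), nil
}

A Shard could then be assembled from several such nodes via NewShard, with one of the storage names chosen as the primary.
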
diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go
index 9bc49bc24..2ddff2180 100644
--- a/internal/praefect/transaction/transaction.go
+++ b/internal/praefect/transaction/transaction.go
@@ -1,20 +1,8 @@
-/*Package transaction provides transaction management functionality to
-coordinate one-to-many clients attempting to modify the shards concurrently.
-
-While Git is distributed in nature, there are some repository wide data points
-that can conflict between replicas if something goes wrong. This includes
-references, which is why the transaction manager provides an API that allows
-an RPC transaction to read/write lock the references being accessed to prevent
-contention.
-*/
package transaction
import (
"context"
- "errors"
"sync"
-
- "golang.org/x/sync/errgroup"
)
// Repository represents the identity and location of a repository as requested
@@ -24,45 +12,6 @@ type Repository struct {
StorageLoc string // storage location
}
-// Verifier verifies the project repository state
-type Verifier interface {
- // CheckSum will return the checksum for a project in a storage location
- CheckSum(context.Context, Repository) ([]byte, error)
-}
-
-// Coordinator allows the transaction manager to look up the shard for a repo
-// at the beginning of each transaction
-type Coordinator interface {
- FetchShard(ctx context.Context, repo Repository) (*Shard, error)
-}
-
-// ReplicationManager provides services to handle degraded nodes
-type ReplicationManager interface {
- NotifyDegradation(context.Context, Repository, Node) error
-}
-
-// Manager tracks the progress of RPCs being applied to multiple
-// downstream servers that make up a shard. It prevents conflicts that may arise
-// from contention between multiple clients trying to modify the same
-// references.
-type Manager struct {
- mu sync.Mutex
- shards map[string]*Shard // shards keyed by project
-
- verifier Verifier
- coordinator Coordinator
- replman ReplicationManager
-}
-
-func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
- return &Manager{
- shards: map[string]*Shard{},
- verifier: v,
- coordinator: c,
- replman: r,
- }
-}
-
// State represents the current state of a backend node
// Note: in the future this may be extended to include refs
type State struct {
@@ -76,70 +25,9 @@ type RPC struct {
type Node interface {
Storage() string // storage location the node hosts
ForwardRPC(ctx context.Context, rpc *RPC) error
-}
-
-// Shard represents
-type Shard struct {
- repo Repository
-
- primary string // the designated primary node name
-
- // Replicas maps a storage location to the node replicas
- storageReplicas map[string]Node
-
- // refLocks coordinates changes between many clients attempting to mutate
- // a reference
- refLocks map[string]*sync.RWMutex
-
- // used to check shard for inconsistencies
- verifier Verifier
-}
-
-// ErrShardInconsistent indicates a mutating operation is unable to be executed
-// due to lack of quorum of consistent nodes.
-var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state")
-
-func (s Shard) validate(ctx context.Context) (good, bad []Node, err error) {
- var (
- mu sync.RWMutex
- checksums = map[string][]byte{}
- )
-
- eg, eCtx := errgroup.WithContext(ctx)
- for storage, _ := range s.storageReplicas {
- storage := storage // rescope iterator vars
-
- eg.Go(func() error {
- cs, err := s.verifier.CheckSum(eCtx, s.repo)
- if err != nil {
- return err
- }
-
- mu.Lock()
- checksums[storage] = cs
- mu.Unlock()
-
- return nil
- })
-
- }
-
- if err := eg.Wait(); err != nil {
- return nil, nil, err
- }
-
- pCS := string(checksums[s.primary])
- for storage, cs := range checksums {
- n := s.storageReplicas[storage]
- if string(cs) != pCS {
- bad = append(bad, n)
- continue
- }
- good = append(good, n)
- }
-
- return good, bad, nil
+ CheckSum(context.Context, Repository) ([]byte, error)
+ WriteRef(ctx context.Context, ref, value string) error
}
// MutateTx represents the resources available during a mutator transaction
@@ -147,7 +35,7 @@ type MutateTx interface {
AccessTx
// LockRef acquires a write lock on a reference
LockRef(context.Context, string) error
- Shard() *Shard
+ Primary() Node
}
// AccessTx represents the resources available during an accessor transaction
@@ -159,20 +47,57 @@ type AccessTx interface {
}
type transaction struct {
- shard *Shard
- refLocks map[string]*sync.RWMutex
+ shard *Shard
+
+ replicas map[string]Node // only nodes verified to be consistent
+
+ // unlocks contains callbacks to unlock all locks acquired
+ unlocks struct {
+ *sync.RWMutex
+ m map[string]func()
+ }
+
+ // refRollbacks contains all reference values before modification
+ refRollbacks map[string][]byte
}
-func (t transaction) Shard() *Shard { return t.shard }
+func newTransaction(shard *Shard, good []Node) transaction {
+ replicas := map[string]Node{}
+ for _, node := range good {
+ replicas[node.Storage()] = node
+ }
-// LockRef attempts to acquire a write lock on the reference name provided
-// (ref). If the context is cancelled first, the lock acquisition will be
+ return transaction{
+ shard: shard,
+ replicas: replicas,
+ unlocks: struct {
+ *sync.RWMutex
+ m map[string]func()
+ }{
+ RWMutex: new(sync.RWMutex),
+ m: make(map[string]func()),
+ },
+ }
+}
+
+// LockRef attempts to acquire a write lock on the reference name provided.
+// If the context is cancelled first, the lock acquisition will be
// aborted.
func (t transaction) LockRef(ctx context.Context, ref string) error {
lockQ := make(chan *sync.RWMutex)
go func() {
- l := t.shard.refLocks[ref]
+ t.shard.refLocks.RLock()
+ l, ok := t.shard.refLocks.m[ref]
+ t.shard.refLocks.RUnlock()
+
+ if !ok {
+ l = new(sync.RWMutex)
+ t.shard.refLocks.Lock()
+ t.shard.refLocks.m[ref] = l
+ t.shard.refLocks.Unlock()
+ }
+
l.Lock()
lockQ <- l
}()
@@ -182,40 +107,62 @@ func (t transaction) LockRef(ctx context.Context, ref string) error {
select {
case <-ctx.Done():
- // unlock before aborting
l := <-lockQ
l.Unlock()
-
return ctx.Err()
case l := <-lockQ:
- t.refLocks[ref] = l
+ t.unlocks.Lock()
+ t.unlocks.m[ref] = func() { l.Unlock() }
+ t.unlocks.Unlock()
+ return nil
}
-
- return nil
}
// unlockAll will unlock all acquired locks at the end of a transaction
func (t transaction) unlockAll() {
// unlock all refs
- for _, rl := range t.refLocks {
- rl.Unlock()
+ t.unlocks.RLock()
+ for _, unlock := range t.unlocks.m {
+ unlock()
}
+ t.unlocks.RUnlock()
+}
+
+func (t transaction) rollback(ctx context.Context) error {
+ // for ref, value := range t.refRollbacks {
+ //
+ // }
+ return nil
}
func (t transaction) Replicas() map[string]Node {
- return t.shard.storageReplicas
+ return t.replicas
+}
+
+func (t transaction) Primary() Node {
+ return t.replicas[t.shard.primary]
}
// RLockRef attempts to acquire a read lock on the reference name provided
-// (ref). If the context is cancelled first, the lock acquisition will be
+// (ref). If the context is cancelled first, the lock acquisition will be
// aborted.
func (t transaction) RLockRef(ctx context.Context, ref string) error {
lockQ := make(chan *sync.RWMutex)
go func() {
- l := t.shard.refLocks[ref]
+ t.shard.refLocks.RLock()
+ l, ok := t.shard.refLocks.m[ref]
+ t.shard.refLocks.RUnlock()
+
+ if !ok {
+ l = new(sync.RWMutex)
+ t.shard.refLocks.Lock()
+ t.shard.refLocks.m[ref] = l
+ t.shard.refLocks.Unlock()
+ }
+
l.RLock()
lockQ <- l
}()
@@ -232,92 +179,11 @@ func (t transaction) RLockRef(ctx context.Context, ref string) error {
return ctx.Err()
case l := <-lockQ:
- t.refLocks[ref] = l
-
- }
-
- return nil
-}
-
-// Mutate accepts a closure whose environment is a snapshot of the repository
-// before the transaction begins. It is the responsibility of the closure
-// author to mark all relevant assets being modified during a mutating
-// transaction to ensure they are locked and protected from other closures
-// modifying the same.
-func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
- shard, err := m.coordinator.FetchShard(ctx, repo)
- if err != nil {
- return err
- }
-
- good, bad, err := shard.validate(ctx)
- if err != nil {
- return err
- }
-
- // are a majority of the nodes good (consistent)?
- if len(bad) >= len(good) {
- return ErrShardInconsistent
- }
-
- eg, eCtx := errgroup.WithContext(ctx)
- for _, node := range bad {
- node := node // rescope iterator var for goroutine closure
+ t.unlocks.Lock()
+ t.unlocks.m[ref] = func() { l.RUnlock() }
+ t.unlocks.Unlock()
- eg.Go(func() error {
- return m.replman.NotifyDegradation(eCtx, repo, node)
- })
- }
-
- tx := transaction{
- shard: shard,
- refLocks: map[string]*sync.RWMutex{},
- }
- defer tx.unlockAll()
-
- err = fn(tx)
- if err != nil {
- return err
- }
+ return nil
- return nil
-}
-
-func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
- shard, err := m.coordinator.FetchShard(ctx, repo)
- if err != nil {
- return err
}
-
- good, bad, err := shard.validate(ctx)
- if err != nil {
- return err
- }
-
- // are a majority of the nodes good (consistent)?
- if len(bad) >= len(good) {
- return ErrShardInconsistent
- }
-
- eg, eCtx := errgroup.WithContext(ctx)
- for _, node := range bad {
- node := node // rescope iterator var for goroutine closure
-
- eg.Go(func() error {
- return m.replman.NotifyDegradation(eCtx, repo, node)
- })
- }
-
- tx := transaction{
- shard: shard,
- refLocks: map[string]*sync.RWMutex{},
- }
- defer tx.unlockAll()
-
- err = fn(tx)
- if err != nil {
- return err
- }
-
- return nil
}
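
The accessor path mirrors the mutator path but only takes read locks, so many readers can hold the same ref concurrently while a mutator's write lock excludes them. A hedged sketch of that usage follows; it assumes RLockRef and Replicas are the AccessTx methods (they are the accessor methods implemented on transaction above), and reuses the hypothetical wiring from the earlier sketches.

package transaction_test

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)

// readChecksums sketches a read-only transaction: take a read lock on the ref
// of interest, then fan a read out to every consistent replica.
func readChecksums(ctx context.Context, m *transaction.Manager, repo transaction.Repository) error {
	return m.Access(ctx, repo, func(tx transaction.AccessTx) error {
		// A read lock keeps mutators from changing the ref mid-read while
		// still allowing other accessors in.
		if err := tx.RLockRef(ctx, "refs/heads/master"); err != nil {
			return err
		}

		for storage, node := range tx.Replicas() {
			if _, err := node.CheckSum(ctx, repo); err != nil {
				return fmt.Errorf("replica %s: %v", storage, err)
			}
		}
		return nil
	})
}
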
diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go
index ca7064e41..34666f807 100644
--- a/internal/praefect/transaction/transaction_test.go
+++ b/internal/praefect/transaction/transaction_test.go
@@ -4,7 +4,7 @@ import (
"context"
"testing"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction"
)
func TestReplMan(t *testing.T) {
@@ -25,40 +25,47 @@ func TestReplMan(t *testing.T) {
},
}
- // A replication manager needs to have the ability to verify the state of
+ // A transaction manager needs to have the ability to verify the state of
// replicas, so it needs a Verifier.
- rm := praefect.NewReplicationManager(mv)
+ rm := transaction.NewManager(mv, mockCoordinator{}, mockReplMan{})
+ rm.Access(context.Background(), transaction.Repository{}, func(transaction.AccessTx) error {
+ return nil
+ })
+}
- // replication managers are typically used within the context of a request
- // when a mutator RPC is received.
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+type mockCoordinator struct{}
+func (_ mockCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) {
+ // return an empty shard so Access does not dereference a nil *Shard
+ return transaction.NewShard(transaction.Repository{}, "", nil, nil), nil
}
+type mockReplMan struct{}
+
+func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil }
+
type mockVerifier struct {
// checksums contains ordered checksums keyed by project and then storage
checksums map[string]map[string][][]byte
}
-func (mv *mockVerifier) CheckSum(_ context.Context, project, storage string) ([]byte, error) {
- storages, ok := mv.checksums[project]
+func (mv *mockVerifier) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) {
+ storages, ok := mv.checksums[repo.ProjectHash]
if !ok {
- panic("no project " + project)
+ panic("no project " + repo.ProjectHash)
}
- sums, ok := storages[storage]
+ sums, ok := storages[repo.StorageLoc]
if !ok {
- panic("no storage " + storage)
+ panic("no storage " + repo.StorageLoc)
}
if len(sums) < 1 {
- panic("no more checksums for " + project)
+ panic("no more checksums for " + repo.ProjectHash)
}
// pop first checksum off list
var sum []byte
- sum, mv.checksums[project][storage] = sums[len(sums)-1], sums[:len(sums)-1]
+ sum, mv.checksums[repo.ProjectHash][repo.StorageLoc] = sums[len(sums)-1], sums[:len(sums)-1]
return sum, nil
}