diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-03-14 08:29:22 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-03-14 08:29:22 +0300 |
commit | 7ac86407d32afd258cb4eb63be0454ded71f404f (patch) | |
tree | 3b7b0b2893df873dc7b559998649a14291972665 | |
parent | d9ff755e526298a03042d236c578b359802af459 (diff) |
split up transaction file for maintainabilitypo_replication_manager
-rw-r--r-- | internal/praefect/transaction/doc.go | 10 | ||||
-rw-r--r-- | internal/praefect/transaction/manager.go | 151 | ||||
-rw-r--r-- | internal/praefect/transaction/shard.go | 101 | ||||
-rw-r--r-- | internal/praefect/transaction/transaction.go | 298 | ||||
-rw-r--r-- | internal/praefect/transaction/transaction_test.go | 35 |
5 files changed, 365 insertions, 230 deletions
diff --git a/internal/praefect/transaction/doc.go b/internal/praefect/transaction/doc.go new file mode 100644 index 000000000..a90a45668 --- /dev/null +++ b/internal/praefect/transaction/doc.go @@ -0,0 +1,10 @@ +/*Package transaction provides transaction management functionality to +coordinate one-to-many clients attempting to modify the shards concurrently. + +While Git is distributed in nature, there are some repository wide data points +that can conflict between replicas if something goes wrong. This includes +references, which is why the transaction manager provides an API that allows +an RPC transaction to read/write lock the references being accessed to prevent +contention. +*/ +package transaction diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go new file mode 100644 index 000000000..12a92d610 --- /dev/null +++ b/internal/praefect/transaction/manager.go @@ -0,0 +1,151 @@ +package transaction + +import ( + "context" + "sync" + + "golang.org/x/sync/errgroup" +) + +// Verifier verifies the project repository state by performing a checksum +type Verifier interface { + // CheckSum will return checksum of all refs for a repo + CheckSum(context.Context, Repository) ([]byte, error) +} + +// Coordinator allows the transaction manager to look up the shard for a repo +// at the beginning of each transaction +type Coordinator interface { + FetchShard(ctx context.Context, repo Repository) (*Shard, error) +} + +// ReplicationManager provides services to handle degraded nodes +type ReplicationManager interface { + NotifyDegradation(context.Context, Repository) error +} + +// Manager tracks the progress of RPCs being applied to multiple +// downstream servers that make up a shard. It prevents conflicts that may arise +// from contention between multiple clients trying to modify the same +// references. +type Manager struct { + mu sync.Mutex + shards map[string]*Shard // shards keyed by project + + verifier Verifier + coordinator Coordinator + replman ReplicationManager +} + +// NewManafer returns a manager for coordinating transaction between shards +// that are fetched from the coordinator. Any inconsistencies found will be +// reported to the provided replication manager. +func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { + return &Manager{ + shards: map[string]*Shard{}, + verifier: v, + coordinator: c, + replman: r, + } +} + +// Mutate accepts a closure whose environment is a snapshot of the repository +// before the transaction begins. It is the responsibility of the closure +// author to mark all relevant assets being modified during a mutating +// transaction to ensure they are locked and protected from other closures +// modifying the same. +func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { + shard, err := m.coordinator.FetchShard(ctx, repo) + if err != nil { + return err + } + + omits := make(map[string]struct{}) + + // TODO: some smart caching needs to be done to eliminate this check. We + // already check the shard consistency at the end of each transaction so + // we shouldn't need to do it twice except for the first time we fetch a + // shard. + good, bad, err := shard.validate(ctx, omits) + if err != nil { + return err + } + + // are a majority of the nodes good (consistent)? + if len(bad) >= len(good) { + return ErrShardInconsistent + } + + omits, err = m.notifyDegradations(ctx, repo, bad) + if err != nil { + return err + } + + tx := newTransaction(shard, good) + defer tx.unlockAll() + + err = fn(tx) + if err != nil { + return err + } + + // make sure all changes made to replicas are consistent + good, bad, err = shard.validate(ctx, omits) + if err != nil { + return err + } + + _, err = m.notifyDegradations(ctx, repo, bad) + if err != nil { + return err + } + + return nil +} + +func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degradeds []Node) (map[string]struct{}, error) { + reported := make(map[string]struct{}) + + eg, eCtx := errgroup.WithContext(ctx) + for _, node := range degradeds { + node := node // rescope iterator var for goroutine closure + reported[node.Storage()] = struct{}{} + + eg.Go(func() error { + return m.replman.NotifyDegradation(eCtx, repo) + }) + } + + return reported, eg.Wait() +} + +func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { + shard, err := m.coordinator.FetchShard(ctx, repo) + if err != nil { + return err + } + + // TODO: some smart caching needs to be done to eliminate this check. We + // already check the shard consistency at the end of each transaction so + // we shouldn't need to do it twice except for the first time we fetch a + // shard. + good, bad, err := shard.validate(ctx, map[string]struct{}{}) + if err != nil { + return err + } + + _, err = m.notifyDegradations(ctx, repo, bad) + if err != nil { + return err + } + + tx := newTransaction(shard, good) + defer tx.unlockAll() + + err = fn(tx) + if err != nil { + return err + } + + return nil +} diff --git a/internal/praefect/transaction/shard.go b/internal/praefect/transaction/shard.go new file mode 100644 index 000000000..af6ec2c29 --- /dev/null +++ b/internal/praefect/transaction/shard.go @@ -0,0 +1,101 @@ +package transaction + +import ( + "context" + "errors" + "sync" + + "golang.org/x/sync/errgroup" +) + +// Shard represents a set of Gitaly replicas for repository. Each shard has a +// designated primary and maintains locks to resources that can cause contention +// between clients writing to the same repo. +type Shard struct { + repo Repository + + primary string // the designated primary node name + + storageReplicas map[string]Node // maps storage location to a replica + + refLocks struct { + *sync.RWMutex + m map[string]*sync.RWMutex // maps ref name to a lock + } + + // used to check shard for inconsistencies + verifier Verifier +} + +func NewShard(r Repository, primary string, replicas []Node, v Verifier) *Shard { + return &Shard{ + repo: r, + primary: primary, + storageReplicas: make(map[string]Node), + refLocks: struct { + *sync.RWMutex + m map[string]*sync.RWMutex + }{ + RWMutex: new(sync.RWMutex), + m: make(map[string]*sync.RWMutex), + }, + verifier: v, + } +} + +// ErrShardInconsistent indicates a mutating operation is unable to be executed +// due to lack of quorum of consistent nodes. +var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state") + +// validate will concurrently fetch the checksum of each node in the shard and +// compare against the primary. All replicas consistent with the primary will +// be returned as "good", and the rest "bad". If only a partial check needs to +// be done, a list of Node storage keys can be provided to exclude those from +// the checks. +func (s Shard) validate(ctx context.Context, omits map[string]struct{}) (good, bad []Node, err error) { + var ( + mu sync.RWMutex + checksums = map[string][]byte{} + ) + + eg, eCtx := errgroup.WithContext(ctx) + + for storage, _ := range s.storageReplicas { + _, ok := omits[storage] + if ok { + continue + } + + storage := storage // rescope iterator vars + + eg.Go(func() error { + cs, err := s.verifier.CheckSum(eCtx, s.repo) + if err != nil { + return err + } + + mu.Lock() + checksums[storage] = cs + mu.Unlock() + + return nil + }) + + } + + if err := eg.Wait(); err != nil { + return nil, nil, err + } + + pCS := string(checksums[s.primary]) + for storage, cs := range checksums { + n := s.storageReplicas[storage] + if string(cs) != pCS { + bad = append(bad, n) + continue + } + good = append(good, n) + } + + return good, bad, nil +} diff --git a/internal/praefect/transaction/transaction.go b/internal/praefect/transaction/transaction.go index 9bc49bc24..2ddff2180 100644 --- a/internal/praefect/transaction/transaction.go +++ b/internal/praefect/transaction/transaction.go @@ -1,20 +1,8 @@ -/*Package transaction provides transaction management functionality to -coordinate one-to-many clients attempting to modify the shards concurrently. - -While Git is distributed in nature, there are some repository wide data points -that can conflict between replicas if something goes wrong. This includes -references, which is why the transaction manager provides an API that allows -an RPC transaction to read/write lock the references being accessed to prevent -contention. -*/ package transaction import ( "context" - "errors" "sync" - - "golang.org/x/sync/errgroup" ) // Repository represents the identity and location of a repository as requested @@ -24,45 +12,6 @@ type Repository struct { StorageLoc string // storage location } -// Verifier verifies the project repository state -type Verifier interface { - // CheckSum will return the checksum for a project in a storage location - CheckSum(context.Context, Repository) ([]byte, error) -} - -// Coordinator allows the transaction manager to look up the shard for a repo -// at the beginning of each transaction -type Coordinator interface { - FetchShard(ctx context.Context, repo Repository) (*Shard, error) -} - -// ReplicationManager provides services to handle degraded nodes -type ReplicationManager interface { - NotifyDegradation(context.Context, Repository, Node) error -} - -// Manager tracks the progress of RPCs being applied to multiple -// downstream servers that make up a shard. It prevents conflicts that may arise -// from contention between multiple clients trying to modify the same -// references. -type Manager struct { - mu sync.Mutex - shards map[string]*Shard // shards keyed by project - - verifier Verifier - coordinator Coordinator - replman ReplicationManager -} - -func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { - return &Manager{ - shards: map[string]*Shard{}, - verifier: v, - coordinator: c, - replman: r, - } -} - // State represents the current state of a backend node // Note: in the future this may be extended to include refs type State struct { @@ -76,70 +25,9 @@ type RPC struct { type Node interface { Storage() string // storage location the node hosts ForwardRPC(ctx context.Context, rpc *RPC) error -} - -// Shard represents -type Shard struct { - repo Repository - - primary string // the designated primary node name - - // Replicas maps a storage location to the node replicas - storageReplicas map[string]Node - - // refLocks coordinates changes between many clients attempting to mutate - // a reference - refLocks map[string]*sync.RWMutex - - // used to check shard for inconsistencies - verifier Verifier -} - -// ErrShardInconsistent indicates a mutating operation is unable to be executed -// due to lack of quorum of consistent nodes. -var ErrShardInconsistent = errors.New("majority of shard nodes are in inconsistent state") - -func (s Shard) validate(ctx context.Context) (good, bad []Node, err error) { - var ( - mu sync.RWMutex - checksums = map[string][]byte{} - ) - - eg, eCtx := errgroup.WithContext(ctx) - for storage, _ := range s.storageReplicas { - storage := storage // rescope iterator vars - - eg.Go(func() error { - cs, err := s.verifier.CheckSum(eCtx, s.repo) - if err != nil { - return err - } - - mu.Lock() - checksums[storage] = cs - mu.Unlock() - - return nil - }) - - } - - if err := eg.Wait(); err != nil { - return nil, nil, err - } - - pCS := string(checksums[s.primary]) - for storage, cs := range checksums { - n := s.storageReplicas[storage] - if string(cs) != pCS { - bad = append(bad, n) - continue - } - good = append(good, n) - } - - return good, bad, nil + CheckSum(context.Context, Repository) ([]byte, error) + WriteRef(ctx context.Context, ref, value string) error } // MutateTx represents the resources available during a mutator transaction @@ -147,7 +35,7 @@ type MutateTx interface { AccessTx // LockRef acquires a write lock on a reference LockRef(context.Context, string) error - Shard() *Shard + Primary() Node } // AccessTx represents the resources available during an accessor transaction @@ -159,20 +47,57 @@ type AccessTx interface { } type transaction struct { - shard *Shard - refLocks map[string]*sync.RWMutex + shard *Shard + + replicas map[string]Node // only nodes verified to be consistent + + // unlocks contains callbacks to unlock all locks acquired + unlocks struct { + *sync.RWMutex + m map[string]func() + } + + // refRollbacks contains all reference values before modification + refRollbacks map[string][]byte } -func (t transaction) Shard() *Shard { return t.shard } +func newTransaction(shard *Shard, good []Node) transaction { + replicas := map[string]Node{} + for _, node := range good { + replicas[node.Storage()] = node + } -// LockRef attempts to acquire a write lock on the reference name provided -// (ref). If the context is cancelled first, the lock acquisition will be + return transaction{ + shard: shard, + replicas: replicas, + unlocks: struct { + *sync.RWMutex + m map[string]func() + }{ + RWMutex: new(sync.RWMutex), + m: make(map[string]func()), + }, + } +} + +// LockRef attempts to acquire a write lock on the reference name provided. +// If the context is cancelled first, the lock acquisition will be // aborted. func (t transaction) LockRef(ctx context.Context, ref string) error { lockQ := make(chan *sync.RWMutex) go func() { - l := t.shard.refLocks[ref] + t.shard.refLocks.RLock() + l, ok := t.shard.refLocks.m[ref] + t.shard.refLocks.RUnlock() + + if !ok { + l = new(sync.RWMutex) + t.shard.refLocks.Lock() + t.shard.refLocks.m[ref] = l + t.shard.refLocks.Unlock() + } + l.Lock() lockQ <- l }() @@ -182,40 +107,62 @@ func (t transaction) LockRef(ctx context.Context, ref string) error { select { case <-ctx.Done(): - // unlock before aborting l := <-lockQ l.Unlock() - return ctx.Err() case l := <-lockQ: - t.refLocks[ref] = l + t.unlocks.Lock() + t.unlocks.m[ref] = func() { l.Unlock() } + t.unlocks.Unlock() + return nil } - - return nil } // unlockAll will unlock all acquired locks at the end of a transaction func (t transaction) unlockAll() { // unlock all refs - for _, rl := range t.refLocks { - rl.Unlock() + t.unlocks.RLock() + for _, unlock := range t.unlocks.m { + unlock() } + t.unlocks.RUnlock() +} + +func (t transaction) rollback(ctx context.Context) error { + // for ref, value := range t.refRollbacks { + // + // } + return nil } func (t transaction) Replicas() map[string]Node { - return t.shard.storageReplicas + return t.replicas +} + +func (t transaction) Primary() Node { + return t.replicas[t.shard.primary] } // RLockRef attempts to acquire a read lock on the reference name provided -// (ref). If the context is cancelled first, the lock acquisition will be +// (ref). If the context is cancelled first, the lock acquisition will be // aborted. func (t transaction) RLockRef(ctx context.Context, ref string) error { lockQ := make(chan *sync.RWMutex) go func() { - l := t.shard.refLocks[ref] + t.shard.refLocks.RLock() + l, ok := t.shard.refLocks.m[ref] + t.shard.refLocks.RUnlock() + + if !ok { + l = new(sync.RWMutex) + t.shard.refLocks.Lock() + t.shard.refLocks.m[ref] = l + t.shard.refLocks.Unlock() + } + l.RLock() lockQ <- l }() @@ -232,92 +179,11 @@ func (t transaction) RLockRef(ctx context.Context, ref string) error { return ctx.Err() case l := <-lockQ: - t.refLocks[ref] = l - - } - - return nil -} - -// Mutate accepts a closure whose environment is a snapshot of the repository -// before the transaction begins. It is the responsibility of the closure -// author to mark all relevant assets being modified during a mutating -// transaction to ensure they are locked and protected from other closures -// modifying the same. -func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { - shard, err := m.coordinator.FetchShard(ctx, repo) - if err != nil { - return err - } - - good, bad, err := shard.validate(ctx) - if err != nil { - return err - } - - // are a majority of the nodes good (consistent)? - if len(bad) >= len(good) { - return ErrShardInconsistent - } - - eg, eCtx := errgroup.WithContext(ctx) - for _, node := range bad { - node := node // rescope iterator var for goroutine closure + t.unlocks.Lock() + t.unlocks.m[ref] = func() { l.RUnlock() } + t.unlocks.Unlock() - eg.Go(func() error { - return m.replman.NotifyDegradation(eCtx, repo, node) - }) - } - - tx := transaction{ - shard: shard, - refLocks: map[string]*sync.RWMutex{}, - } - defer tx.unlockAll() - - err = fn(tx) - if err != nil { - return err - } + return nil - return nil -} - -func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { - shard, err := m.coordinator.FetchShard(ctx, repo) - if err != nil { - return err } - - good, bad, err := shard.validate(ctx) - if err != nil { - return err - } - - // are a majority of the nodes good (consistent)? - if len(bad) >= len(good) { - return ErrShardInconsistent - } - - eg, eCtx := errgroup.WithContext(ctx) - for _, node := range bad { - node := node // rescope iterator var for goroutine closure - - eg.Go(func() error { - return m.replman.NotifyDegradation(eCtx, repo, node) - }) - } - - tx := transaction{ - shard: shard, - refLocks: map[string]*sync.RWMutex{}, - } - defer tx.unlockAll() - - err = fn(tx) - if err != nil { - return err - } - - return nil } diff --git a/internal/praefect/transaction/transaction_test.go b/internal/praefect/transaction/transaction_test.go index ca7064e41..34666f807 100644 --- a/internal/praefect/transaction/transaction_test.go +++ b/internal/praefect/transaction/transaction_test.go @@ -4,7 +4,7 @@ import ( "context" "testing" - "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/praefect/transaction" ) func TestReplMan(t *testing.T) { @@ -25,40 +25,47 @@ func TestReplMan(t *testing.T) { }, } - // A replication manager needs to have the ability to verify the state of + // A transaction manager needs to have the ability to verify the state of // replicas, so it needs a Verifier. - rm := praefect.NewReplicationManager(mv) + rm := transaction.NewManager(mv, mockCoordinator{}, mockReplMan{}) + rm.Access(context.Background(), transaction.Repository{}, func(transaction.AccessTx) error { + return nil + }) +} - // replication managers are typically used within the context of a request - // when a mutator RPC is received. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +type mockCoordinator struct{} +func (_ mockCoordinator) FetchShard(context.Context, transaction.Repository) (*transaction.Shard, error) { + return nil, nil } +type mockReplMan struct{} + +func (_ mockReplMan) NotifyDegradation(context.Context, transaction.Repository) error { return nil } + type mockVerifier struct { // checksums contains ordered checksums keyed by project and then storage checksums map[string]map[string][][]byte } -func (mv *mockVerifier) CheckSum(_ context.Context, project, storage string) ([]byte, error) { - storages, ok := mv.checksums[project] +func (mv *mockVerifier) CheckSum(_ context.Context, repo transaction.Repository) ([]byte, error) { + storages, ok := mv.checksums[repo.ProjectHash] if !ok { - panic("no project " + project) + panic("no project " + repo.ProjectHash) } - sums, ok := storages[storage] + sums, ok := storages[repo.StorageLoc] if !ok { - panic("no storage " + storage) + panic("no storage " + repo.StorageLoc) } if len(sums) < 1 { - panic("no more checksums for " + project) + panic("no more checksums for " + repo.ProjectHash) } // pop first checksum off list var sum []byte - sum, mv.checksums[project][storage] = sums[len(sums)-1], sums[:len(sums)-1] + sum, mv.checksums[repo.ProjectHash][repo.StorageLoc] = sums[len(sums)-1], sums[:len(sums)-1] return sum, nil } |