Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/transaction/manager.go')
-rw-r--r--internal/praefect/transaction/manager.go151
1 files changed, 151 insertions, 0 deletions
diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go
new file mode 100644
index 000000000..12a92d610
--- /dev/null
+++ b/internal/praefect/transaction/manager.go
@@ -0,0 +1,151 @@
+package transaction
+
+import (
+ "context"
+ "sync"
+
+ "golang.org/x/sync/errgroup"
+)
+
+// Verifier verifies the project repository state by performing a checksum
+type Verifier interface {
+ // CheckSum will return checksum of all refs for a repo
+ CheckSum(context.Context, Repository) ([]byte, error)
+}
+
+// Coordinator allows the transaction manager to look up the shard for a repo
+// at the beginning of each transaction
+type Coordinator interface {
+ FetchShard(ctx context.Context, repo Repository) (*Shard, error)
+}
+
+// ReplicationManager provides services to handle degraded nodes
+type ReplicationManager interface {
+ NotifyDegradation(context.Context, Repository) error
+}
+
+// Manager tracks the progress of RPCs being applied to multiple
+// downstream servers that make up a shard. It prevents conflicts that may arise
+// from contention between multiple clients trying to modify the same
+// references.
+type Manager struct {
+ mu sync.Mutex
+ shards map[string]*Shard // shards keyed by project
+
+ verifier Verifier
+ coordinator Coordinator
+ replman ReplicationManager
+}
+
+// NewManafer returns a manager for coordinating transaction between shards
+// that are fetched from the coordinator. Any inconsistencies found will be
+// reported to the provided replication manager.
+func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
+ return &Manager{
+ shards: map[string]*Shard{},
+ verifier: v,
+ coordinator: c,
+ replman: r,
+ }
+}
+
+// Mutate accepts a closure whose environment is a snapshot of the repository
+// before the transaction begins. It is the responsibility of the closure
+// author to mark all relevant assets being modified during a mutating
+// transaction to ensure they are locked and protected from other closures
+// modifying the same.
+func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
+ shard, err := m.coordinator.FetchShard(ctx, repo)
+ if err != nil {
+ return err
+ }
+
+ omits := make(map[string]struct{})
+
+ // TODO: some smart caching needs to be done to eliminate this check. We
+ // already check the shard consistency at the end of each transaction so
+ // we shouldn't need to do it twice except for the first time we fetch a
+ // shard.
+ good, bad, err := shard.validate(ctx, omits)
+ if err != nil {
+ return err
+ }
+
+ // are a majority of the nodes good (consistent)?
+ if len(bad) >= len(good) {
+ return ErrShardInconsistent
+ }
+
+ omits, err = m.notifyDegradations(ctx, repo, bad)
+ if err != nil {
+ return err
+ }
+
+ tx := newTransaction(shard, good)
+ defer tx.unlockAll()
+
+ err = fn(tx)
+ if err != nil {
+ return err
+ }
+
+ // make sure all changes made to replicas are consistent
+ good, bad, err = shard.validate(ctx, omits)
+ if err != nil {
+ return err
+ }
+
+ _, err = m.notifyDegradations(ctx, repo, bad)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degradeds []Node) (map[string]struct{}, error) {
+ reported := make(map[string]struct{})
+
+ eg, eCtx := errgroup.WithContext(ctx)
+ for _, node := range degradeds {
+ node := node // rescope iterator var for goroutine closure
+ reported[node.Storage()] = struct{}{}
+
+ eg.Go(func() error {
+ return m.replman.NotifyDegradation(eCtx, repo)
+ })
+ }
+
+ return reported, eg.Wait()
+}
+
+func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
+ shard, err := m.coordinator.FetchShard(ctx, repo)
+ if err != nil {
+ return err
+ }
+
+ // TODO: some smart caching needs to be done to eliminate this check. We
+ // already check the shard consistency at the end of each transaction so
+ // we shouldn't need to do it twice except for the first time we fetch a
+ // shard.
+ good, bad, err := shard.validate(ctx, map[string]struct{}{})
+ if err != nil {
+ return err
+ }
+
+ _, err = m.notifyDegradations(ctx, repo, bad)
+ if err != nil {
+ return err
+ }
+
+ tx := newTransaction(shard, good)
+ defer tx.unlockAll()
+
+ err = fn(tx)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}