diff options
Diffstat (limited to 'internal/praefect/transaction/manager.go')
-rw-r--r-- | internal/praefect/transaction/manager.go | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/internal/praefect/transaction/manager.go b/internal/praefect/transaction/manager.go index 12a92d610..7fe38442e 100644 --- a/internal/praefect/transaction/manager.go +++ b/internal/praefect/transaction/manager.go @@ -32,7 +32,6 @@ type Manager struct { mu sync.Mutex shards map[string]*Shard // shards keyed by project - verifier Verifier coordinator Coordinator replman ReplicationManager } @@ -40,10 +39,9 @@ type Manager struct { // NewManafer returns a manager for coordinating transaction between shards // that are fetched from the coordinator. Any inconsistencies found will be // reported to the provided replication manager. -func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { +func NewManager(c Coordinator, r ReplicationManager) *Manager { return &Manager{ shards: map[string]*Shard{}, - verifier: v, coordinator: c, replman: r, } @@ -54,12 +52,18 @@ func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager { // author to mark all relevant assets being modified during a mutating // transaction to ensure they are locked and protected from other closures // modifying the same. -func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error { +func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(Tx) error) error { shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } + // Prevent other clients from modifying the same shard until finished. + // TODO: serialize access at the reference scope: + // https://gitlab.com/gitlab-org/gitaly/issues/1530 + shard.lock.Lock() + defer shard.lock.Unlock() + omits := make(map[string]struct{}) // TODO: some smart caching needs to be done to eliminate this check. We @@ -81,8 +85,7 @@ func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) return err } - tx := newTransaction(shard, good) - defer tx.unlockAll() + tx := newTx(shard, good, txCatMutator) err = fn(tx) if err != nil { @@ -119,12 +122,15 @@ func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degra return reported, eg.Wait() } -func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error { +func (m *Manager) Access(ctx context.Context, repo Repository, fn func(Tx) error) error { shard, err := m.coordinator.FetchShard(ctx, repo) if err != nil { return err } + shard.lock.RLock() + defer shard.lock.RUnlock() + // TODO: some smart caching needs to be done to eliminate this check. We // already check the shard consistency at the end of each transaction so // we shouldn't need to do it twice except for the first time we fetch a @@ -139,8 +145,7 @@ func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) return err } - tx := newTransaction(shard, good) - defer tx.unlockAll() + tx := newTx(shard, good, txCatAccessor) err = fn(tx) if err != nil { |