diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-17 18:30:18 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-23 12:01:52 +0300 |
commit | 51073bb211020a41e68c4813c4e576120e669767 (patch) | |
tree | dff1652043238642b7a25356bd30e3cba9dcab42 | |
parent | 5a18c4cb74519342307dfabed5e35b77bf8bc5c7 (diff) |
transactions: Split out Transaction interface
The Transaction type is currently implemented as a structure, which
makes it impossible to mock it in other packages. This commit thus
splits it up into a private `transaction` implementation which is the
same as the current `Transaction` implementation, and a new public
`Transaction` interface.
-rw-r--r-- | internal/praefect/coordinator.go | 4 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 8 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 32 |
3 files changed, 27 insertions, 17 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 738bf8e0b..b5f8aaefd 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -322,7 +322,7 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal }, nil, nil, nil), nil } -func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNode, secondaries []RouterNode) (*transactions.Transaction, transactions.CancelFunc, error) { +func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNode, secondaries []RouterNode) (transactions.Transaction, transactions.CancelFunc, error) { var voters []transactions.Voter var threshold uint @@ -649,7 +649,7 @@ func protoMessage(mi protoregistry.MethodInfo, frame []byte) (proto.Message, err func (c *Coordinator) createTransactionFinalizer( ctx context.Context, - transaction *transactions.Transaction, + transaction transactions.Transaction, route RepositoryMutatorRoute, virtualStorage string, targetRepo *gitalypb.Repository, diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 75e73a76a..abfbe8fdc 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -25,7 +25,7 @@ var ErrNotFound = errors.New("transaction not found") type Manager struct { txIDGenerator TransactionIDGenerator lock sync.Mutex - transactions map[uint64]*Transaction + transactions map[uint64]*transaction counterMetric *prometheus.CounterVec delayMetric *prometheus.HistogramVec subtransactionsMetric prometheus.Histogram @@ -72,7 +72,7 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt { func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { mgr := &Manager{ txIDGenerator: newTransactionIDGenerator(), - transactions: make(map[uint64]*Transaction), + transactions: make(map[uint64]*transaction), counterMetric: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "gitaly", @@ -131,7 +131,7 @@ type CancelFunc func() error // taking part in the transaction. `threshold` is the threshold at which an // election will succeed. It needs to be in the range `weight(voters)/2 < // threshold <= weight(voters) to avoid indecidable votes. -func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (*Transaction, CancelFunc, error) { +func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (Transaction, CancelFunc, error) { mgr.lock.Lock() defer mgr.lock.Unlock() @@ -163,7 +163,7 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr }, nil } -func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *Transaction) error { +func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *transaction) error { mgr.lock.Lock() defer mgr.lock.Unlock() diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go index 0c871858d..2545ed4f6 100644 --- a/internal/praefect/transactions/transaction.go +++ b/internal/praefect/transactions/transaction.go @@ -47,11 +47,21 @@ type Voter struct { result VoteResult } -// Transaction is a session where a set of voters votes on one or more +// Transaction is an interface for transactions. +type Transaction interface { + // ID returns the ID of the transaction which uniquely identifies the transaction. + ID() uint64 + // CountSubtransactions counts the number of subtransactions. + CountSubtransactions() int + // State returns the state of each voter part of the transaction. + State() (map[string]VoteResult, error) +} + +// transaction is a session where a set of voters votes on one or more // subtransactions. Subtransactions are a sequence of sessions, where each node // needs to go through the same sequence and agree on the same thing in the end // in order to have the complete transaction succeed. -type Transaction struct { +type transaction struct { id uint64 threshold uint voters []Voter @@ -61,7 +71,7 @@ type Transaction struct { subtransactions []*subtransaction } -func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, error) { +func newTransaction(id uint64, voters []Voter, threshold uint) (*transaction, error) { if len(voters) == 0 { return nil, ErrMissingNodes } @@ -89,7 +99,7 @@ func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, er return nil, ErrInvalidThreshold } - return &Transaction{ + return &transaction{ id: id, threshold: threshold, voters: voters, @@ -97,7 +107,7 @@ func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, er }, nil } -func (t *Transaction) cancel() { +func (t *transaction) cancel() { t.lock.Lock() defer t.lock.Unlock() @@ -108,7 +118,7 @@ func (t *Transaction) cancel() { t.state = transactionCanceled } -func (t *Transaction) stop() error { +func (t *transaction) stop() error { t.lock.Lock() defer t.lock.Unlock() @@ -123,14 +133,14 @@ func (t *Transaction) stop() error { } // ID returns the identifier used to uniquely identify a transaction. -func (t *Transaction) ID() uint64 { +func (t *transaction) ID() uint64 { return t.id } // State returns the voting state mapped by voters. A voting state of `true` // means all subtransactions were successful, a voting state of `false` means // either no subtransactions were created or any of the subtransactions failed. -func (t *Transaction) State() (map[string]VoteResult, error) { +func (t *transaction) State() (map[string]VoteResult, error) { t.lock.Lock() defer t.lock.Unlock() @@ -165,7 +175,7 @@ func (t *Transaction) State() (map[string]VoteResult, error) { // CountSubtransactions counts the number of subtransactions created as part of // the transaction. -func (t *Transaction) CountSubtransactions() int { +func (t *transaction) CountSubtransactions() int { t.lock.Lock() defer t.lock.Unlock() @@ -176,7 +186,7 @@ func (t *Transaction) CountSubtransactions() int { // node hasn't yet voted on or creates a new one if the node has succeeded on // all subtransactions. In case the node has failed on any of the // subtransactions, an error will be returned. -func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) { +func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) { t.lock.Lock() defer t.lock.Unlock() @@ -234,7 +244,7 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e return subtransaction, nil } -func (t *Transaction) vote(ctx context.Context, node string, hash []byte) error { +func (t *transaction) vote(ctx context.Context, node string, hash []byte) error { subtransaction, err := t.getOrCreateSubtransaction(node) if err != nil { return err |