Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-17 18:30:18 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-23 12:01:52 +0300
commit51073bb211020a41e68c4813c4e576120e669767 (patch)
treedff1652043238642b7a25356bd30e3cba9dcab42
parent5a18c4cb74519342307dfabed5e35b77bf8bc5c7 (diff)
transactions: Split out Transaction interface
The Transaction type is currently implemented as a structure, which makes it impossible to mock it in other packages. This commit thus splits it up into a private `transaction` implementation which is the same as the current `Transaction` implementation, and a new public `Transaction` interface.
-rw-r--r--internal/praefect/coordinator.go4
-rw-r--r--internal/praefect/transactions/manager.go8
-rw-r--r--internal/praefect/transactions/transaction.go32
3 files changed, 27 insertions, 17 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 738bf8e0b..b5f8aaefd 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -322,7 +322,7 @@ func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCal
}, nil, nil, nil), nil
}
-func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNode, secondaries []RouterNode) (*transactions.Transaction, transactions.CancelFunc, error) {
+func (c *Coordinator) registerTransaction(ctx context.Context, primary RouterNode, secondaries []RouterNode) (transactions.Transaction, transactions.CancelFunc, error) {
var voters []transactions.Voter
var threshold uint
@@ -649,7 +649,7 @@ func protoMessage(mi protoregistry.MethodInfo, frame []byte) (proto.Message, err
func (c *Coordinator) createTransactionFinalizer(
ctx context.Context,
- transaction *transactions.Transaction,
+ transaction transactions.Transaction,
route RepositoryMutatorRoute,
virtualStorage string,
targetRepo *gitalypb.Repository,
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 75e73a76a..abfbe8fdc 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -25,7 +25,7 @@ var ErrNotFound = errors.New("transaction not found")
type Manager struct {
txIDGenerator TransactionIDGenerator
lock sync.Mutex
- transactions map[uint64]*Transaction
+ transactions map[uint64]*transaction
counterMetric *prometheus.CounterVec
delayMetric *prometheus.HistogramVec
subtransactionsMetric prometheus.Histogram
@@ -72,7 +72,7 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt {
func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager {
mgr := &Manager{
txIDGenerator: newTransactionIDGenerator(),
- transactions: make(map[uint64]*Transaction),
+ transactions: make(map[uint64]*transaction),
counterMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "gitaly",
@@ -131,7 +131,7 @@ type CancelFunc func() error
// taking part in the transaction. `threshold` is the threshold at which an
// election will succeed. It needs to be in the range `weight(voters)/2 <
// threshold <= weight(voters) to avoid indecidable votes.
-func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (*Transaction, CancelFunc, error) {
+func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, threshold uint) (Transaction, CancelFunc, error) {
mgr.lock.Lock()
defer mgr.lock.Unlock()
@@ -163,7 +163,7 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
}, nil
}
-func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *Transaction) error {
+func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *transaction) error {
mgr.lock.Lock()
defer mgr.lock.Unlock()
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
index 0c871858d..2545ed4f6 100644
--- a/internal/praefect/transactions/transaction.go
+++ b/internal/praefect/transactions/transaction.go
@@ -47,11 +47,21 @@ type Voter struct {
result VoteResult
}
-// Transaction is a session where a set of voters votes on one or more
+// Transaction is an interface for transactions.
+type Transaction interface {
+ // ID returns the ID of the transaction which uniquely identifies the transaction.
+ ID() uint64
+ // CountSubtransactions counts the number of subtransactions.
+ CountSubtransactions() int
+ // State returns the state of each voter part of the transaction.
+ State() (map[string]VoteResult, error)
+}
+
+// transaction is a session where a set of voters votes on one or more
// subtransactions. Subtransactions are a sequence of sessions, where each node
// needs to go through the same sequence and agree on the same thing in the end
// in order to have the complete transaction succeed.
-type Transaction struct {
+type transaction struct {
id uint64
threshold uint
voters []Voter
@@ -61,7 +71,7 @@ type Transaction struct {
subtransactions []*subtransaction
}
-func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, error) {
+func newTransaction(id uint64, voters []Voter, threshold uint) (*transaction, error) {
if len(voters) == 0 {
return nil, ErrMissingNodes
}
@@ -89,7 +99,7 @@ func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, er
return nil, ErrInvalidThreshold
}
- return &Transaction{
+ return &transaction{
id: id,
threshold: threshold,
voters: voters,
@@ -97,7 +107,7 @@ func newTransaction(id uint64, voters []Voter, threshold uint) (*Transaction, er
}, nil
}
-func (t *Transaction) cancel() {
+func (t *transaction) cancel() {
t.lock.Lock()
defer t.lock.Unlock()
@@ -108,7 +118,7 @@ func (t *Transaction) cancel() {
t.state = transactionCanceled
}
-func (t *Transaction) stop() error {
+func (t *transaction) stop() error {
t.lock.Lock()
defer t.lock.Unlock()
@@ -123,14 +133,14 @@ func (t *Transaction) stop() error {
}
// ID returns the identifier used to uniquely identify a transaction.
-func (t *Transaction) ID() uint64 {
+func (t *transaction) ID() uint64 {
return t.id
}
// State returns the voting state mapped by voters. A voting state of `true`
// means all subtransactions were successful, a voting state of `false` means
// either no subtransactions were created or any of the subtransactions failed.
-func (t *Transaction) State() (map[string]VoteResult, error) {
+func (t *transaction) State() (map[string]VoteResult, error) {
t.lock.Lock()
defer t.lock.Unlock()
@@ -165,7 +175,7 @@ func (t *Transaction) State() (map[string]VoteResult, error) {
// CountSubtransactions counts the number of subtransactions created as part of
// the transaction.
-func (t *Transaction) CountSubtransactions() int {
+func (t *transaction) CountSubtransactions() int {
t.lock.Lock()
defer t.lock.Unlock()
@@ -176,7 +186,7 @@ func (t *Transaction) CountSubtransactions() int {
// node hasn't yet voted on or creates a new one if the node has succeeded on
// all subtransactions. In case the node has failed on any of the
// subtransactions, an error will be returned.
-func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
+func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
t.lock.Lock()
defer t.lock.Unlock()
@@ -234,7 +244,7 @@ func (t *Transaction) getOrCreateSubtransaction(node string) (*subtransaction, e
return subtransaction, nil
}
-func (t *Transaction) vote(ctx context.Context, node string, hash []byte) error {
+func (t *transaction) vote(ctx context.Context, node string, hash []byte) error {
subtransaction, err := t.getOrCreateSubtransaction(node)
if err != nil {
return err