Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2020-05-20 08:53:51 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-05-20 08:53:51 +0300
commit69df3cb43559ba55acde01b8ebeb253946b9d11f (patch)
tree3003394fab7a22dbac41d8dc61fada40f6b936ca
parent1023368a99d58b9e228e03240dc431ed8d084b41 (diff)
transactions: Extract transaction structure
Right now, transactions are still kind of simple in that they only allow for single-node voting. As we're about to extend their scope to multi-node voting, the associated code is going to grow more complex. As a preparatory step, we thus extract the transaction-related part into its own structure in order to nicely encapsulate it and not let the transaction manager itself grow more complex.
-rw-r--r--internal/praefect/transactions/manager.go36
-rw-r--r--internal/praefect/transactions/transaction.go39
2 files changed, 52 insertions, 23 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 01bc73a12..d9a5a354f 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -3,11 +3,9 @@ package transactions
import (
"context"
cryptorand "crypto/rand"
- "crypto/sha1"
"encoding/binary"
"encoding/hex"
"errors"
- "fmt"
"math/rand"
"sync"
"time"
@@ -26,7 +24,7 @@ var ErrNotFound = errors.New("transaction not found")
type Manager struct {
txIdGenerator TransactionIdGenerator
lock sync.Mutex
- transactions map[uint64]string
+ transactions map[uint64]*transaction
counterMetric *prometheus.CounterVec
delayMetric metrics.HistogramVec
}
@@ -86,7 +84,7 @@ func WithTransactionIdGenerator(generator TransactionIdGenerator) ManagerOpt {
func NewManager(opts ...ManagerOpt) *Manager {
mgr := &Manager{
txIdGenerator: newTransactionIdGenerator(),
- transactions: make(map[uint64]string),
+ transactions: make(map[uint64]*transaction),
counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}),
delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}),
}
@@ -113,22 +111,21 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui
mgr.lock.Lock()
defer mgr.lock.Unlock()
- // We only accept a single node in transactions right now, which is
- // usually the primary. This limitation will be lifted at a later point
- // to allow for real transaction voting and multi-phase commits.
- if len(nodes) != 1 {
- return 0, nil, errors.New("transaction requires exactly one node")
- }
-
// Use a random transaction ID. Using monotonic incrementing counters
// that reset on restart of Praefect would be suboptimal, as the chance
// for collisions is a lot higher in case Praefect restarts when Gitaly
// nodes still have in-flight transactions.
transactionID := mgr.txIdGenerator.Id()
+
+ transaction, err := newTransaction(nodes)
+ if err != nil {
+ return 0, nil, err
+ }
+
if _, ok := mgr.transactions[transactionID]; ok {
return 0, nil, errors.New("transaction exists already")
}
- mgr.transactions[transactionID] = nodes[0]
+ mgr.transactions[transactionID] = transaction
mgr.log(ctx).WithFields(logrus.Fields{
"transaction_id": transactionID,
@@ -148,14 +145,7 @@ func (mgr *Manager) cancelTransaction(transactionID uint64) {
delete(mgr.transactions, transactionID)
}
-func (mgr *Manager) verifyTransaction(transactionID uint64, node string, hash []byte) error {
- // While the reference updates hash is not used yet, we already verify
- // it's there. At a later point, the hash will be used to verify that
- // all voting nodes agree on the same updates.
- if len(hash) != sha1.Size {
- return fmt.Errorf("invalid reference hash: %q", hash)
- }
-
+func (mgr *Manager) voteTransaction(transactionID uint64, node string, hash []byte) error {
mgr.lock.Lock()
transaction, ok := mgr.transactions[transactionID]
mgr.lock.Unlock()
@@ -164,8 +154,8 @@ func (mgr *Manager) verifyTransaction(transactionID uint64, node string, hash []
return ErrNotFound
}
- if transaction != node {
- return fmt.Errorf("invalid node for transaction: %q", node)
+ if err := transaction.vote(node, hash); err != nil {
+ return err
}
return nil
@@ -192,7 +182,7 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
"hash": hex.EncodeToString(hash),
}).Debug("VoteTransaction")
- if err := mgr.verifyTransaction(transactionID, node, hash); err != nil {
+ if err := mgr.voteTransaction(transactionID, node, hash); err != nil {
mgr.log(ctx).WithFields(logrus.Fields{
"transaction_id": transactionID,
"node": node,
diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go
new file mode 100644
index 000000000..431ac2f11
--- /dev/null
+++ b/internal/praefect/transactions/transaction.go
@@ -0,0 +1,39 @@
+package transactions
+
+import (
+ "crypto/sha1"
+ "errors"
+ "fmt"
+)
+
+type transaction struct {
+ node string
+}
+
+func newTransaction(nodes []string) (*transaction, error) {
+ // We only accept a single node in transactions right now, which is
+ // usually the primary. This limitation will be lifted at a later point
+ // to allow for real transaction voting and multi-phase commits.
+ if len(nodes) != 1 {
+ return nil, errors.New("transaction requires exactly one node")
+ }
+
+ return &transaction{
+ node: nodes[0],
+ }, nil
+}
+
+func (t *transaction) vote(node string, hash []byte) error {
+ // While the reference updates hash is not used yet, we already verify
+ // it's there. At a later point, the hash will be used to verify that
+ // all voting nodes agree on the same updates.
+ if len(hash) != sha1.Size {
+ return fmt.Errorf("invalid reference hash: %q", hash)
+ }
+
+ if t.node != node {
+ return fmt.Errorf("invalid node for transaction: %q", node)
+ }
+
+ return nil
+}