diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-20 08:53:51 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2020-05-20 08:53:51 +0300 |
commit | 69df3cb43559ba55acde01b8ebeb253946b9d11f (patch) | |
tree | 3003394fab7a22dbac41d8dc61fada40f6b936ca /internal/praefect/transactions | |
parent | 1023368a99d58b9e228e03240dc431ed8d084b41 (diff) |
transactions: Extract transaction structure
Right now, transactions are still kind of simple in that they only allow
for single-node voting. As we're about to extend their scope to
multi-node voting, the associated code is going to grow more complex. As
a preparatory step, we thus extract the transaction-related part into
its own structure in order to nicely encapsulate it and not let the
transaction manager itself grow more complex.
Diffstat (limited to 'internal/praefect/transactions')
-rw-r--r-- | internal/praefect/transactions/manager.go | 36 | ||||
-rw-r--r-- | internal/praefect/transactions/transaction.go | 39 |
2 files changed, 52 insertions, 23 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 01bc73a12..d9a5a354f 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -3,11 +3,9 @@ package transactions import ( "context" cryptorand "crypto/rand" - "crypto/sha1" "encoding/binary" "encoding/hex" "errors" - "fmt" "math/rand" "sync" "time" @@ -26,7 +24,7 @@ var ErrNotFound = errors.New("transaction not found") type Manager struct { txIdGenerator TransactionIdGenerator lock sync.Mutex - transactions map[uint64]string + transactions map[uint64]*transaction counterMetric *prometheus.CounterVec delayMetric metrics.HistogramVec } @@ -86,7 +84,7 @@ func WithTransactionIdGenerator(generator TransactionIdGenerator) ManagerOpt { func NewManager(opts ...ManagerOpt) *Manager { mgr := &Manager{ txIdGenerator: newTransactionIdGenerator(), - transactions: make(map[uint64]string), + transactions: make(map[uint64]*transaction), counterMetric: prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"action"}), delayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"action"}), } @@ -113,22 +111,21 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, nodes []string) (ui mgr.lock.Lock() defer mgr.lock.Unlock() - // We only accept a single node in transactions right now, which is - // usually the primary. This limitation will be lifted at a later point - // to allow for real transaction voting and multi-phase commits. - if len(nodes) != 1 { - return 0, nil, errors.New("transaction requires exactly one node") - } - // Use a random transaction ID. Using monotonic incrementing counters // that reset on restart of Praefect would be suboptimal, as the chance // for collisions is a lot higher in case Praefect restarts when Gitaly // nodes still have in-flight transactions. transactionID := mgr.txIdGenerator.Id() + + transaction, err := newTransaction(nodes) + if err != nil { + return 0, nil, err + } + if _, ok := mgr.transactions[transactionID]; ok { return 0, nil, errors.New("transaction exists already") } - mgr.transactions[transactionID] = nodes[0] + mgr.transactions[transactionID] = transaction mgr.log(ctx).WithFields(logrus.Fields{ "transaction_id": transactionID, @@ -148,14 +145,7 @@ func (mgr *Manager) cancelTransaction(transactionID uint64) { delete(mgr.transactions, transactionID) } -func (mgr *Manager) verifyTransaction(transactionID uint64, node string, hash []byte) error { - // While the reference updates hash is not used yet, we already verify - // it's there. At a later point, the hash will be used to verify that - // all voting nodes agree on the same updates. - if len(hash) != sha1.Size { - return fmt.Errorf("invalid reference hash: %q", hash) - } - +func (mgr *Manager) voteTransaction(transactionID uint64, node string, hash []byte) error { mgr.lock.Lock() transaction, ok := mgr.transactions[transactionID] mgr.lock.Unlock() @@ -164,8 +154,8 @@ func (mgr *Manager) verifyTransaction(transactionID uint64, node string, hash [] return ErrNotFound } - if transaction != node { - return fmt.Errorf("invalid node for transaction: %q", node) + if err := transaction.vote(node, hash); err != nil { + return err } return nil @@ -192,7 +182,7 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n "hash": hex.EncodeToString(hash), }).Debug("VoteTransaction") - if err := mgr.verifyTransaction(transactionID, node, hash); err != nil { + if err := mgr.voteTransaction(transactionID, node, hash); err != nil { mgr.log(ctx).WithFields(logrus.Fields{ "transaction_id": transactionID, "node": node, diff --git a/internal/praefect/transactions/transaction.go b/internal/praefect/transactions/transaction.go new file mode 100644 index 000000000..431ac2f11 --- /dev/null +++ b/internal/praefect/transactions/transaction.go @@ -0,0 +1,39 @@ +package transactions + +import ( + "crypto/sha1" + "errors" + "fmt" +) + +type transaction struct { + node string +} + +func newTransaction(nodes []string) (*transaction, error) { + // We only accept a single node in transactions right now, which is + // usually the primary. This limitation will be lifted at a later point + // to allow for real transaction voting and multi-phase commits. + if len(nodes) != 1 { + return nil, errors.New("transaction requires exactly one node") + } + + return &transaction{ + node: nodes[0], + }, nil +} + +func (t *transaction) vote(node string, hash []byte) error { + // While the reference updates hash is not used yet, we already verify + // it's there. At a later point, the hash will be used to verify that + // all voting nodes agree on the same updates. + if len(hash) != sha1.Size { + return fmt.Errorf("invalid reference hash: %q", hash) + } + + if t.node != node { + return fmt.Errorf("invalid node for transaction: %q", node) + } + + return nil +} |