Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/transactions/manager.go')
-rw-r--r--internal/praefect/transactions/manager.go74
1 files changed, 12 insertions, 62 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index abfbe8fdc..f84c5198b 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -2,19 +2,17 @@ package transactions
import (
"context"
- cryptorand "crypto/rand"
- "encoding/binary"
- "encoding/hex"
"errors"
"fmt"
- "math/rand"
"sync"
+ "sync/atomic"
"time"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/transaction/voting"
)
var ErrNotFound = errors.New("transaction not found")
@@ -23,7 +21,7 @@ var ErrNotFound = errors.New("transaction not found")
// for Praefect to handle transactions directly instead of having to reach out
// to reference transaction RPCs.
type Manager struct {
- txIDGenerator TransactionIDGenerator
+ idSequence uint64
lock sync.Mutex
transactions map[uint64]*transaction
counterMetric *prometheus.CounterVec
@@ -31,48 +29,10 @@ type Manager struct {
subtransactionsMetric prometheus.Histogram
}
-// TransactionIDGenerator is an interface for types that can generate transaction IDs.
-type TransactionIDGenerator interface {
- // ID generates a new transaction identifier
- ID() uint64
-}
-
-type transactionIDGenerator struct {
- rand *rand.Rand
-}
-
-func newTransactionIDGenerator() *transactionIDGenerator {
- var seed [8]byte
-
- // Ignore any errors. In case we weren't able to generate a seed, the
- // best we can do is to just use the all-zero seed.
- cryptorand.Read(seed[:])
- source := rand.NewSource(int64(binary.LittleEndian.Uint64(seed[:])))
-
- return &transactionIDGenerator{
- rand: rand.New(source),
- }
-}
-
-func (t *transactionIDGenerator) ID() uint64 {
- return rand.Uint64()
-}
-
-// ManagerOpt is a self referential option for Manager
-type ManagerOpt func(*Manager)
-
-// WithTransactionIDGenerator is an option to set the transaction ID generator
-func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt {
- return func(mgr *Manager) {
- mgr.txIDGenerator = generator
- }
-}
-
// NewManager creates a new transactions Manager.
-func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager {
- mgr := &Manager{
- txIDGenerator: newTransactionIDGenerator(),
- transactions: make(map[uint64]*transaction),
+func NewManager(cfg config.Config) *Manager {
+ return &Manager{
+ transactions: make(map[uint64]*transaction),
counterMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "gitaly",
@@ -100,12 +60,6 @@ func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager {
},
),
}
-
- for _, opt := range opts {
- opt(mgr)
- }
-
- return mgr
}
func (mgr *Manager) Describe(descs chan<- *prometheus.Desc) {
@@ -135,11 +89,7 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
mgr.lock.Lock()
defer mgr.lock.Unlock()
- // Use a random transaction ID. Using monotonic incrementing counters
- // that reset on restart of Praefect would be suboptimal, as the chance
- // for collisions is a lot higher in case Praefect restarts when Gitaly
- // nodes still have in-flight transactions.
- transactionID := mgr.txIDGenerator.ID()
+ transactionID := atomic.AddUint64(&mgr.idSequence, 1)
transaction, err := newTransaction(transactionID, voters, threshold)
if err != nil {
@@ -193,7 +143,7 @@ func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *transact
return nil
}
-func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
+func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, vote voting.Vote) error {
mgr.lock.Lock()
transaction, ok := mgr.transactions[transactionID]
mgr.lock.Unlock()
@@ -202,7 +152,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n
return fmt.Errorf("%w: %d", ErrNotFound, transactionID)
}
- if err := transaction.vote(ctx, node, hash); err != nil {
+ if err := transaction.vote(ctx, node, vote); err != nil {
return err
}
@@ -211,7 +161,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n
// VoteTransaction is called by a client who's casting a vote on a reference
// transaction. It waits until quorum was reached on the given transaction.
-func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error {
+func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, node string, vote voting.Vote) error {
start := time.Now()
defer func() {
delay := time.Since(start)
@@ -221,13 +171,13 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
logger := mgr.log(ctx).WithFields(logrus.Fields{
"transaction.id": transactionID,
"transaction.voter": node,
- "transaction.hash": hex.EncodeToString(hash),
+ "transaction.hash": vote.String(),
})
mgr.counterMetric.WithLabelValues("started").Inc()
logger.Debug("VoteTransaction")
- if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil {
+ if err := mgr.voteTransaction(ctx, transactionID, node, vote); err != nil {
var counterLabel string
if errors.Is(err, ErrTransactionStopped) {