diff options
Diffstat (limited to 'internal/praefect/transactions/manager.go')
-rw-r--r-- | internal/praefect/transactions/manager.go | 74 |
1 files changed, 12 insertions, 62 deletions
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index abfbe8fdc..f84c5198b 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -2,19 +2,17 @@ package transactions import ( "context" - cryptorand "crypto/rand" - "encoding/binary" - "encoding/hex" "errors" "fmt" - "math/rand" "sync" + "sync/atomic" "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/transaction/voting" ) var ErrNotFound = errors.New("transaction not found") @@ -23,7 +21,7 @@ var ErrNotFound = errors.New("transaction not found") // for Praefect to handle transactions directly instead of having to reach out // to reference transaction RPCs. type Manager struct { - txIDGenerator TransactionIDGenerator + idSequence uint64 lock sync.Mutex transactions map[uint64]*transaction counterMetric *prometheus.CounterVec @@ -31,48 +29,10 @@ type Manager struct { subtransactionsMetric prometheus.Histogram } -// TransactionIDGenerator is an interface for types that can generate transaction IDs. -type TransactionIDGenerator interface { - // ID generates a new transaction identifier - ID() uint64 -} - -type transactionIDGenerator struct { - rand *rand.Rand -} - -func newTransactionIDGenerator() *transactionIDGenerator { - var seed [8]byte - - // Ignore any errors. In case we weren't able to generate a seed, the - // best we can do is to just use the all-zero seed. - cryptorand.Read(seed[:]) - source := rand.NewSource(int64(binary.LittleEndian.Uint64(seed[:]))) - - return &transactionIDGenerator{ - rand: rand.New(source), - } -} - -func (t *transactionIDGenerator) ID() uint64 { - return rand.Uint64() -} - -// ManagerOpt is a self referential option for Manager -type ManagerOpt func(*Manager) - -// WithTransactionIDGenerator is an option to set the transaction ID generator -func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt { - return func(mgr *Manager) { - mgr.txIDGenerator = generator - } -} - // NewManager creates a new transactions Manager. -func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { - mgr := &Manager{ - txIDGenerator: newTransactionIDGenerator(), - transactions: make(map[uint64]*transaction), +func NewManager(cfg config.Config) *Manager { + return &Manager{ + transactions: make(map[uint64]*transaction), counterMetric: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "gitaly", @@ -100,12 +60,6 @@ func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { }, ), } - - for _, opt := range opts { - opt(mgr) - } - - return mgr } func (mgr *Manager) Describe(descs chan<- *prometheus.Desc) { @@ -135,11 +89,7 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr mgr.lock.Lock() defer mgr.lock.Unlock() - // Use a random transaction ID. Using monotonic incrementing counters - // that reset on restart of Praefect would be suboptimal, as the chance - // for collisions is a lot higher in case Praefect restarts when Gitaly - // nodes still have in-flight transactions. - transactionID := mgr.txIDGenerator.ID() + transactionID := atomic.AddUint64(&mgr.idSequence, 1) transaction, err := newTransaction(transactionID, voters, threshold) if err != nil { @@ -193,7 +143,7 @@ func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *transact return nil } -func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { +func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, node string, vote voting.Vote) error { mgr.lock.Lock() transaction, ok := mgr.transactions[transactionID] mgr.lock.Unlock() @@ -202,7 +152,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n return fmt.Errorf("%w: %d", ErrNotFound, transactionID) } - if err := transaction.vote(ctx, node, hash); err != nil { + if err := transaction.vote(ctx, node, vote); err != nil { return err } @@ -211,7 +161,7 @@ func (mgr *Manager) voteTransaction(ctx context.Context, transactionID uint64, n // VoteTransaction is called by a client who's casting a vote on a reference // transaction. It waits until quorum was reached on the given transaction. -func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, node string, hash []byte) error { +func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, node string, vote voting.Vote) error { start := time.Now() defer func() { delay := time.Since(start) @@ -221,13 +171,13 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n logger := mgr.log(ctx).WithFields(logrus.Fields{ "transaction.id": transactionID, "transaction.voter": node, - "transaction.hash": hex.EncodeToString(hash), + "transaction.hash": vote.String(), }) mgr.counterMetric.WithLabelValues("started").Inc() logger.Debug("VoteTransaction") - if err := mgr.voteTransaction(ctx, transactionID, node, hash); err != nil { + if err := mgr.voteTransaction(ctx, transactionID, node, vote); err != nil { var counterLabel string if errors.Is(err, ErrTransactionStopped) { |