Welcome to mirror list, hosted at ThFree Co, Russian Federation.

transaction.go « transactions « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: c875fcad2aaf02de685dbbc0144de3d729a37511 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
package transactions

import (
	"context"
	"errors"
	"sync"

	"gitlab.com/gitlab-org/gitaly/v15/internal/transaction/voting"
)

// Errors reported while validating the parameters of a new transaction.
var (
	// ErrDuplicateNodes is returned when two voters of a transaction share
	// the same name.
	ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
	// ErrMissingNodes is returned when a transaction is registered without
	// any voters.
	ErrMissingNodes = errors.New("transaction requires at least one node")
	// ErrInvalidThreshold is returned when the threshold of a transaction
	// either cannot be reached at all or would allow for more than one
	// quorum.
	ErrInvalidThreshold = errors.New("transaction has invalid threshold")
)

// Errors reported when a transaction cannot proceed with voting.
var (
	// ErrTransactionFailed is returned when the transaction did not reach
	// quorum.
	ErrTransactionFailed = errors.New("transaction did not reach quorum")
	// ErrTransactionCanceled is returned when the transaction was canceled
	// before reaching quorum.
	ErrTransactionCanceled = errors.New("transaction has been canceled")
	// ErrTransactionStopped is returned when the transaction was gracefully
	// stopped.
	ErrTransactionStopped = errors.New("transaction has been stopped")
)

// transactionState describes the lifecycle stage of a transaction.
type transactionState int

const (
	// transactionOpen is the initial state of a transaction.
	transactionOpen transactionState = iota
	// transactionCanceled marks a transaction which has been canceled.
	transactionCanceled
	// transactionStopped marks a transaction which has been stopped.
	transactionStopped
)

// Voter is a participant in a given transaction that may cast a vote.
// Callers fill in the exported fields when registering a transaction; the
// unexported fields carry per-voter voting state.
type Voter struct {
	// Name of the voter, usually Gitaly's storage name. Names must be unique
	// within a transaction, otherwise newTransaction returns
	// ErrDuplicateNodes.
	Name string
	// Votes is the number of votes available to this voter in the voting
	// process. `0` means the outcome of the vote will not be influenced by
	// this voter.
	Votes uint

	// vote is the vote cast by this voter.
	// NOTE(review): presumably nil until the voter has cast a vote; it is
	// not written anywhere in this file -- confirm against subtransaction.
	vote   *voting.Vote
	// result is the voting outcome recorded for this voter.
	// NOTE(review): not written anywhere in this file; presumably maintained
	// by the subtransaction -- confirm.
	result VoteResult
}

// Transaction is an interface for transactions. It only exposes the identity
// and the voting progress of a transaction; none of its methods cast or
// cancel votes.
type Transaction interface {
	// ID returns the ID of the transaction which uniquely identifies the transaction.
	ID() uint64
	// CountSubtransactions counts the number of subtransactions.
	CountSubtransactions() int
	// State returns the state of each voter part of the transaction, keyed
	// by the voter's name.
	State() (map[string]VoteResult, error)
	// DidVote returns whether the given node has cast a vote. The argument
	// is the name of the node.
	DidVote(string) bool
}

// transaction is a session where a set of voters votes on one or more
// subtransactions. Subtransactions are a sequence of sessions, where each node
// needs to go through the same sequence and agree on the same thing in the end
// in order to have the complete transaction succeed.
type transaction struct {
	// id is the identifier returned by ID. It is assigned in newTransaction.
	id        uint64
	// threshold is the number of votes required to reach quorum. It is
	// validated in newTransaction and handed to every new subtransaction.
	threshold uint
	// voters is the set of voters registered for this transaction. It is
	// the slice passed to newTransaction, not a copy of it.
	voters    []Voter

	// lock is held by the methods in this file whenever they read or write
	// state or subtransactions.
	lock            sync.Mutex
	// state tracks whether the transaction is open, canceled or stopped.
	state           transactionState
	// subtransactions holds all subtransactions in the order they were
	// created; new ones are appended at the end.
	subtransactions []*subtransaction
}

// newTransaction creates a new open transaction with the given ID and set of
// voters. The threshold is the number of votes required to reach quorum.
//
// The following errors are returned for invalid parameters:
//
//   - ErrMissingNodes if no voters were given.
//   - ErrDuplicateNodes if two voters share the same name.
//   - ErrInvalidThreshold if the threshold exceeds the sum of all votes, or
//     if it is at most half of the sum of all votes.
func newTransaction(id uint64, voters []Voter, threshold uint) (*transaction, error) {
	if len(voters) == 0 {
		return nil, ErrMissingNodes
	}

	var totalVotes uint
	seenNames := make(map[string]struct{}, len(voters))
	for _, voter := range voters {
		if _, ok := seenNames[voter.Name]; ok {
			return nil, ErrDuplicateNodes
		}
		seenNames[voter.Name] = struct{}{}
		totalVotes += voter.Votes
	}

	// If the given threshold is larger than the total number of votes,
	// then we cannot ever reach quorum.
	if totalVotes < threshold {
		return nil, ErrInvalidThreshold
	}

	// If the threshold is less than or equal to half of all nodes' votes,
	// it's possible to reach multiple different quorums that settle on
	// different outcomes. The comparison is done against `totalVotes/2`
	// instead of doubling the threshold: for integers both forms are
	// equivalent, but doubling a large threshold would silently wrap
	// around and reject a perfectly valid threshold.
	if threshold <= totalVotes/2 {
		return nil, ErrInvalidThreshold
	}

	return &transaction{
		id:        id,
		threshold: threshold,
		voters:    voters,
		state:     transactionOpen,
	}, nil
}

// cancel cancels every subtransaction created so far and then marks the
// transaction itself as canceled, which causes getOrCreateSubtransaction to
// return ErrTransactionCanceled from then on.
func (t *transaction) cancel() {
	t.lock.Lock()
	defer t.lock.Unlock()

	for i := range t.subtransactions {
		t.subtransactions[i].cancel()
	}

	t.state = transactionCanceled
}

// stop stops all subtransactions in the order they were created and then
// marks the transaction as stopped. If stopping a subtransaction fails, its
// error is returned immediately: the remaining subtransactions are not
// stopped and the state of the transaction is left unchanged.
func (t *transaction) stop() error {
	t.lock.Lock()
	defer t.lock.Unlock()

	for i := range t.subtransactions {
		err := t.subtransactions[i].stop()
		if err != nil {
			return err
		}
	}

	t.state = transactionStopped
	return nil
}

// ID returns the identifier used to uniquely identify a transaction. The
// field is read without taking the lock; within this file it is only ever
// assigned when the transaction is constructed in newTransaction.
func (t *transaction) ID() uint64 {
	return t.id
}

// State returns the current vote result of every voter, keyed by voter name.
//
// As long as no subtransaction has been created, all voters share a result
// derived from the state of the transaction itself: VoteUndecided for an open
// transaction, VoteCanceled for a canceled one and VoteStopped for a stopped
// one. Once subtransactions exist, the results of the most recently created
// subtransaction are returned.
//
// An error is returned if the transaction is in an unknown state.
func (t *transaction) State() (map[string]VoteResult, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	results := make(map[string]VoteResult, len(t.voters))

	if len(t.subtransactions) == 0 {
		// The transaction state is the same for every voter, so it is
		// translated into a vote result once instead of once per voter.
		var result VoteResult
		switch t.state {
		case transactionOpen:
			result = VoteUndecided
		case transactionCanceled:
			result = VoteCanceled
		case transactionStopped:
			result = VoteStopped
		default:
			return nil, errors.New("invalid transaction state")
		}

		for _, voter := range t.voters {
			results[voter.Name] = result
		}

		return results, nil
	}

	// Collect voter results. Given that all subtransactions are created with all voters
	// registered in the transaction, we can simply take results from the last subtransaction.
	// Any nodes which didn't yet cast a vote in the last subtransaction will be in the default
	// undecided state.
	latest := t.subtransactions[len(t.subtransactions)-1]
	for voter, result := range latest.state() {
		results[voter] = result
	}

	return results, nil
}

// CountSubtransactions returns how many subtransactions have been created as
// part of the transaction so far.
func (t *transaction) CountSubtransactions() int {
	t.lock.Lock()
	count := len(t.subtransactions)
	t.lock.Unlock()

	return count
}

// DidVote reports whether the given node has cast a vote. Only the first
// subtransaction is inspected. If there is no subtransaction yet, or if the
// node's vote cannot be retrieved from it, the node is considered to not have
// cast a vote.
func (t *transaction) DidVote(node string) bool {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Without any subtransaction nobody can have voted yet.
	if len(t.subtransactions) == 0 {
		return false
	}

	// A failure to look up the vote is treated the same as a missing vote.
	castVote, err := t.subtransactions[0].getVote(node)
	return err == nil && castVote != nil
}

// getOrCreateSubtransaction returns the subtransaction the given node has to
// vote on next. This is the oldest subtransaction on which the node is still
// undecided, or a newly created and registered subtransaction if the node has
// no pending subtransactions.
//
// ErrTransactionCanceled or ErrTransactionStopped is returned if the
// transaction is not open anymore. Errors encountered while inspecting the
// node's results on existing subtransactions or while creating the new
// subtransaction are passed on to the caller.
func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Only open transactions accept further votes.
	switch t.state {
	case transactionCanceled:
		return nil, ErrTransactionCanceled
	case transactionStopped:
		return nil, ErrTransactionStopped
	case transactionOpen:
		// This is the expected state, carry on.
	default:
		return nil, errors.New("invalid transaction state")
	}

	pending, err := t.getPendingNodeSubtransactions(node)
	if err != nil {
		return nil, err
	}

	// The first pending subtransaction is the next one in line for the node.
	if len(pending) > 0 {
		return pending[0], nil
	}

	// The node has no pending subtransactions left, so it needs a new one
	// to vote on.
	created, err := t.createSubtransaction()
	if err != nil {
		return nil, err
	}
	t.subtransactions = append(t.subtransactions, created)

	return created, nil
}

// getPendingNodeSubtransactions returns the subtransactions the given node
// still has to vote on: the first subtransaction on which the node is
// undecided and every subtransaction created after it. `nil` is returned if
// the node has committed all existing subtransactions.
//
// An error is returned if the node's result cannot be retrieved, or if the
// node's result on any subtransaction preceding the first undecided one is
// failed, canceled or stopped. The caller must hold the transaction lock.
func (t *transaction) getPendingNodeSubtransactions(node string) ([]*subtransaction, error) {
	for idx, sub := range t.subtransactions {
		result, err := sub.getResult(node)
		if err != nil {
			return nil, err
		}

		switch result {
		case VoteCommitted:
			// The node is done with this subtransaction, so move on
			// to the next one.
		case VoteUndecided:
			// Subtransactions following the first undecided one are
			// undecided as well, so the whole remainder is pending.
			return t.subtransactions[idx:], nil
		case VoteFailed:
			// A subtransaction which failed to reach majority rules
			// out any subsequent votes.
			return nil, ErrTransactionFailed
		case VoteCanceled:
			// A canceled subtransaction on the way to the end result
			// means the node cannot proceed either.
			return nil, ErrTransactionCanceled
		case VoteStopped:
			// A stopped subtransaction is reported with the graceful
			// error.
			return nil, ErrTransactionStopped
		}
	}

	return nil, nil
}

// createSubtransaction builds a new subtransaction using the transaction's
// threshold. The first subtransaction is created with the transaction's
// voters as-is. Every later one is created with the voters returned by the
// previous subtransaction's getPropagatedVoters, so that voters which were
// canceled before stay canceled and the new subtransaction does not wait for
// a quorum which cannot be reached anymore.
//
// The new subtransaction is not appended to the transaction; that is left to
// the caller, which must also hold the transaction lock.
func (t *transaction) createSubtransaction() (*subtransaction, error) {
	voters := t.voters

	// With at least one existing subtransaction, canceled voters need to be
	// carried over from the most recent one.
	if count := len(t.subtransactions); count > 0 {
		propagated, err := t.subtransactions[count-1].getPropagatedVoters(t.voters)
		if err != nil {
			return nil, err
		}
		voters = propagated
	}

	return newSubtransaction(voters, t.threshold)
}

// vote casts the given vote for node on the next subtransaction the node has
// to vote on, creating that subtransaction if required, and then returns the
// result of collecting votes on it. The transaction lock is only held while
// the subtransaction is looked up, not while casting or collecting votes.
func (t *transaction) vote(ctx context.Context, node string, vote voting.Vote) error {
	sub, err := t.getOrCreateSubtransaction(node)
	if err != nil {
		return err
	}

	err = sub.vote(node, vote)
	if err != nil {
		return err
	}

	return sub.collectVotes(ctx, node)
}

// cancelNodeVoter cancels the voter of the given node in all subtransactions
// the node has not yet decided on. If the node has no pending subtransaction,
// a new one is created and appended to the transaction first, so that the
// cancellation is recorded somewhere. The first error encountered is
// returned and aborts the operation.
func (t *transaction) cancelNodeVoter(node string) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Determine the subtransactions the node is still undecided on.
	pending, err := t.getPendingNodeSubtransactions(node)
	if err != nil {
		return err
	}

	// Without pending subtransactions there is nothing the failure could
	// be tracked on, so a fresh subtransaction is registered for it.
	if len(pending) == 0 {
		created, err := t.createSubtransaction()
		if err != nil {
			return err
		}

		t.subtransactions = append(t.subtransactions, created)
		pending = []*subtransaction{created}
	}

	// Cancel the node's voter in each of the pending subtransactions.
	for _, sub := range pending {
		if err := sub.cancelNodeVoter(node); err != nil {
			return err
		}
	}

	return nil
}