Welcome to mirror list, hosted at ThFree Co, Russian Federation.

subtransaction.go « transactions « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 2ce44bff2a46c791159ff055174e72874810b1f1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
package transactions

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"gitlab.com/gitlab-org/gitaly/internal/transaction/voting"
)

// VoteResult represents the outcome of a transaction for a single voter.
type VoteResult int

const (
	// VoteUndecided means that the voter either didn't yet show up or that
	// the vote couldn't yet be decided due to there being no majority yet.
	VoteUndecided VoteResult = iota
	// VoteCommitted means that the voter committed its vote.
	VoteCommitted
	// VoteFailed means that the voter has failed the vote, either because a
	// majority of nodes has elected a different result or because all votes
	// were cast without any result reaching the threshold.
	VoteFailed
	// VoteCanceled means that the transaction was canceled, or that the
	// voter's own vote was canceled before the subtransaction finished.
	VoteCanceled
	// VoteStopped means that the transaction was gracefully stopped.
	VoteStopped
)

// subtransaction is a single session where voters are voting for a certain outcome.
//
// doneCh is closed exactly once to signal waiting voters: when a vote reached
// the threshold, when all votes were cast, or when the subtransaction was
// canceled or stopped. threshold is the minimum accumulated vote count a single
// vote needs to reach quorum. votersByNode maps each node name to its voter,
// and voteCounts accumulates the voter weights cast for each distinct vote.
// Both maps, as well as the state of the voters they point to, are guarded by
// lock.
type subtransaction struct {
	doneCh chan interface{}

	threshold uint

	lock         sync.RWMutex
	votersByNode map[string]*Voter
	voteCounts   map[voting.Vote]uint
}

// newSubtransaction creates a subtransaction for the given voters in which a
// single vote needs to gather at least threshold votes to reach quorum. Every
// voter is copied, so the subtransaction owns its voter state independently of
// the passed slice. Voters sharing a name collapse into the last one given.
// The returned error is currently always nil.
func newSubtransaction(voters []Voter, threshold uint) (*subtransaction, error) {
	t := &subtransaction{
		doneCh:       make(chan interface{}),
		threshold:    threshold,
		votersByNode: make(map[string]*Voter, len(voters)),
		voteCounts:   make(map[voting.Vote]uint, len(voters)),
	}

	for i := range voters {
		voterCopy := voters[i]
		t.votersByNode[voterCopy.Name] = &voterCopy
	}

	return t, nil
}

// cancel aborts the subtransaction. Every voter that is still undecided,
// including voters which haven't shown up yet, is moved into VoteCanceled so
// that it cannot decide to commit at a later point. Voters which already
// decided keep their result. Waiting voters are signalled unless that already
// happened.
func (t *subtransaction) cancel() {
	t.lock.Lock()
	defer t.lock.Unlock()

	for node := range t.votersByNode {
		if voter := t.votersByNode[node]; voter.result == VoteUndecided {
			voter.result = VoteCanceled
		}
	}

	if t.isDone() {
		return
	}
	close(t.doneCh)
}

// stop gracefully stops the subtransaction. Voters which are still undecided
// are moved into VoteStopped, while voters which already committed or failed
// keep their result as they may have acted on it already. Waiting voters are
// signalled unless that already happened.
//
// It returns ErrTransactionCanceled if any voter has been canceled and
// ErrTransactionStopped if any voter has already been stopped. In both cases
// no voter state is modified.
func (t *subtransaction) stop() error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Validate all voters before modifying any of them. Map iteration order
	// is random, so checking and mutating in a single pass could stop some
	// voters and then bail out with an error, leaving them stopped without
	// doneCh ever being closed.
	for _, voter := range t.votersByNode {
		switch voter.result {
		case VoteCanceled:
			// If the vote was canceled already, we cannot stop it.
			return ErrTransactionCanceled
		case VoteStopped:
			// Similar if the vote was stopped already.
			return ErrTransactionStopped
		}
	}

	for _, voter := range t.votersByNode {
		// Undecided voters get stopped, while decided voters (VoteCommitted,
		// VoteFailed) cannot be changed anymore.
		if voter.result == VoteUndecided {
			voter.result = VoteStopped
		}
	}

	if !t.isDone() {
		close(t.doneCh)
	}

	return nil
}

// state returns a snapshot of every voter's current result, keyed by node
// name. The returned map is freshly allocated, so callers may modify it freely.
func (t *subtransaction) state() map[string]VoteResult {
	// state only reads voter state, so a shared read lock suffices (as in
	// getResult) and doesn't serialize concurrent readers.
	t.lock.RLock()
	defer t.lock.RUnlock()

	results := make(map[string]VoteResult, len(t.votersByNode))
	for node, voter := range t.votersByNode {
		results[node] = voter.result
	}

	return results
}

// vote casts vote on behalf of node. It fails if node is not part of the
// subtransaction, has already voted, or has been canceled or stopped. Once
// the vote is recorded, voter states are updated, and waiting voters are
// signalled if this vote either reached quorum or was the last missing one.
func (t *subtransaction) vote(node string, vote voting.Vote) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	// Only a known node which hasn't voted yet and is still undecided may
	// cast a vote.
	voter, ok := t.votersByNode[node]
	switch {
	case !ok:
		return fmt.Errorf("invalid node for transaction: %q", node)
	case voter.vote != nil:
		return fmt.Errorf("node already cast a vote: %q", node)
	case voter.result == VoteCanceled:
		return ErrTransactionCanceled
	case voter.result == VoteStopped:
		return ErrTransactionStopped
	case voter.result != VoteUndecided:
		// As the node didn't vote yet, it can be neither committed nor
		// failed at this point.
		return fmt.Errorf("voter is in invalid state %d: %q", voter.result, node)
	}

	voter.vote = &vote
	t.voteCounts[vote] += voter.Votes

	// Before quorum, this decides every voter that has voted as soon as the
	// threshold gets crossed. After quorum, only the current voter is left
	// undecided among those who voted, so only it gets updated.
	t.updateVoterStates()

	if t.mustSignalVoters() {
		close(t.doneCh)
	}

	return nil
}

// updateVoterStates updates undecided voters. Voters are updated either as
// soon as quorum was reached or alternatively when all votes were cast. It
// must be called with the lock held.
//
// When a vote reached the threshold, undecided voters which cast that vote are
// committed and those which voted differently fail; undecided voters which
// didn't vote yet are left alone. When all votes were cast without any vote
// reaching the threshold, all undecided voters fail.
func (t *subtransaction) updateVoterStates() {
	var majorityVote *voting.Vote
	for v, voteCount := range t.voteCounts {
		if voteCount >= t.threshold {
			v := v
			majorityVote = &v
			break
		}
	}

	allVotesCast := true
	for _, voter := range t.votersByNode {
		if voter.vote == nil {
			allVotesCast = false
			break
		}
	}

	// We need to adjust voter states either when quorum was reached or
	// when all votes were cast. If all votes were cast without reaching
	// quorum, we set all voters into VoteFailed state.
	if majorityVote == nil && !allVotesCast {
		return
	}

	// Only update voters which are still undecided. We mustn't change any
	// voters which did decide on an outcome already as they may have already
	// committed or aborted their action.
	for _, voter := range t.votersByNode {
		if voter.result != VoteUndecided {
			continue
		}

		switch {
		case majorityVote == nil:
			// All votes were cast, but no vote reached the threshold.
			voter.result = VoteFailed
		case voter.vote == nil:
			// Quorum was reached without this voter. It stays undecided
			// until it casts its own vote.
		case *voter.vote == *majorityVote:
			voter.result = VoteCommitted
		default:
			voter.result = VoteFailed
		}
	}
}

// mustSignalVoters reports whether waiting voters need to be signalled now.
// Signalling may only happen once, so this is only the case when nobody has
// signalled voters yet and either some vote has reached the threshold or all
// votes are in without any vote reaching quorum. It must be called with the
// lock held.
func (t *subtransaction) mustSignalVoters() bool {
	if t.isDone() {
		// Voters were notified already; closing doneCh a second time would
		// panic.
		return false
	}

	quorumReached := false
	for _, count := range t.voteCounts {
		if count >= t.threshold {
			quorumReached = true
			break
		}
	}
	if quorumReached {
		return true
	}

	// Without quorum, voters may only be told about the failed vote once the
	// last missing vote was cast, as any outstanding vote could still make
	// some vote reach the threshold.
	for _, voter := range t.votersByNode {
		if voter.vote == nil {
			return false
		}
	}
	return true
}

// cancelVote cancels a node's vote if the subtransaction is still ongoing. This
// has to be called with the lock acquired as collectVotes does.
//
// The voter's weight is withdrawn from the count of the vote it has cast, so
// that it no longer counts towards the majority, and the voter is moved into
// VoteCanceled. It returns an error if the subtransaction is already done.
func (t *subtransaction) cancelVote(voter *Voter) error {
	if t.isDone() {
		// If the transaction is already done, it's too late to cancel our vote.
		// Other nodes may have committed their changes already.
		return errors.New("subtransaction was already finished")
	}

	// Remove the voter's support for the vote so it's not counted towards the
	// majority. The node is not going to commit the subtransaction anyway.
	//
	// A voter which never cast a vote has nothing to withdraw, and
	// dereferencing its nil vote would panic. A voter which was canceled
	// before already had its support removed: subtracting again would wrap
	// the unsigned count around to a huge value that appears to exceed the
	// threshold and would fake a quorum.
	if voter.vote != nil && voter.result != VoteCanceled {
		t.voteCounts[*voter.vote] -= voter.Votes
	}
	voter.result = VoteCanceled
	return nil
}

// collectVotes blocks until either the subtransaction has signalled its
// voters or ctx is done, and then reports the outcome for node. A nil error
// means that node is part of the quorum. If ctx ended the wait, node's vote is
// canceled and the context's error is returned; if canceling the vote fails
// because the subtransaction already finished, that error is returned instead.
func (t *subtransaction) collectVotes(ctx context.Context, node string) error {
	select {
	case <-t.doneCh:
	case <-ctx.Done():
	}

	t.lock.Lock()
	defer t.lock.Unlock()

	voter, ok := t.votersByNode[node]
	if !ok {
		return fmt.Errorf("invalid node for transaction: %q", node)
	}

	// The wait may have ended because of ctx rather than doneCh, in which
	// case this voter's vote has to be withdrawn.
	if ctxErr := ctx.Err(); ctxErr != nil {
		if cancelErr := t.cancelVote(voter); cancelErr != nil {
			return fmt.Errorf("cancel vote: %w", cancelErr)
		}
		return ctxErr
	}

	switch result := voter.result; result {
	case VoteCommitted:
		// This node is part of the quorum.
		return nil
	case VoteFailed:
		if voter.vote == nil {
			return fmt.Errorf("%w: did not cast a vote", ErrTransactionFailed)
		}
		castVote := *voter.vote
		return fmt.Errorf("%w: got %d/%d votes for %v", ErrTransactionFailed,
			t.voteCounts[castVote], t.threshold, castVote)
	case VoteCanceled:
		// The subtransaction may have been canceled right after majority was
		// reached, so this node must not commit.
		return ErrTransactionCanceled
	case VoteStopped:
		// Likewise for a subtransaction that was stopped.
		return ErrTransactionStopped
	case VoteUndecided:
		// Callers are expected to call `vote()` before `collectVotes()`, in
		// which case this node would have been decided by now.
		return fmt.Errorf("voter is in undecided state: %q", node)
	default:
		return fmt.Errorf("voter is in invalid state %d: %q", result, node)
	}
}

// isDone reports, without blocking, whether doneCh has been closed, i.e.
// whether waiting voters have already been signalled.
func (t *subtransaction) isDone() bool {
	done := false
	select {
	case <-t.doneCh:
		done = true
	default:
	}
	return done
}

// getResult returns the current result of node's voter. For a node that is
// not part of the subtransaction, it returns VoteCanceled along with an error.
func (t *subtransaction) getResult(node string) (VoteResult, error) {
	t.lock.RLock()
	defer t.lock.RUnlock()

	if voter, ok := t.votersByNode[node]; ok {
		return voter.result, nil
	}
	return VoteCanceled, fmt.Errorf("invalid node for transaction: %q", node)
}