Welcome to mirror list, hosted at ThFree Co, Russian Federation.

transaction.go « transactions « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 2545ed4f6ec3d4ed2cceab725cfddbe5f710d190 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package transactions

import (
	"context"
	"errors"
	"sync"
)

var (
	// ErrDuplicateNodes indicates a transaction was registered with two
	// voters having the same name.
	ErrDuplicateNodes = errors.New("transactions cannot have duplicate nodes")
	// ErrMissingNodes indicates a transaction was registered with no voters.
	ErrMissingNodes = errors.New("transaction requires at least one node")
	// ErrInvalidThreshold indicates a transaction was registered with an
	// invalid threshold that may either allow for multiple different
	// quorums or none at all.
	ErrInvalidThreshold = errors.New("transaction has invalid threshold")

	// ErrTransactionFailed indicates the transaction didn't reach quorum.
	ErrTransactionFailed = errors.New("transaction did not reach quorum")
	// ErrTransactionCanceled indicates the transaction was canceled before
	// reaching quorum.
	ErrTransactionCanceled = errors.New("transaction has been canceled")
	// ErrTransactionStopped indicates the transaction was gracefully stopped.
	ErrTransactionStopped = errors.New("transaction has been stopped")
)

type transactionState int

const (
	transactionOpen = transactionState(iota)
	transactionCanceled
	transactionStopped
)

// Voter is a participant in a given transaction that may cast a vote.
type Voter struct {
	// Name of the voter, usually Gitaly's storage name.
	Name string
	// Votes is the number of votes available to this voter in the voting
	// process. `0` means the outcome of the vote will not be influenced by
	// this voter.
	Votes uint

	vote   *vote
	result VoteResult
}

// Transaction is an interface for transactions.
type Transaction interface {
	// ID returns the ID of the transaction which uniquely identifies the transaction.
	ID() uint64
	// CountSubtransactions counts the number of subtransactions.
	CountSubtransactions() int
	// State returns the state of each voter part of the transaction.
	State() (map[string]VoteResult, error)
}

// transaction is a session where a set of voters votes on one or more
// subtransactions. Subtransactions are a sequence of sessions, where each node
// needs to go through the same sequence and agree on the same thing in the end
// in order to have the complete transaction succeed.
type transaction struct {
	id        uint64
	threshold uint
	voters    []Voter

	lock            sync.Mutex
	state           transactionState
	subtransactions []*subtransaction
}

func newTransaction(id uint64, voters []Voter, threshold uint) (*transaction, error) {
	if len(voters) == 0 {
		return nil, ErrMissingNodes
	}

	var totalVotes uint
	votersByNode := make(map[string]interface{}, len(voters))
	for _, voter := range voters {
		if _, ok := votersByNode[voter.Name]; ok {
			return nil, ErrDuplicateNodes
		}
		votersByNode[voter.Name] = nil
		totalVotes += voter.Votes
	}

	// If the given threshold is smaller than the total votes, then we
	// cannot ever reach quorum.
	if totalVotes < threshold {
		return nil, ErrInvalidThreshold
	}

	// If the threshold is less or equal than half of all node's votes,
	// it's possible to reach multiple different quorums that settle on
	// different outcomes.
	if threshold*2 <= totalVotes {
		return nil, ErrInvalidThreshold
	}

	return &transaction{
		id:        id,
		threshold: threshold,
		voters:    voters,
		state:     transactionOpen,
	}, nil
}

func (t *transaction) cancel() {
	t.lock.Lock()
	defer t.lock.Unlock()

	for _, subtransaction := range t.subtransactions {
		subtransaction.cancel()
	}

	t.state = transactionCanceled
}

func (t *transaction) stop() error {
	t.lock.Lock()
	defer t.lock.Unlock()

	for _, subtransaction := range t.subtransactions {
		if err := subtransaction.stop(); err != nil {
			return err
		}
	}
	t.state = transactionStopped

	return nil
}

// ID returns the identifier used to uniquely identify a transaction.
func (t *transaction) ID() uint64 {
	return t.id
}

// State returns the voting state mapped by voters. A voting state of `true`
// means all subtransactions were successful, a voting state of `false` means
// either no subtransactions were created or any of the subtransactions failed.
func (t *transaction) State() (map[string]VoteResult, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	results := make(map[string]VoteResult, len(t.voters))

	if len(t.subtransactions) == 0 {
		for _, voter := range t.voters {
			switch t.state {
			case transactionOpen:
				results[voter.Name] = VoteUndecided
			case transactionCanceled:
				results[voter.Name] = VoteCanceled
			case transactionStopped:
				results[voter.Name] = VoteStopped
			default:
				return nil, errors.New("invalid transaction state")
			}
		}
		return results, nil
	}

	// Collect all subtransactions. As they are ordered by reverse recency, we can simply
	// overwrite our own results.
	for _, subtransaction := range t.subtransactions {
		for voter, result := range subtransaction.state() {
			results[voter] = result
		}
	}

	return results, nil
}

// CountSubtransactions counts the number of subtransactions created as part of
// the transaction.
func (t *transaction) CountSubtransactions() int {
	t.lock.Lock()
	defer t.lock.Unlock()

	return len(t.subtransactions)
}

// getOrCreateSubtransaction gets an ongoing subtransaction on which the given
// node hasn't yet voted on or creates a new one if the node has succeeded on
// all subtransactions. In case the node has failed on any of the
// subtransactions, an error will be returned.
func (t *transaction) getOrCreateSubtransaction(node string) (*subtransaction, error) {
	t.lock.Lock()
	defer t.lock.Unlock()

	switch t.state {
	case transactionOpen:
		// expected state, nothing to do
	case transactionCanceled:
		return nil, ErrTransactionCanceled
	case transactionStopped:
		return nil, ErrTransactionStopped
	default:
		return nil, errors.New("invalid transaction state")
	}

	for _, subtransaction := range t.subtransactions {
		result, err := subtransaction.getResult(node)
		if err != nil {
			return nil, err
		}

		switch result {
		case VoteUndecided:
			// An undecided vote means we should vote on this one.
			return subtransaction, nil
		case VoteCommitted:
			// If we have committed this subtransaction, we're good
			// to go.
			continue
		case VoteFailed:
			// If a vote was cast on a subtransaction which failed
			// to reach majority, then we cannot proceed with any
			// subsequent votes anymore.
			return nil, ErrTransactionFailed
		case VoteCanceled:
			// If the subtransaction was aborted, then we need to
			// fail as we cannot proceed if the path leading to the
			// end result has intermittent failures.
			return nil, ErrTransactionCanceled
		case VoteStopped:
			// If the transaction was stopped, then we need to fail
			// with a graceful error.
			return nil, ErrTransactionStopped
		}
	}

	// If we arrive here, then we know that all the node has voted and
	// reached quorum on all subtransactions. We can thus create a new one.
	subtransaction, err := newSubtransaction(t.voters, t.threshold)
	if err != nil {
		return nil, err
	}

	t.subtransactions = append(t.subtransactions, subtransaction)

	return subtransaction, nil
}

func (t *transaction) vote(ctx context.Context, node string, hash []byte) error {
	subtransaction, err := t.getOrCreateSubtransaction(node)
	if err != nil {
		return err
	}

	if err := subtransaction.vote(node, hash); err != nil {
		return err
	}

	return subtransaction.collectVotes(ctx, node)
}