package transaction

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Verifier verifies the state of a project repository by computing a checksum
type Verifier interface {
	// CheckSum returns a checksum over all refs for a repo
	CheckSum(context.Context, Repository) ([]byte, error)
}
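
// The stub below is an illustrative sketch, not production code: it shows the
// minimal shape a Verifier implementation could take. The checksum value is a
// placeholder assumption; a real implementation would derive it from the
// repository's refs.
type sketchVerifier struct{}

// CheckSum returns a fixed, empty checksum so every replica compares equal.
// Placeholder only.
func (sketchVerifier) CheckSum(ctx context.Context, repo Repository) ([]byte, error) {
	return []byte{}, nil
}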

// Coordinator allows the transaction manager to look up the shard for a repo
// at the beginning of each transaction
type Coordinator interface {
	FetchShard(ctx context.Context, repo Repository) (*Shard, error)
}
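
// Illustrative sketch with assumed names, not production code: a Coordinator
// can be as simple as a static lookup that always returns the same shard. A
// real coordinator resolves the shard for the repository at transaction start.
type sketchCoordinator struct {
	shard *Shard
}

// FetchShard ignores the repository and returns the configured shard.
func (c sketchCoordinator) FetchShard(ctx context.Context, repo Repository) (*Shard, error) {
	return c.shard, nil
}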

// ReplicationManager provides services to handle degraded nodes
type ReplicationManager interface {
	NotifyDegradation(context.Context, Repository) error
}
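
// Illustrative sketch with assumed names, not production code: a
// ReplicationManager that only counts degradation notifications. A real
// implementation would schedule replication jobs to repair the degraded node.
type sketchReplMan struct {
	mu    sync.Mutex
	count int
}

// NotifyDegradation records that a degradation was reported for a repository.
func (r *sketchReplMan) NotifyDegradation(ctx context.Context, repo Repository) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.count++
	return nil
}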

// Manager tracks the progress of RPCs being applied to multiple
// downstream servers that make up a shard. It prevents conflicts that may arise
// from contention between multiple clients trying to modify the same
// references.
type Manager struct {
	mu     sync.Mutex
	shards map[string]*Shard // shards keyed by project

	verifier    Verifier
	coordinator Coordinator
	replman     ReplicationManager
}

// NewManager returns a manager for coordinating transactions across shards
// fetched from the coordinator. Any inconsistencies found will be reported to
// the provided replication manager.
func NewManager(v Verifier, c Coordinator, r ReplicationManager) *Manager {
	return &Manager{
		shards:      map[string]*Shard{},
		verifier:    v,
		coordinator: c,
		replman:     r,
	}
}
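
// Illustrative sketch: wiring the sketch implementations above into a Manager.
// All three components are hypothetical stand-ins; in praefect the verifier,
// coordinator and replication manager are real implementations wired up at
// start-up.
func newSketchManager(shard *Shard) *Manager {
	return NewManager(
		sketchVerifier{},
		sketchCoordinator{shard: shard},
		&sketchReplMan{},
	)
}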

// Mutate accepts a closure whose environment is a snapshot of the repository
// before the transaction begins. The closure author is responsible for
// marking all assets modified during the transaction so that they are locked
// and protected from other closures modifying the same.
func (m *Manager) Mutate(ctx context.Context, repo Repository, fn func(MutateTx) error) error {
	shard, err := m.coordinator.FetchShard(ctx, repo)
	if err != nil {
		return err
	}

	omits := make(map[string]struct{})

	// TODO: some smart caching needs to be done to eliminate this check. We
	// already check the shard consistency at the end of each transaction so
	// we shouldn't need to do it twice except for the first time we fetch a
	// shard.
	good, bad, err := shard.validate(ctx, omits)
	if err != nil {
		return err
	}

	// are a majority of the nodes good (consistent)?
	if len(bad) >= len(good) {
		return ErrShardInconsistent
	}

	omits, err = m.notifyDegradations(ctx, repo, bad)
	if err != nil {
		return err
	}

	tx := newTransaction(shard, good)
	defer tx.unlockAll()

	err = fn(tx)
	if err != nil {
		return err
	}

	// make sure all changes made to replicas are consistent
	good, bad, err = shard.validate(ctx, omits)
	if err != nil {
		return err
	}

	_, err = m.notifyDegradations(ctx, repo, bad)
	if err != nil {
		return err
	}

	return nil
}
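
// Illustrative sketch of calling Mutate. The closure receives a MutateTx
// (defined elsewhere in this package); since its concrete methods are not
// shown in this file, the body is left as a placeholder.
func mutateSketch(ctx context.Context, m *Manager, repo Repository) error {
	return m.Mutate(ctx, repo, func(tx MutateTx) error {
		// Mark and modify the relevant references via tx here.
		return nil
	})
}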

func (m *Manager) notifyDegradations(ctx context.Context, repo Repository, degradeds []Node) (map[string]struct{}, error) {
	reported := make(map[string]struct{})

	eg, eCtx := errgroup.WithContext(ctx)
	for _, node := range degradeds {
		node := node // rescope iterator var for goroutine closure
		reported[node.Storage()] = struct{}{}

		eg.Go(func() error {
			return m.replman.NotifyDegradation(eCtx, repo)
		})
	}

	return reported, eg.Wait()
}

// Access runs fn inside a transaction that is not expected to modify the
// shard. Unlike Mutate, the shard is not re-validated after the closure
// returns.
func (m *Manager) Access(ctx context.Context, repo Repository, fn func(AccessTx) error) error {
	shard, err := m.coordinator.FetchShard(ctx, repo)
	if err != nil {
		return err
	}

	// TODO: some smart caching needs to be done to eliminate this check. We
	// already check the shard consistency at the end of each transaction so
	// we shouldn't need to do it twice except for the first time we fetch a
	// shard.
	good, bad, err := shard.validate(ctx, map[string]struct{}{})
	if err != nil {
		return err
	}

	_, err = m.notifyDegradations(ctx, repo, bad)
	if err != nil {
		return err
	}

	tx := newTransaction(shard, good)
	defer tx.unlockAll()

	err = fn(tx)
	if err != nil {
		return err
	}

	return nil
}
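
// Illustrative sketch of calling Access, mirroring the Mutate sketch above.
// AccessTx is defined elsewhere in this package, so the closure body is left
// as a placeholder.
func accessSketch(ctx context.Context, m *Manager, repo Repository) error {
	return m.Access(ctx, repo, func(tx AccessTx) error {
		// Read refs or objects through tx here.
		return nil
	})
}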