Welcome to mirror list, hosted at ThFree Co, Russian Federation.

pipeline.go « backup « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 3eb91de9196d8ecc16bc951de925f38b4d8545a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
package backup

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"

	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)

// Strategy used to create/restore backups. Implementations are also asked to
// enumerate the repositories of a storage and to remove single repositories.
type Strategy interface {
	// Create performs the backup described by req.
	Create(ctx context.Context, req *CreateRequest) error
	// Restore performs the restore described by req.
	Restore(ctx context.Context, req *RestoreRequest) error
	// ListRepositories returns the repositories of the storage named by
	// req.StorageName.
	ListRepositories(ctx context.Context, req *ListRepositoriesRequest) ([]*gitalypb.Repository, error)
	// RemoveRepository removes req.Repo from its storage.
	RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error
}

// CreateRequest is the request to create a backup. It is handed to
// Strategy.Create by CreateCommand.Execute.
type CreateRequest struct {
	// Server contains gitaly server connection information required to call
	// RPCs in the non-local backup.Manager configuration.
	Server storage.ServerInfo
	// Repository is the repository to be backed up.
	Repository *gitalypb.Repository
	// VanityRepository is used to determine the backup path.
	VanityRepository *gitalypb.Repository
	// Incremental when true will create an increment on the specified full backup.
	Incremental bool
	// BackupID is used to determine a unique path for the backup when a full
	// backup is created.
	BackupID string
}

// RestoreRequest is the request to restore from a backup. It is handed to
// Strategy.Restore by RestoreCommand.Execute.
type RestoreRequest struct {
	// Server contains gitaly server connection information required to call
	// RPCs in the non-local backup.Manager configuration.
	Server storage.ServerInfo
	// Repository is the repository to be restored.
	Repository *gitalypb.Repository
	// VanityRepository is used to determine the backup path.
	VanityRepository *gitalypb.Repository
	// AlwaysCreate forces the repository to be created even if no bundle for
	// it exists. See https://gitlab.com/gitlab-org/gitlab/-/issues/357044
	AlwaysCreate bool
	// BackupID is the ID of the full backup to restore. If not specified, the
	// latest backup is restored.
	BackupID string
}

// RemoveRepositoryRequest is a request to remove an individual repository from its storage.
// It is the argument of Strategy.RemoveRepository.
type RemoveRepositoryRequest struct {
	// Server carries the gitaly server connection information; Repo is the
	// repository to remove.
	Server storage.ServerInfo
	Repo   *gitalypb.Repository
}

// ListRepositoriesRequest is the request to list repositories in a given storage.
// It is the argument of Strategy.ListRepositories.
type ListRepositoriesRequest struct {
	// Server carries the gitaly server connection information; StorageName
	// names the storage whose repositories should be listed.
	Server      storage.ServerInfo
	StorageName string
}

// Command handles a specific backup operation, such as creating or restoring
// the backup of a single repository.
type Command interface {
	// Repository returns the repository the command acts on. The Pipeline
	// uses its storage name to choose which worker queue runs the command.
	Repository() *gitalypb.Repository
	// Name returns a short label for the operation, used in log messages.
	Name() string
	// Execute runs the operation under ctx.
	Execute(ctx context.Context) error
}

// CreateCommand is a Command that backs up one repository by delegating to
// Strategy.Create.
type CreateCommand struct {
	strategy Strategy
	request  CreateRequest
}

// NewCreateCommand returns a CreateCommand that calls strategy.Create with
// request when executed.
func NewCreateCommand(strategy Strategy, request CreateRequest) *CreateCommand {
	c := new(CreateCommand)
	c.strategy, c.request = strategy, request
	return c
}

// Repository returns the repository named in the request, i.e. the one that
// will be backed up.
func (c CreateCommand) Repository() *gitalypb.Repository {
	return c.request.Repository
}

// Name reports the operation label used in log messages.
func (c CreateCommand) Name() string {
	return "create"
}

// Execute runs the backup through the configured strategy. The value
// receiver means the strategy sees a pointer to this call's copy of the
// request.
func (c CreateCommand) Execute(ctx context.Context) error {
	req := &c.request
	return c.strategy.Create(ctx, req)
}

// RestoreCommand is a Command that restores one repository by delegating to
// Strategy.Restore.
type RestoreCommand struct {
	strategy Strategy
	request  RestoreRequest
}

// NewRestoreCommand returns a RestoreCommand that calls strategy.Restore with
// request when executed.
func NewRestoreCommand(strategy Strategy, request RestoreRequest) *RestoreCommand {
	c := new(RestoreCommand)
	c.strategy, c.request = strategy, request
	return c
}

// Repository returns the repository named in the request, i.e. the one that
// will be restored.
func (c RestoreCommand) Repository() *gitalypb.Repository {
	return c.request.Repository
}

// Name reports the operation label used in log messages.
func (c RestoreCommand) Name() string {
	return "restore"
}

// Execute runs the restore through the configured strategy. The value
// receiver means the strategy sees a pointer to this call's copy of the
// request.
func (c RestoreCommand) Execute(ctx context.Context) error {
	req := &c.request
	return c.strategy.Restore(ctx, req)
}

// commandErrors represents a summary of errors by repository. AddError and
// Error both hold mu while touching errs, so workers may record failures
// concurrently with a reader rendering the summary.
//
//nolint:errname
type commandErrors struct {
	errs []error
	mu   sync.Mutex
}

// AddError adds an error associated with a repository to the summary. The
// stored error is prefixed with the repository's relative path and, when it
// is non-empty, its GitLab project path; the original error stays wrapped
// (%w) so errors.Is/As still see it.
func (c *commandErrors) AddError(repo *gitalypb.Repository, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if repo.GetGlProjectPath() != "" {
		err = fmt.Errorf("%s (%s): %w", repo.GetRelativePath(), repo.GetGlProjectPath(), err)
	} else {
		err = fmt.Errorf("%s: %w", repo.GetRelativePath(), err)
	}
	c.errs = append(c.errs, err)
}

// Error renders a multi-line summary: a header with the failure count
// followed by one " - <error>" line per recorded failure.
func (c *commandErrors) Error() string {
	// errs is appended to under mu in AddError; read it under the same lock
	// so rendering never races with a concurrent AddError.
	c.mu.Lock()
	defer c.mu.Unlock()

	var builder strings.Builder
	_, _ = fmt.Fprintf(&builder, "%d failures encountered:\n", len(c.errs))
	for _, err := range c.errs {
		_, _ = fmt.Fprintf(&builder, " - %s\n", err.Error())
	}
	return builder.String()
}

// contextCommand pairs a Command with the context that was passed to Handle,
// so the worker that eventually receives it executes the command under that
// context.
type contextCommand struct {
	Command Command
	Context context.Context
}

// Pipeline is a pipeline for running backup and restore jobs.
//
// Commands submitted through Handle are passed over unbuffered channels to
// worker goroutines: either one queue per storage, or a single shared queue
// when no per-storage limit is configured. Done stops the workers and reports
// the outcome. Create instances with NewPipeline; the zero value has nil maps
// and channels.
type Pipeline struct {
	// log is the base logger from which cmdLogger derives per-command loggers.
	log log.Logger

	// parallel is the worker count of the single shared queue and is only
	// consulted when parallelStorage is 0. parallelStorage is the number of
	// workers started for each storage; 0 means commands are not split by
	// storage.
	parallel        int
	parallelStorage int

	// totalWorkers allows the total number of parallel jobs to be
	// limited. This allows us to create the required workers for
	// each storage, while still limiting the absolute parallelism.
	// It stays nil (no global limit) unless WithConcurrency receives two
	// positive values.
	totalWorkers chan struct{}

	// workerWg counts running worker goroutines. workersByStorage maps a
	// storage name ("" for the shared queue) to the channel its workers read
	// from; the map and worker start-up are guarded by workersByStorageMu.
	workerWg           sync.WaitGroup
	workersByStorage   map[string]chan *contextCommand
	workersByStorageMu sync.Mutex

	// done signals that no more commands will be provided to the Pipeline via
	// Handle(), and the pipeline should wait for workers to complete and exit.
	done chan struct{}

	// pipelineError holds the first pipeline-level error recorded via setErr
	// (a context error observed by Handle). NOTE(review): it is not guarded
	// by a mutex, which presumes Handle is called from a single goroutine.
	// cmdErrors collects the failures of individual commands.
	pipelineError error
	cmdErrors     *commandErrors

	// processedRepos records repositories whose command succeeded, keyed by
	// storage name and then by repository pointer; it is guarded by
	// processedReposMu.
	processedRepos   map[string]map[*gitalypb.Repository]struct{}
	processedReposMu sync.Mutex
}

// NewPipeline creates a pipeline that executes backup and restore jobs.
// Without options, jobs run one at a time on a single shared worker; pass
// WithConcurrency to run them in parallel. Options are applied in order and
// the first one that fails aborts construction with its error.
func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
	p := new(Pipeline)
	p.log = log
	// Sequential by default: one worker, commands not split by storage.
	p.parallel = 1
	p.parallelStorage = 0
	p.done = make(chan struct{})
	p.workersByStorage = map[string]chan *contextCommand{}
	p.cmdErrors = new(commandErrors)
	p.processedRepos = map[string]map[*gitalypb.Repository]struct{}{}

	for i := range opts {
		if err := opts[i](p); err != nil {
			return nil, err
		}
	}
	return p, nil
}

// PipelineOption represents an optional configuration parameter for the Pipeline.
// NewPipeline applies options in order; a non-nil error aborts construction.
type PipelineOption func(*Pipeline) error

// WithConcurrency configures the pipeline to run backup and restore jobs concurrently.
// total defines the absolute maximum number of jobs that the pipeline should execute
// concurrently. perStorage defines the number of jobs per Gitaly storage that the
// pipeline should attempt to execute concurrently.
//
// For example, in a Gitaly deployment with 2 storages, WithConcurrency(3, 2) means
// that at most 3 jobs will execute concurrently, despite 2 concurrent jobs being allowed
// per storage (2*2=4).
//
// When perStorage is 0, all storages share a single queue served by total
// workers. When perStorage is positive and total is not, only the per-storage
// limit applies.
//
// Combinations that would start no workers at all are rejected: perStorage < 0,
// or perStorage == 0 with total <= 0. With zero workers every Handle call would
// block until its context is cancelled.
func WithConcurrency(total, perStorage int) PipelineOption {
	return func(p *Pipeline) error {
		if total == 0 && perStorage == 0 {
			return errors.New("total and perStorage cannot both be 0")
		}
		// getWorker starts perStorage workers per storage, or total workers
		// for the shared queue when perStorage is 0. A negative count there
		// silently starts none, so reject it up front.
		if perStorage < 0 {
			return fmt.Errorf("perStorage cannot be negative: %d", perStorage)
		}
		if perStorage == 0 && total < 0 {
			return fmt.Errorf("total cannot be negative when perStorage is 0: %d", total)
		}

		p.parallel = total
		p.parallelStorage = perStorage

		if total > 0 && perStorage > 0 {
			// When both values are provided, we ensure that total limits
			// the global concurrency.
			p.totalWorkers = make(chan struct{}, total)
		}

		return nil
	}
}

// Handle queues cmd for execution. Commands are processed sequentially, or
// concurrently if WithConcurrency() was used. Handle blocks until a worker for
// the command's storage accepts it or ctx is done. In the latter case the
// context error is recorded as the pipeline error (the first one recorded
// wins), and Done reports it.
func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
	queue := p.getWorker(cmd.Repository().StorageName)
	job := &contextCommand{Command: cmd, Context: ctx}

	select {
	case queue <- job:
	case <-ctx.Done():
		p.setErr(ctx.Err())
	}
}

// Done waits for any in progress jobs to complete then reports any accumulated errors.
// It signals the workers to stop and waits for all of them to exit. A
// pipeline-level error takes precedence over per-command failures. Either way
// no repositories are returned on error. On success it returns the
// repositories whose command succeeded, grouped by storage name. Done closes
// an internal channel, so calling it twice panics.
func (p *Pipeline) Done() (processedRepos map[string]map[*gitalypb.Repository]struct{}, err error) {
	close(p.done)
	p.workerWg.Wait()

	switch {
	case p.pipelineError != nil:
		err = fmt.Errorf("pipeline: %w", p.pipelineError)
	case len(p.cmdErrors.errs) > 0:
		err = fmt.Errorf("pipeline: %w", p.cmdErrors)
	default:
		processedRepos = p.processedRepos
	}
	return processedRepos, err
}

// getWorker returns the queue serving storage, creating it on first use
// together with its worker goroutines. With a per-storage limit, each storage
// gets its own queue and parallelStorage workers. Without one
// (parallelStorage == 0), every storage shares a single pseudo-storage queue
// served by parallel workers, which enforces the total number of concurrent
// jobs.
func (p *Pipeline) getWorker(storage string) chan<- *contextCommand {
	p.workersByStorageMu.Lock()
	defer p.workersByStorageMu.Unlock()

	key, count := storage, p.parallelStorage
	if p.parallelStorage == 0 {
		// Not limited by storage: pretend there is a single storage ("")
		// with `parallel` workers.
		key, count = "", p.parallel
	}

	if existing, found := p.workersByStorage[key]; found {
		return existing
	}

	ch := make(chan *contextCommand)
	p.workersByStorage[key] = ch
	for n := count; n > 0; n-- {
		p.workerWg.Add(1)
		go p.worker(ch)
	}
	return ch
}

// worker runs commands received from ch until p.done is closed. Commands are
// processed synchronously, so p.done is only observed between commands and a
// worker never abandons one midway. The worker marks itself finished on
// p.workerWg when it returns.
func (p *Pipeline) worker(ch <-chan *contextCommand) {
	defer p.workerWg.Done()
	for {
		select {
		case job := <-ch:
			p.processCommand(job.Context, job.Command)
		case <-p.done:
			return
		}
	}
}

// processCommand executes a single command while holding a global worker
// slot (a no-op when no total limit is configured). The outcome is handled
// as follows:
//   - ErrSkipped: logged as a warning and not recorded anywhere.
//   - any other error: logged and added to the pipeline's command errors.
//   - success: the repository is recorded in processedRepos under its
//     storage name.
func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
	p.acquireWorkerSlot()
	defer p.releaseWorkerSlot()

	logger := p.cmdLogger(cmd)
	logger.Info(fmt.Sprintf("started %s", cmd.Name()))

	err := cmd.Execute(ctx)
	switch {
	case err == nil:
		// Success: fall out of the switch and record the repository below.
	case errors.Is(err, ErrSkipped):
		logger.Warn(fmt.Sprintf("skipped %s", cmd.Name()))
		return
	default:
		logger.WithError(err).Error(fmt.Sprintf("%s failed", cmd.Name()))
		p.addError(cmd.Repository(), err)
		return
	}

	storageName := cmd.Repository().StorageName
	p.processedReposMu.Lock()
	byRepo, exists := p.processedRepos[storageName]
	if !exists {
		byRepo = make(map[*gitalypb.Repository]struct{})
		p.processedRepos[storageName] = byRepo
	}
	byRepo[cmd.Repository()] = struct{}{}
	p.processedReposMu.Unlock()

	logger.Info(fmt.Sprintf("completed %s", cmd.Name()))
}

// setErr records err as the pipeline-level error unless one is already set,
// so the first error wins. It takes no lock.
// NOTE(review): this presumes setErr is only reached from one goroutine at a
// time (Handle's caller).
func (p *Pipeline) setErr(err error) {
	if p.pipelineError == nil {
		p.pipelineError = err
	}
}

// addError records a failed command for repo in the pipeline's error
// summary. commandErrors.AddError takes its own lock, so workers may call
// this concurrently.
func (p *Pipeline) addError(repo *gitalypb.Repository, err error) {
	summary := p.cmdErrors
	summary.AddError(repo, err)
}

// cmdLogger returns p.log enriched with the command name and the storage
// name, relative path and GitLab project path of the command's repository.
func (p *Pipeline) cmdLogger(cmd Command) log.Logger {
	name := cmd.Name()
	repo := cmd.Repository()
	return p.log.WithFields(log.Fields{
		"command":         name,
		"storage_name":    repo.StorageName,
		"relative_path":   repo.RelativePath,
		"gl_project_path": repo.GlProjectPath,
	})
}

// acquireWorkerSlot blocks until a global worker slot is free and takes it.
// When no total limit is configured (totalWorkers is nil) it returns
// immediately.
func (p *Pipeline) acquireWorkerSlot() {
	if p.totalWorkers != nil {
		p.totalWorkers <- struct{}{}
	}
}

// releaseWorkerSlot gives back a slot taken by acquireWorkerSlot. It does
// nothing when no total limit is configured (totalWorkers is nil).
func (p *Pipeline) releaseWorkerSlot() {
	if p.totalWorkers != nil {
		<-p.totalWorkers
	}
}