diff options
author | James Liu <jliu@gitlab.com> | 2023-12-08 05:59:11 +0300 |
---|---|---|
committer | James Liu <jliu@gitlab.com> | 2023-12-18 03:44:59 +0300 |
commit | 10912dca956c655cd22b258fa2b8e59113100e15 (patch) | |
tree | 021368bbee0f161ed140bfcf8990cd4da81bc94e | |
parent | cb57a73f569aa3a8b34956f47e32b534d62599cd (diff) |
backup: Add option to set Pipeline concurrency
Uses the option function pattern to set the concurrency of a pipeline
during initialisation. This makes for a more readable flow, and allows
us to set the default state of nil-concurrency internally.
-rw-r--r-- | internal/backup/pipeline.go | 88 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 30 | ||||
-rw-r--r-- | internal/cli/gitalybackup/create.go | 14 | ||||
-rw-r--r-- | internal/cli/gitalybackup/restore.go | 14 |
4 files changed, 86 insertions, 60 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 65094788c..c13ce8e99 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -151,7 +151,7 @@ type contextCommand struct { Context context.Context } -// Pipeline is a pipeline that executes commands in parallel +// Pipeline is a pipeline for running backup and restore jobs. type Pipeline struct { log log.Logger errs pipelineErrors @@ -159,7 +159,10 @@ type Pipeline struct { parallel int parallelStorage int - wg sync.WaitGroup + wg sync.WaitGroup + // workerSlots allows the total number of parallel jobs to be + // limited. This allows us to create the required workers for + // each storage, while still limiting the absolute parallelism. workerSlots chan struct{} done chan struct{} @@ -169,35 +172,60 @@ type Pipeline struct { err error } -// NewPipeline creates a new Pipeline where all commands are -// passed onto `next` to be processed, `parallel` is the maximum number of -// parallel backups that will run and `parallelStorage` is the maximum number -// of parallel backups that will run per storage. Since the number of storages -// is unknown at initialisation, workers are created lazily as new storage -// names are encountered. -// -// Note: When both `parallel` and `parallelStorage` are zero or less no workers -// are created and the pipeline will block forever. -func NewPipeline(log log.Logger, parallel, parallelStorage int) *Pipeline { - var workerSlots chan struct{} - if parallel > 0 && parallelStorage > 0 { - // workerSlots allows the total number of parallel jobs to be - // limited. This allows us to create the required workers for - // each storage, while still limiting the absolute parallelism. - workerSlots = make(chan struct{}, parallel) - } - return &Pipeline{ - log: log, - parallel: parallel, - parallelStorage: parallelStorage, - workerSlots: workerSlots, +// NewPipeline creates a pipeline that executes backup and restore jobs. +// The pipeline executes sequentially by default, but can be made concurrent +// by calling WithConcurrency() after initialisation. +func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { + p := &Pipeline{ + log: log, + // Default to no concurrency. + parallel: 1, + parallelStorage: 0, done: make(chan struct{}), requests: make(map[string]chan *contextCommand), } + + for _, opt := range opts { + if err := opt(p); err != nil { + return nil, err + } + } + + return p, nil +} + +// PipelineOption represents an optional configuration parameter for the Pipeline. +type PipelineOption func(*Pipeline) error + +// WithConcurrency configures the pipeline to run backup and restore jobs concurrently. +// total defines the absolute maximum number of jobs that the pipeline should execute +// concurrently. perStorage defines the number of jobs per Gitaly storage that the +// pipeline should attempt to execute concurrently. +// +// For example, in a Gitaly deployment with 2 storages, WithConcurrency(3, 2) means +// that at most 3 jobs will execute concurrently, despite 2 concurrent jobs being allowed +// per storage (2*2=4). +func WithConcurrency(total, perStorage int) PipelineOption { + return func(p *Pipeline) error { + if total == 0 && perStorage == 0 { + return errors.New("total and perStorage cannot both be 0") + } + + p.parallel = total + p.parallelStorage = perStorage + + if total > 0 && perStorage > 0 { + // When both values are provided, we ensure that total limits + // the global concurrency. + p.workerSlots = make(chan struct{}, total) + } + + return nil + } } -// Handle queues a request to create a backup. Commands are processed by -// n-workers per storage. +// Handle queues a request to create a backup. Commands either processed sequentially +// or concurrently, if WithConcurrency() was called. func (p *Pipeline) Handle(ctx context.Context, cmd Command) { ch := p.getStorage(cmd.Repository().StorageName) @@ -211,8 +239,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } } -// Done waits for any in progress calls to `next` to complete then reports any -// accumulated errors +// Done waits for any in progress jobs to complete then reports any accumulated errors func (p *Pipeline) Done() error { close(p.done) p.wg.Wait() @@ -230,13 +257,15 @@ func (p *Pipeline) Done() error { // getStorage finds the channel associated with a storage. When no channel is // found, one is created and n-workers are started to process requests. +// If parallelStorage is 0, a channel is created against a pseudo-storage to +// enforce the number of total concurrent jobs. func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { p.storageMu.Lock() defer p.storageMu.Unlock() workers := p.parallelStorage - if p.parallelStorage < 1 { + if p.parallelStorage == 0 { // if the workers are not limited by storage, then pretend there is a single storage with `parallel` workers storage = "" workers = p.parallel @@ -313,7 +342,6 @@ func (p *Pipeline) cmdLogger(cmd Command) log.Logger { } // acquireWorkerSlot queues the worker until a slot is available. -// It never blocks if `parallel` or `parallelStorage` are 0 func (p *Pipeline) acquireWorkerSlot() { if p.workerSlots == nil { return diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index bb25a4179..a0a4f2205 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -15,21 +15,17 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) -func TestLoggingPipeline(t *testing.T) { +func TestPipeline(t *testing.T) { t.Parallel() + // Sequential testPipeline(t, func() *Pipeline { - return NewPipeline(testhelper.SharedLogger(t), 1, 1) - }) -} - -func TestParallelPipeline(t *testing.T) { - t.Parallel() - - testPipeline(t, func() *Pipeline { - return NewPipeline(testhelper.SharedLogger(t), 2, 0) + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) + return p }) + // Concurrent t.Run("parallelism", func(t *testing.T) { for _, tc := range []struct { parallel int @@ -51,6 +47,11 @@ func TestParallelPipeline(t *testing.T) { parallelStorage: 3, expectedMaxParallel: 6, // 2 storages * 3 workers per storage }, + { + parallel: 3, + parallelStorage: 2, + expectedMaxParallel: 3, // `parallel` takes priority, which is why 2 storages * 2 workers is not the max + }, } { t.Run(fmt.Sprintf("parallel:%d,parallelStorage:%d", tc.parallel, tc.parallelStorage), func(t *testing.T) { var calls int64 @@ -65,7 +66,8 @@ func TestParallelPipeline(t *testing.T) { return nil }, } - p := NewPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) + p, err := NewPipeline(testhelper.SharedLogger(t), WithConcurrency(tc.parallel, tc.parallelStorage)) + require.NoError(t, err) ctx := testhelper.Context(t) for i := 0; i < 10; i++ { @@ -79,7 +81,8 @@ func TestParallelPipeline(t *testing.T) { t.Run("context done", func(t *testing.T) { var strategy MockStrategy - p := NewPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) ctx, cancel := context.WithCancel(testhelper.Context(t)) @@ -88,8 +91,7 @@ func TestParallelPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}})) - err := p.Done() - require.EqualError(t, err, "pipeline: context canceled") + require.EqualError(t, p.Done(), "pipeline: context canceled") }) } diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 64347b0e3..288e5a08d 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -142,16 +142,14 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i manager = backup.NewManager(sink, locator, pool) } - // Defaults to no concurrency. - parallel := 1 - if cmd.parallel > 0 { - parallel = cmd.parallel + var opts []backup.PipelineOption + if cmd.parallel > 0 || cmd.parallelStorage > 0 { + opts = append(opts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage)) } - parallelStorage := 1 - if cmd.parallelStorage > 0 { - parallelStorage = cmd.parallelStorage + pipeline, err := backup.NewPipeline(logger, opts...) + if err != nil { + return fmt.Errorf("create pipeline: %w", err) } - pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index c86f2067d..de9e2cd3d 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -146,16 +146,14 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin } } - // Defaults to no concurrency. - parallel := 1 - if cmd.parallel > 0 { - parallel = cmd.parallel + var opts []backup.PipelineOption + if cmd.parallel > 0 || cmd.parallelStorage > 0 { + opts = append(opts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage)) } - parallelStorage := 1 - if cmd.parallelStorage > 0 { - parallelStorage = cmd.parallelStorage + pipeline, err := backup.NewPipeline(logger, opts...) + if err != nil { + return fmt.Errorf("create pipeline: %w", err) } - pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { |