
gitlab.com/gitlab-org/gitaly.git
author    James Liu <jliu@gitlab.com>  2023-12-08 05:59:11 +0300
committer James Liu <jliu@gitlab.com>  2023-12-18 03:44:59 +0300
commit    10912dca956c655cd22b258fa2b8e59113100e15
tree      021368bbee0f161ed140bfcf8990cd4da81bc94e
parent    cb57a73f569aa3a8b34956f47e32b534d62599cd
backup: Add option to set Pipeline concurrency
Uses the functional options pattern to set the concurrency of a pipeline during initialisation. This makes for a more readable flow and allows the pipeline to default to sequential execution internally.
 internal/backup/pipeline.go          | 88
 internal/backup/pipeline_test.go     | 30
 internal/cli/gitalybackup/create.go  | 14
 internal/cli/gitalybackup/restore.go | 14
 4 files changed, 86 insertions(+), 60 deletions(-)
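
At the API level, the commit replaces NewPipeline's two positional concurrency arguments with variadic functional options. A minimal usage sketch of the new constructor (the values 4 and 2 are arbitrary, and `logger` stands in for a log.Logger owned by the caller):

    // Concurrent: at most 4 jobs in total, up to 2 per storage.
    pipeline, err := backup.NewPipeline(logger, backup.WithConcurrency(4, 2))
    if err != nil {
        return fmt.Errorf("create pipeline: %w", err)
    }

    // Sequential: the default when no options are passed.
    pipeline, err = backup.NewPipeline(logger)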
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 65094788c..c13ce8e99 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -151,7 +151,7 @@ type contextCommand struct {
Context context.Context
}
-// Pipeline is a pipeline that executes commands in parallel
+// Pipeline is a pipeline for running backup and restore jobs.
type Pipeline struct {
log log.Logger
errs pipelineErrors
@@ -159,7 +159,10 @@ type Pipeline struct {
parallel int
parallelStorage int
- wg sync.WaitGroup
+ wg sync.WaitGroup
+ // workerSlots allows the total number of parallel jobs to be
+ // limited. This allows us to create the required workers for
+ // each storage, while still limiting the absolute parallelism.
workerSlots chan struct{}
done chan struct{}
@@ -169,35 +172,60 @@ type Pipeline struct {
err error
}
-// NewPipeline creates a new Pipeline where all commands are
-// passed onto `next` to be processed, `parallel` is the maximum number of
-// parallel backups that will run and `parallelStorage` is the maximum number
-// of parallel backups that will run per storage. Since the number of storages
-// is unknown at initialisation, workers are created lazily as new storage
-// names are encountered.
-//
-// Note: When both `parallel` and `parallelStorage` are zero or less no workers
-// are created and the pipeline will block forever.
-func NewPipeline(log log.Logger, parallel, parallelStorage int) *Pipeline {
- var workerSlots chan struct{}
- if parallel > 0 && parallelStorage > 0 {
- // workerSlots allows the total number of parallel jobs to be
- // limited. This allows us to create the required workers for
- // each storage, while still limiting the absolute parallelism.
- workerSlots = make(chan struct{}, parallel)
- }
- return &Pipeline{
- log: log,
- parallel: parallel,
- parallelStorage: parallelStorage,
- workerSlots: workerSlots,
+// NewPipeline creates a pipeline that executes backup and restore jobs.
+// The pipeline executes sequentially by default, but can be made concurrent
+// by passing the WithConcurrency() option at initialisation.
+func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
+ p := &Pipeline{
+ log: log,
+ // Default to no concurrency.
+ parallel: 1,
+ parallelStorage: 0,
done: make(chan struct{}),
requests: make(map[string]chan *contextCommand),
}
+
+ for _, opt := range opts {
+ if err := opt(p); err != nil {
+ return nil, err
+ }
+ }
+
+ return p, nil
+}
+
+// PipelineOption represents an optional configuration parameter for the Pipeline.
+type PipelineOption func(*Pipeline) error
+
+// WithConcurrency configures the pipeline to run backup and restore jobs concurrently.
+// total defines the absolute maximum number of jobs that the pipeline should execute
+// concurrently. perStorage defines the number of jobs per Gitaly storage that the
+// pipeline should attempt to execute concurrently.
+//
+// For example, in a Gitaly deployment with 2 storages, WithConcurrency(3, 2) means
+// that at most 3 jobs will execute concurrently, despite 2 concurrent jobs being allowed
+// per storage (2*2=4).
+func WithConcurrency(total, perStorage int) PipelineOption {
+ return func(p *Pipeline) error {
+ if total == 0 && perStorage == 0 {
+ return errors.New("total and perStorage cannot both be 0")
+ }
+
+ p.parallel = total
+ p.parallelStorage = perStorage
+
+ if total > 0 && perStorage > 0 {
+ // When both values are provided, we ensure that total limits
+ // the global concurrency.
+ p.workerSlots = make(chan struct{}, total)
+ }
+
+ return nil
+ }
}
-// Handle queues a request to create a backup. Commands are processed by
-// n-workers per storage.
+// Handle queues a request to create a backup. Commands are processed
+// sequentially, or concurrently if WithConcurrency() was called.
func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
ch := p.getStorage(cmd.Repository().StorageName)
@@ -211,8 +239,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
}
-// Done waits for any in progress calls to `next` to complete then reports any
-// accumulated errors
+// Done waits for any in-progress jobs to complete, then reports any accumulated errors.
func (p *Pipeline) Done() error {
close(p.done)
p.wg.Wait()
@@ -230,13 +257,15 @@ func (p *Pipeline) Done() error {
// getStorage finds the channel associated with a storage. When no channel is
// found, one is created and n-workers are started to process requests.
+// If parallelStorage is 0, a channel is created against a pseudo-storage to
+// enforce the total number of concurrent jobs.
func (p *Pipeline) getStorage(storage string) chan<- *contextCommand {
p.storageMu.Lock()
defer p.storageMu.Unlock()
workers := p.parallelStorage
- if p.parallelStorage < 1 {
+ if p.parallelStorage == 0 {
// if the workers are not limited by storage, then pretend there is a single storage with `parallel` workers
storage = ""
workers = p.parallel
@@ -313,7 +342,6 @@ func (p *Pipeline) cmdLogger(cmd Command) log.Logger {
}
// acquireWorkerSlot queues the worker until a slot is available.
-// It never blocks if `parallel` or `parallelStorage` are 0
func (p *Pipeline) acquireWorkerSlot() {
if p.workerSlots == nil {
return
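
The workerSlots channel behaves as a counting semaphore: WithConcurrency(total, perStorage) starts perStorage workers per storage, but each worker must claim one of the `total` slots before running a job, which is how the absolute cap takes priority. A sketch of the acquire/release pair under that reading; the diff shows acquireWorkerSlot's nil guard, while the release counterpart is outside these hunks, so its name and body here are assumptions:

    // acquireWorkerSlot queues the worker until a slot is available.
    func (p *Pipeline) acquireWorkerSlot() {
        if p.workerSlots == nil {
            return // no global cap configured
        }
        p.workerSlots <- struct{}{} // blocks while all `total` slots are taken
    }

    // Assumed counterpart (not shown in this diff): frees a slot once a job ends.
    func (p *Pipeline) releaseWorkerSlot() {
        if p.workerSlots == nil {
            return
        }
        <-p.workerSlots
    }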
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index bb25a4179..a0a4f2205 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -15,21 +15,17 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
-func TestLoggingPipeline(t *testing.T) {
+func TestPipeline(t *testing.T) {
t.Parallel()
+ // Sequential
testPipeline(t, func() *Pipeline {
- return NewPipeline(testhelper.SharedLogger(t), 1, 1)
- })
-}
-
-func TestParallelPipeline(t *testing.T) {
- t.Parallel()
-
- testPipeline(t, func() *Pipeline {
- return NewPipeline(testhelper.SharedLogger(t), 2, 0)
+ p, err := NewPipeline(testhelper.SharedLogger(t))
+ require.NoError(t, err)
+ return p
})
+ // Concurrent
t.Run("parallelism", func(t *testing.T) {
for _, tc := range []struct {
parallel int
@@ -51,6 +47,11 @@ func TestParallelPipeline(t *testing.T) {
parallelStorage: 3,
expectedMaxParallel: 6, // 2 storages * 3 workers per storage
},
+ {
+ parallel: 3,
+ parallelStorage: 2,
+ expectedMaxParallel: 3, // `parallel` takes priority, which is why 2 storages * 2 workers is not the max
+ },
} {
t.Run(fmt.Sprintf("parallel:%d,parallelStorage:%d", tc.parallel, tc.parallelStorage), func(t *testing.T) {
var calls int64
@@ -65,7 +66,8 @@ func TestParallelPipeline(t *testing.T) {
return nil
},
}
- p := NewPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage)
+ p, err := NewPipeline(testhelper.SharedLogger(t), WithConcurrency(tc.parallel, tc.parallelStorage))
+ require.NoError(t, err)
ctx := testhelper.Context(t)
for i := 0; i < 10; i++ {
@@ -79,7 +81,8 @@ func TestParallelPipeline(t *testing.T) {
t.Run("context done", func(t *testing.T) {
var strategy MockStrategy
- p := NewPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block
+ p, err := NewPipeline(testhelper.SharedLogger(t))
+ require.NoError(t, err)
ctx, cancel := context.WithCancel(testhelper.Context(t))
@@ -88,8 +91,7 @@ func TestParallelPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}}))
- err := p.Done()
- require.EqualError(t, err, "pipeline: context canceled")
+ require.EqualError(t, p.Done(), "pipeline: context canceled")
})
}
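
The tests also pin down a behavioural shift: NewPipeline(log, 0, 0) used to yield workers that block forever, whereas an all-zero configuration is now rejected at construction time. A test-style sketch of that error path, grounded in the WithConcurrency validation above:

    _, err := NewPipeline(testhelper.SharedLogger(t), WithConcurrency(0, 0))
    require.EqualError(t, err, "total and perStorage cannot both be 0")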
diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go
index 64347b0e3..288e5a08d 100644
--- a/internal/cli/gitalybackup/create.go
+++ b/internal/cli/gitalybackup/create.go
@@ -142,16 +142,14 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i
manager = backup.NewManager(sink, locator, pool)
}
- // Defaults to no concurrency.
- parallel := 1
- if cmd.parallel > 0 {
- parallel = cmd.parallel
+ var opts []backup.PipelineOption
+ if cmd.parallel > 0 || cmd.parallelStorage > 0 {
+ opts = append(opts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage))
}
- parallelStorage := 1
- if cmd.parallelStorage > 0 {
- parallelStorage = cmd.parallelStorage
+ pipeline, err := backup.NewPipeline(logger, opts...)
+ if err != nil {
+ return fmt.Errorf("create pipeline: %w", err)
}
- pipeline := backup.NewPipeline(logger, parallel, parallelStorage)
decoder := json.NewDecoder(stdin)
for {
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index c86f2067d..de9e2cd3d 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -146,16 +146,14 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
}
}
- // Defaults to no concurrency.
- parallel := 1
- if cmd.parallel > 0 {
- parallel = cmd.parallel
+ var opts []backup.PipelineOption
+ if cmd.parallel > 0 || cmd.parallelStorage > 0 {
+ opts = append(opts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage))
}
- parallelStorage := 1
- if cmd.parallelStorage > 0 {
- parallelStorage = cmd.parallelStorage
+ pipeline, err := backup.NewPipeline(logger, opts...)
+ if err != nil {
+ return fmt.Errorf("create pipeline: %w", err)
}
- pipeline := backup.NewPipeline(logger, parallel, parallelStorage)
decoder := json.NewDecoder(stdin)
for {
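
The diff is truncated at the decoder loop, but downstream each decoded request is queued with Handle and the run is finalised with Done, mirroring the tests above. A hedged sketch of that driving pattern; `manager` is the backup strategy built earlier in create.go, and decoding straight into backup.CreateRequest is an assumption made for illustration:

    for {
        var req backup.CreateRequest // assumed decode target
        if err := decoder.Decode(&req); errors.Is(err, io.EOF) {
            break
        } else if err != nil {
            return fmt.Errorf("create: %w", err)
        }
        pipeline.Handle(ctx, backup.NewCreateCommand(manager, req))
    }
    return pipeline.Done()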