diff options
author | James Liu <jliu@gitlab.com> | 2023-12-08 04:16:14 +0300 |
---|---|---|
committer | James Liu <jliu@gitlab.com> | 2023-12-18 03:23:13 +0300 |
commit | 3c1d745a3216187bdf6a30ae52639b4488ae3dbb (patch) | |
tree | 63630602e384f02e76b87288da633d7b857fdc2d | |
parent | fd9753bdaac3f788e0e8f77283bf3c3258b2e10f (diff) |
backup: Rename ParallelPipeline to Pipeline
Now that we have a single Pipeline implementation, we can drop the
Parallel prefix from the Pipeline name. This also removes the need for
the separate Pipeline interface, which can now be deleted too.
-rw-r--r-- | internal/backup/pipeline.go | 65 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 16 | ||||
-rw-r--r-- | internal/cli/gitalybackup/create.go | 2 | ||||
-rw-r--r-- | internal/cli/gitalybackup/restore.go | 2 |
4 files changed, 38 insertions, 47 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 470b4a402..a734a1382 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -66,13 +66,6 @@ type Command interface { Execute(context.Context) error } -// Pipeline executes a series of commands and encapsulates error handling for -// the caller. -type Pipeline interface { - Handle(context.Context, Command) - Done() error -} - // CreateCommand creates a backup for a repository type CreateCommand struct { strategy Strategy @@ -153,29 +146,13 @@ func (e PipelineErrors) Error() string { return builder.String() } -func (p *ParallelPipeline) addError(repo *gitalypb.Repository, err error) { - p.errMu.Lock() - defer p.errMu.Unlock() - - p.errs.AddError(repo, err) -} - -func (p *ParallelPipeline) cmdLogger(cmd Command) log.Logger { - return p.log.WithFields(log.Fields{ - "command": cmd.Name(), - "storage_name": cmd.Repository().StorageName, - "relative_path": cmd.Repository().RelativePath, - "gl_project_path": cmd.Repository().GlProjectPath, - }) -} - type contextCommand struct { Command Command Context context.Context } -// ParallelPipeline is a pipeline that executes commands in parallel -type ParallelPipeline struct { +// Pipeline is a pipeline that executes commands in parallel +type Pipeline struct { log log.Logger errs PipelineErrors @@ -192,7 +169,7 @@ type ParallelPipeline struct { err error } -// NewParallelPipeline creates a new ParallelPipeline where all commands are +// NewPipeline creates a new Pipeline where all commands are // passed onto `next` to be processed, `parallel` is the maximum number of // parallel backups that will run and `parallelStorage` is the maximum number // of parallel backups that will run per storage. Since the number of storages @@ -201,7 +178,7 @@ type ParallelPipeline struct { // // Note: When both `parallel` and `parallelStorage` are zero or less no workers // are created and the pipeline will block forever. -func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *ParallelPipeline { +func NewPipeline(log log.Logger, parallel, parallelStorage int) *Pipeline { var workerSlots chan struct{} if parallel > 0 && parallelStorage > 0 { // workerSlots allows the total number of parallel jobs to be @@ -209,7 +186,7 @@ func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *Paralle // each storage, while still limiting the absolute parallelism. workerSlots = make(chan struct{}, parallel) } - return &ParallelPipeline{ + return &Pipeline{ log: log, parallel: parallel, parallelStorage: parallelStorage, @@ -221,7 +198,7 @@ func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *Paralle // Handle queues a request to create a backup. Commands are processed by // n-workers per storage. -func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) { +func (p *Pipeline) Handle(ctx context.Context, cmd Command) { ch := p.getStorage(cmd.Repository().StorageName) select { @@ -236,7 +213,7 @@ func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) { // Done waits for any in progress calls to `next` to complete then reports any // accumulated errors -func (p *ParallelPipeline) Done() error { +func (p *Pipeline) Done() error { close(p.done) p.wg.Wait() @@ -253,7 +230,7 @@ func (p *ParallelPipeline) Done() error { // getStorage finds the channel associated with a storage. When no channel is // found, one is created and n-workers are started to process requests. -func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand { +func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { p.storageMu.Lock() defer p.storageMu.Unlock() @@ -278,7 +255,7 @@ func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand { return ch } -func (p *ParallelPipeline) worker(ch <-chan *contextCommand) { +func (p *Pipeline) worker(ch <-chan *contextCommand) { defer p.wg.Done() for { select { @@ -290,7 +267,7 @@ func (p *ParallelPipeline) worker(ch <-chan *contextCommand) { } } -func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) { +func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { p.acquireWorkerSlot() defer p.releaseWorkerSlot() @@ -310,7 +287,7 @@ func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) { log.Info(fmt.Sprintf("completed %s", cmd.Name())) } -func (p *ParallelPipeline) setErr(err error) { +func (p *Pipeline) setErr(err error) { p.errMu.Lock() defer p.errMu.Unlock() if p.err != nil { @@ -319,9 +296,25 @@ func (p *ParallelPipeline) setErr(err error) { p.err = err } +func (p *Pipeline) addError(repo *gitalypb.Repository, err error) { + p.errMu.Lock() + defer p.errMu.Unlock() + + p.errs.AddError(repo, err) +} + +func (p *Pipeline) cmdLogger(cmd Command) log.Logger { + return p.log.WithFields(log.Fields{ + "command": cmd.Name(), + "storage_name": cmd.Repository().StorageName, + "relative_path": cmd.Repository().RelativePath, + "gl_project_path": cmd.Repository().GlProjectPath, + }) +} + // acquireWorkerSlot queues the worker until a slot is available. // It never blocks if `parallel` or `parallelStorage` are 0 -func (p *ParallelPipeline) acquireWorkerSlot() { +func (p *Pipeline) acquireWorkerSlot() { if p.workerSlots == nil { return } @@ -329,7 +322,7 @@ func (p *ParallelPipeline) acquireWorkerSlot() { } // releaseWorkerSlot releases the worker slot. -func (p *ParallelPipeline) releaseWorkerSlot() { +func (p *Pipeline) releaseWorkerSlot() { if p.workerSlots == nil { return } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index e9eda038b..deb31c7d0 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -18,16 +18,16 @@ import ( func TestLoggingPipeline(t *testing.T) { t.Parallel() - testPipeline(t, func() Pipeline { - return NewParallelPipeline(testhelper.SharedLogger(t), 1, 1) + testPipeline(t, func() *Pipeline { + return NewPipeline(testhelper.SharedLogger(t), 1, 1) }) } func TestParallelPipeline(t *testing.T) { t.Parallel() - testPipeline(t, func() Pipeline { - return NewParallelPipeline(testhelper.SharedLogger(t), 2, 0) + testPipeline(t, func() *Pipeline { + return NewPipeline(testhelper.SharedLogger(t), 2, 0) }) t.Run("parallelism", func(t *testing.T) { @@ -65,8 +65,7 @@ func TestParallelPipeline(t *testing.T) { return nil }, } - var p Pipeline - p = NewParallelPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) + p := NewPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) ctx := testhelper.Context(t) for i := 0; i < 10; i++ { @@ -80,8 +79,7 @@ func TestParallelPipeline(t *testing.T) { t.Run("context done", func(t *testing.T) { var strategy MockStrategy - var p Pipeline - p = NewParallelPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block + p := NewPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block ctx, cancel := context.WithCancel(testhelper.Context(t)) @@ -122,7 +120,7 @@ func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllR return nil } -func testPipeline(t *testing.T, init func() Pipeline) { +func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { switch req.Repository.StorageName { diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 34ccec989..64347b0e3 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -151,7 +151,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i if cmd.parallelStorage > 0 { parallelStorage = cmd.parallelStorage } - pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage) + pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index b195c1436..c86f2067d 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -155,7 +155,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin if cmd.parallelStorage > 0 { parallelStorage = cmd.parallelStorage } - pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage) + pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { |