gitlab.com/gitlab-org/gitaly.git

author    James Liu <jliu@gitlab.com>  2023-12-08 04:16:14 +0300
committer James Liu <jliu@gitlab.com>  2023-12-18 03:23:13 +0300
commit    3c1d745a3216187bdf6a30ae52639b4488ae3dbb (patch)
tree      63630602e384f02e76b87288da633d7b857fdc2d
parent    fd9753bdaac3f788e0e8f77283bf3c3258b2e10f (diff)
backup: Rename ParallelPipeline to Pipeline

Now that we have a single Pipeline implementation, we can drop the Parallel prefix from the Pipeline name. This also removes the need for the separate Pipeline interface, which can now be deleted too.
-rw-r--r--  internal/backup/pipeline.go          | 65
-rw-r--r--  internal/backup/pipeline_test.go     | 16
-rw-r--r--  internal/cli/gitalybackup/create.go  |  2
-rw-r--r--  internal/cli/gitalybackup/restore.go |  2
4 files changed, 38 insertions(+), 47 deletions(-)
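For callers the change is mechanical, as the test and CLI hunks below show: construct the concrete *Pipeline with NewPipeline and use it directly, since the Pipeline interface is gone. A minimal sketch of that call pattern, written as if it lived inside the internal/backup package so Pipeline, Command, log and gitalypb are in scope; noopCommand is hypothetical, invented here purely for illustration:

	// noopCommand is a hypothetical Command used only to show the call pattern.
	type noopCommand struct{ repo *gitalypb.Repository }

	func (c noopCommand) Name() string                     { return "noop" }
	func (c noopCommand) Repository() *gitalypb.Repository { return c.repo }
	func (c noopCommand) Execute(context.Context) error    { return nil }

	// runAll drives the renamed type directly: callers now hold a concrete
	// *Pipeline instead of the removed Pipeline interface.
	func runAll(ctx context.Context, logger log.Logger, repos []*gitalypb.Repository) error {
		p := NewPipeline(logger, 2, 2) // was NewParallelPipeline before this commit
		for _, repo := range repos {
			p.Handle(ctx, noopCommand{repo: repo})
		}
		// Done blocks until queued commands finish and reports accumulated errors.
		return p.Done()
	}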
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 470b4a402..a734a1382 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -66,13 +66,6 @@ type Command interface {
Execute(context.Context) error
}
-// Pipeline executes a series of commands and encapsulates error handling for
-// the caller.
-type Pipeline interface {
- Handle(context.Context, Command)
- Done() error
-}
-
// CreateCommand creates a backup for a repository
type CreateCommand struct {
strategy Strategy
@@ -153,29 +146,13 @@ func (e PipelineErrors) Error() string {
return builder.String()
}
-func (p *ParallelPipeline) addError(repo *gitalypb.Repository, err error) {
- p.errMu.Lock()
- defer p.errMu.Unlock()
-
- p.errs.AddError(repo, err)
-}
-
-func (p *ParallelPipeline) cmdLogger(cmd Command) log.Logger {
- return p.log.WithFields(log.Fields{
- "command": cmd.Name(),
- "storage_name": cmd.Repository().StorageName,
- "relative_path": cmd.Repository().RelativePath,
- "gl_project_path": cmd.Repository().GlProjectPath,
- })
-}
-
type contextCommand struct {
Command Command
Context context.Context
}
-// ParallelPipeline is a pipeline that executes commands in parallel
-type ParallelPipeline struct {
+// Pipeline is a pipeline that executes commands in parallel
+type Pipeline struct {
log log.Logger
errs PipelineErrors
@@ -192,7 +169,7 @@ type ParallelPipeline struct {
err error
}
-// NewParallelPipeline creates a new ParallelPipeline where all commands are
+// NewPipeline creates a new Pipeline where all commands are
// passed onto `next` to be processed, `parallel` is the maximum number of
// parallel backups that will run and `parallelStorage` is the maximum number
// of parallel backups that will run per storage. Since the number of storages
@@ -201,7 +178,7 @@ type ParallelPipeline struct {
//
// Note: When both `parallel` and `parallelStorage` are zero or less no workers
// are created and the pipeline will block forever.
-func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *ParallelPipeline {
+func NewPipeline(log log.Logger, parallel, parallelStorage int) *Pipeline {
var workerSlots chan struct{}
if parallel > 0 && parallelStorage > 0 {
// workerSlots allows the total number of parallel jobs to be
@@ -209,7 +186,7 @@ func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *Paralle
// each storage, while still limiting the absolute parallelism.
workerSlots = make(chan struct{}, parallel)
}
- return &ParallelPipeline{
+ return &Pipeline{
log: log,
parallel: parallel,
parallelStorage: parallelStorage,
@@ -221,7 +198,7 @@ func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *Paralle
// Handle queues a request to create a backup. Commands are processed by
// n-workers per storage.
-func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) {
+func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
ch := p.getStorage(cmd.Repository().StorageName)
select {
@@ -236,7 +213,7 @@ func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) {
// Done waits for any in progress calls to `next` to complete then reports any
// accumulated errors
-func (p *ParallelPipeline) Done() error {
+func (p *Pipeline) Done() error {
close(p.done)
p.wg.Wait()
@@ -253,7 +230,7 @@ func (p *ParallelPipeline) Done() error {
// getStorage finds the channel associated with a storage. When no channel is
// found, one is created and n-workers are started to process requests.
-func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand {
+func (p *Pipeline) getStorage(storage string) chan<- *contextCommand {
p.storageMu.Lock()
defer p.storageMu.Unlock()
@@ -278,7 +255,7 @@ func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand {
return ch
}
-func (p *ParallelPipeline) worker(ch <-chan *contextCommand) {
+func (p *Pipeline) worker(ch <-chan *contextCommand) {
defer p.wg.Done()
for {
select {
@@ -290,7 +267,7 @@ func (p *ParallelPipeline) worker(ch <-chan *contextCommand) {
}
}
-func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) {
+func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
p.acquireWorkerSlot()
defer p.releaseWorkerSlot()
@@ -310,7 +287,7 @@ func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) {
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
}
-func (p *ParallelPipeline) setErr(err error) {
+func (p *Pipeline) setErr(err error) {
p.errMu.Lock()
defer p.errMu.Unlock()
if p.err != nil {
@@ -319,9 +296,25 @@ func (p *ParallelPipeline) setErr(err error) {
p.err = err
}
+func (p *Pipeline) addError(repo *gitalypb.Repository, err error) {
+ p.errMu.Lock()
+ defer p.errMu.Unlock()
+
+ p.errs.AddError(repo, err)
+}
+
+func (p *Pipeline) cmdLogger(cmd Command) log.Logger {
+ return p.log.WithFields(log.Fields{
+ "command": cmd.Name(),
+ "storage_name": cmd.Repository().StorageName,
+ "relative_path": cmd.Repository().RelativePath,
+ "gl_project_path": cmd.Repository().GlProjectPath,
+ })
+}
+
// acquireWorkerSlot queues the worker until a slot is available.
// It never blocks if `parallel` or `parallelStorage` are 0
-func (p *ParallelPipeline) acquireWorkerSlot() {
+func (p *Pipeline) acquireWorkerSlot() {
if p.workerSlots == nil {
return
}
@@ -329,7 +322,7 @@ func (p *ParallelPipeline) acquireWorkerSlot() {
}
// releaseWorkerSlot releases the worker slot.
-func (p *ParallelPipeline) releaseWorkerSlot() {
+func (p *Pipeline) releaseWorkerSlot() {
if p.workerSlots == nil {
return
}
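The workerSlots channel mentioned in NewPipeline's doc comment above is a buffered channel used as a counting semaphore, so the per-storage workers can never exceed the overall `parallel` limit in total. The snippet below is not Gitaly's code, only a self-contained sketch of that standard Go pattern as assumed by acquireWorkerSlot and releaseWorkerSlot:

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		const parallel = 2 // cap on concurrently running jobs

		// A buffered channel acts as a counting semaphore: sending acquires a
		// slot (blocking while the buffer is full), receiving releases it.
		workerSlots := make(chan struct{}, parallel)

		var wg sync.WaitGroup
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func(job int) {
				defer wg.Done()

				workerSlots <- struct{}{}        // acquire a slot
				defer func() { <-workerSlots }() // release it when done

				fmt.Println("running job", job)
			}(i)
		}
		wg.Wait()
	}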
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index e9eda038b..deb31c7d0 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -18,16 +18,16 @@ import (
func TestLoggingPipeline(t *testing.T) {
t.Parallel()
- testPipeline(t, func() Pipeline {
- return NewParallelPipeline(testhelper.SharedLogger(t), 1, 1)
+ testPipeline(t, func() *Pipeline {
+ return NewPipeline(testhelper.SharedLogger(t), 1, 1)
})
}
func TestParallelPipeline(t *testing.T) {
t.Parallel()
- testPipeline(t, func() Pipeline {
- return NewParallelPipeline(testhelper.SharedLogger(t), 2, 0)
+ testPipeline(t, func() *Pipeline {
+ return NewPipeline(testhelper.SharedLogger(t), 2, 0)
})
t.Run("parallelism", func(t *testing.T) {
@@ -65,8 +65,7 @@ func TestParallelPipeline(t *testing.T) {
return nil
},
}
- var p Pipeline
- p = NewParallelPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage)
+ p := NewPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage)
ctx := testhelper.Context(t)
for i := 0; i < 10; i++ {
@@ -80,8 +79,7 @@ func TestParallelPipeline(t *testing.T) {
t.Run("context done", func(t *testing.T) {
var strategy MockStrategy
- var p Pipeline
- p = NewParallelPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block
+ p := NewPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block
ctx, cancel := context.WithCancel(testhelper.Context(t))
@@ -122,7 +120,7 @@ func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllR
return nil
}
-func testPipeline(t *testing.T, init func() Pipeline) {
+func testPipeline(t *testing.T, init func() *Pipeline) {
strategy := MockStrategy{
CreateFunc: func(_ context.Context, req *CreateRequest) error {
switch req.Repository.StorageName {
diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go
index 34ccec989..64347b0e3 100644
--- a/internal/cli/gitalybackup/create.go
+++ b/internal/cli/gitalybackup/create.go
@@ -151,7 +151,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i
if cmd.parallelStorage > 0 {
parallelStorage = cmd.parallelStorage
}
- pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage)
+ pipeline := backup.NewPipeline(logger, parallel, parallelStorage)
decoder := json.NewDecoder(stdin)
for {
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index b195c1436..c86f2067d 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -155,7 +155,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
if cmd.parallelStorage > 0 {
parallelStorage = cmd.parallelStorage
}
- pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage)
+ pipeline := backup.NewPipeline(logger, parallel, parallelStorage)
decoder := json.NewDecoder(stdin)
for {