diff options
author | James Fargher <proglottis@gmail.com> | 2021-05-19 05:39:29 +0300 |
---|---|---|
committer | James Fargher <proglottis@gmail.com> | 2021-05-20 02:14:39 +0300 |
commit | 2bc73b4f1668a372fb8cb39e1ca8720ab74e5490 (patch) | |
tree | 0bca9eadbc76f2df4f5315ecb5f456531b4e2a0b | |
parent | ce37ef87e4790c2c1900a54b9401f6a6a587f58a (diff) |
Add option to create backups in parallel
-rw-r--r-- | cmd/gitaly-backup/create.go | 10 | ||||
-rw-r--r-- | internal/backup/pipeline.go | 82 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 72 |
3 files changed, 134 insertions, 30 deletions
```diff
diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go
index 32118fd4b..ea80eb32a 100644
--- a/cmd/gitaly-backup/create.go
+++ b/cmd/gitaly-backup/create.go
@@ -6,6 +6,7 @@ import (
 	"flag"
 	"fmt"
 	"io"
+	"runtime"
 
 	log "github.com/sirupsen/logrus"
 	"gitlab.com/gitlab-org/gitaly/internal/backup"
@@ -22,15 +23,22 @@ type serverRepository struct {
 
 type createSubcommand struct {
 	backupPath string
+	parallel   int
 }
 
 func (cmd *createSubcommand) Flags(fs *flag.FlagSet) {
 	fs.StringVar(&cmd.backupPath, "path", "", "repository backup path")
+	fs.IntVar(&cmd.parallel, "parallel", runtime.NumCPU(), "maximum number of parallel backups")
 }
 
 func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
 	fsBackup := backup.NewFilesystem(cmd.backupPath)
-	pipeline := backup.NewPipeline(log.StandardLogger(), fsBackup)
+
+	var pipeline backup.CreatePipeline
+	pipeline = backup.NewPipeline(log.StandardLogger(), fsBackup)
+	if cmd.parallel > 0 {
+		pipeline = backup.NewParallelCreatePipeline(pipeline, cmd.parallel)
+	}
 
 	decoder := json.NewDecoder(stdin)
 	for {
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 8f18bf9ba..329faa0b5 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"sync/atomic"
 
 	"github.com/sirupsen/logrus"
@@ -16,6 +17,12 @@ type Strategy interface {
 	Restore(context.Context, *RestoreRequest) error
 }
 
+// CreatePipeline is a pipeline that only handles creating backups
+type CreatePipeline interface {
+	Create(context.Context, *CreateRequest)
+	Done() error
+}
+
 // Pipeline handles a series of requests to create/restore backups. Pipeline
 // encapsulates error handling for the caller.
 type Pipeline struct {
@@ -83,3 +90,78 @@ func (p *Pipeline) repoLogger(repo *gitalypb.Repository) logrus.FieldLogger {
 		"gl_project_path": repo.GlProjectPath,
 	})
 }
+
+// ParallelCreatePipeline is a pipeline that creates backups in parallel
+type ParallelCreatePipeline struct {
+	next CreatePipeline
+	n    int
+
+	workersOnce sync.Once
+	wg          sync.WaitGroup
+	done        chan struct{}
+	requests    chan *CreateRequest
+
+	mu  sync.Mutex
+	err error
+}
+
+// NewParallelCreatePipeline creates a new ParallelCreatePipeline where `next`
+// is the pipeline called to create the backups and `n` is the number of
+// parallel backups that will run.
+func NewParallelCreatePipeline(next CreatePipeline, n int) *ParallelCreatePipeline {
+	return &ParallelCreatePipeline{
+		next:     next,
+		n:        n,
+		done:     make(chan struct{}),
+		requests: make(chan *CreateRequest),
+	}
+}
+
+// Create queues a call to `next.Create` which will be run in parallel
+func (p *ParallelCreatePipeline) Create(ctx context.Context, req *CreateRequest) {
+	p.workersOnce.Do(p.startWorkers)
+
+	select {
+	case <-ctx.Done():
+		p.setErr(ctx.Err())
+	case p.requests <- req:
+	}
+}
+
+// Done waits for any in progress calls to `Create` to complete then reports any accumulated errors
+func (p *ParallelCreatePipeline) Done() error {
+	close(p.done)
+	p.wg.Wait()
+	if err := p.next.Done(); err != nil {
+		return err
+	}
+	return p.err
+}
+
+func (p *ParallelCreatePipeline) startWorkers() {
+	for i := 0; i < p.n; i++ {
+		p.wg.Add(1)
+		go p.worker()
+	}
+}
+
+func (p *ParallelCreatePipeline) worker() {
+	defer p.wg.Done()
+	for {
+		select {
+		case <-p.done:
+			return
+		case req := <-p.requests:
+			p.next.Create(context.TODO(), req)
+		}
+	}
+}
+
+func (p *ParallelCreatePipeline) setErr(err error) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.err != nil {
+		return
+	}
+	p.err = err
+}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 62d02d528..f23a57d53 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -12,35 +12,9 @@ import (
 )
 
 func TestPipeline_Create(t *testing.T) {
-	strategy := MockStrategy{
-		CreateFunc: func(_ context.Context, req *CreateRequest) error {
-			switch req.Repository.StorageName {
-			case "normal":
-				return nil
-			case "skip":
-				return ErrSkipped
-			case "error":
-				return assert.AnError
-			}
-			require.Failf(t, "unexpected call to Create", "StorageName = %q", req.Repository.StorageName)
-			return nil
-		},
-	}
-	p := NewPipeline(logrus.StandardLogger(), strategy)
-
-	ctx, cancel := testhelper.Context()
-	defer cancel()
-
-	requests := []CreateRequest{
-		{Repository: &gitalypb.Repository{StorageName: "normal"}},
-		{Repository: &gitalypb.Repository{StorageName: "skip"}},
-		{Repository: &gitalypb.Repository{StorageName: "error"}},
-	}
-	for _, req := range requests {
-		p.Create(ctx, &req)
-	}
-	err := p.Done()
-	require.EqualError(t, err, "pipeline: 1 failures encountered")
+	testPipelineCreate(t, func(strategy Strategy) CreatePipeline {
+		return NewPipeline(logrus.StandardLogger(), strategy)
+	})
 }
 
 func TestPipeline_Restore(t *testing.T) {
@@ -75,6 +49,12 @@ func TestPipeline_Restore(t *testing.T) {
 	require.EqualError(t, err, "pipeline: 1 failures encountered")
 }
 
+func TestParallelCreatePipeline(t *testing.T) {
+	testPipelineCreate(t, func(strategy Strategy) CreatePipeline {
+		return NewParallelCreatePipeline(NewPipeline(logrus.StandardLogger(), strategy), 2)
+	})
+}
+
 type MockStrategy struct {
 	CreateFunc  func(context.Context, *CreateRequest) error
 	RestoreFunc func(context.Context, *RestoreRequest) error
@@ -93,3 +73,37 @@ func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error {
 	}
 	return nil
 }
+
+func testPipelineCreate(t *testing.T, init func(Strategy) CreatePipeline) {
+	t.Run("strategy errors", func(t *testing.T) {
+		strategy := MockStrategy{
+			CreateFunc: func(_ context.Context, req *CreateRequest) error {
+				switch req.Repository.StorageName {
+				case "normal":
+					return nil
+				case "skip":
+					return ErrSkipped
+				case "error":
+					return assert.AnError
+				}
+				require.Failf(t, "unexpected call to Create", "StorageName = %q", req.Repository.StorageName)
+				return nil
+			},
+		}
+		p := init(strategy)
+
+		ctx, cancel := testhelper.Context()
+		defer cancel()
+
+		requests := []CreateRequest{
+			{Repository: &gitalypb.Repository{StorageName: "normal"}},
+			{Repository: &gitalypb.Repository{StorageName: "skip"}},
+			{Repository: &gitalypb.Repository{StorageName: "error"}},
+		}
+		for i := range requests {
+			p.Create(ctx, &requests[i])
+		}
+		err := p.Done()
+		require.EqualError(t, err, "pipeline: 1 failures encountered")
+	})
+}
```