Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <proglottis@gmail.com>2021-05-19 05:39:29 +0300
committerJames Fargher <proglottis@gmail.com>2021-05-20 02:14:39 +0300
commit2bc73b4f1668a372fb8cb39e1ca8720ab74e5490 (patch)
tree0bca9eadbc76f2df4f5315ecb5f456531b4e2a0b
parentce37ef87e4790c2c1900a54b9401f6a6a587f58a (diff)
Add option to create backups in parallel
-rw-r--r--cmd/gitaly-backup/create.go10
-rw-r--r--internal/backup/pipeline.go82
-rw-r--r--internal/backup/pipeline_test.go72
3 files changed, 134 insertions, 30 deletions
diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go
index 32118fd4b..ea80eb32a 100644
--- a/cmd/gitaly-backup/create.go
+++ b/cmd/gitaly-backup/create.go
@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"io"
+ "runtime"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/backup"
@@ -22,15 +23,22 @@ type serverRepository struct {
type createSubcommand struct {
backupPath string
+ parallel int
}
func (cmd *createSubcommand) Flags(fs *flag.FlagSet) {
fs.StringVar(&cmd.backupPath, "path", "", "repository backup path")
+ fs.IntVar(&cmd.parallel, "parallel", runtime.NumCPU(), "maximum number of parallel backups")
}
func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
fsBackup := backup.NewFilesystem(cmd.backupPath)
- pipeline := backup.NewPipeline(log.StandardLogger(), fsBackup)
+
+ var pipeline backup.CreatePipeline
+ pipeline = backup.NewPipeline(log.StandardLogger(), fsBackup)
+ if cmd.parallel > 0 {
+ pipeline = backup.NewParallelCreatePipeline(pipeline, cmd.parallel)
+ }
decoder := json.NewDecoder(stdin)
for {
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 8f18bf9ba..329faa0b5 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "sync"
"sync/atomic"
"github.com/sirupsen/logrus"
@@ -16,6 +17,12 @@ type Strategy interface {
Restore(context.Context, *RestoreRequest) error
}
+// CreatePipeline is a pipeline that only handles creating backups
+type CreatePipeline interface {
+ Create(context.Context, *CreateRequest)
+ Done() error
+}
+
// Pipeline handles a series of requests to create/restore backups. Pipeline
// encapsulates error handling for the caller.
type Pipeline struct {
@@ -83,3 +90,78 @@ func (p *Pipeline) repoLogger(repo *gitalypb.Repository) logrus.FieldLogger {
"gl_project_path": repo.GlProjectPath,
})
}
+
+// ParallelCreatePipeline is a pipeline that creates backups in parallel
+type ParallelCreatePipeline struct {
+ next CreatePipeline
+ n int
+
+ workersOnce sync.Once
+ wg sync.WaitGroup
+ done chan struct{}
+ requests chan *CreateRequest
+
+ mu sync.Mutex
+ err error
+}
+
+// NewParallelCreatePipeline creates a new ParallelCreatePipeline where `next`
+// is the pipeline called to create the backups and `n` is the number of
+// parallel backups that will run.
+func NewParallelCreatePipeline(next CreatePipeline, n int) *ParallelCreatePipeline {
+ return &ParallelCreatePipeline{
+ next: next,
+ n: n,
+ done: make(chan struct{}),
+ requests: make(chan *CreateRequest),
+ }
+}
+
+// Create queues a call to `next.Create` which will be run in parallel
+func (p *ParallelCreatePipeline) Create(ctx context.Context, req *CreateRequest) {
+ p.workersOnce.Do(p.startWorkers)
+
+ select {
+ case <-ctx.Done():
+ p.setErr(ctx.Err())
+ case p.requests <- req:
+ }
+}
+
+// Done waits for any in progress calls to `Create` to complete then reports any accumulated errors
+func (p *ParallelCreatePipeline) Done() error {
+ close(p.done)
+ p.wg.Wait()
+ if err := p.next.Done(); err != nil {
+ return err
+ }
+ return p.err
+}
+
+func (p *ParallelCreatePipeline) startWorkers() {
+ for i := 0; i < p.n; i++ {
+ p.wg.Add(1)
+ go p.worker()
+ }
+}
+
+func (p *ParallelCreatePipeline) worker() {
+ defer p.wg.Done()
+ for {
+ select {
+ case <-p.done:
+ return
+ case req := <-p.requests:
+ p.next.Create(context.TODO(), req)
+ }
+ }
+}
+
+func (p *ParallelCreatePipeline) setErr(err error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.err != nil {
+ return
+ }
+ p.err = err
+}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 62d02d528..f23a57d53 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -12,35 +12,9 @@ import (
)
func TestPipeline_Create(t *testing.T) {
- strategy := MockStrategy{
- CreateFunc: func(_ context.Context, req *CreateRequest) error {
- switch req.Repository.StorageName {
- case "normal":
- return nil
- case "skip":
- return ErrSkipped
- case "error":
- return assert.AnError
- }
- require.Failf(t, "unexpected call to Create", "StorageName = %q", req.Repository.StorageName)
- return nil
- },
- }
- p := NewPipeline(logrus.StandardLogger(), strategy)
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- requests := []CreateRequest{
- {Repository: &gitalypb.Repository{StorageName: "normal"}},
- {Repository: &gitalypb.Repository{StorageName: "skip"}},
- {Repository: &gitalypb.Repository{StorageName: "error"}},
- }
- for _, req := range requests {
- p.Create(ctx, &req)
- }
- err := p.Done()
- require.EqualError(t, err, "pipeline: 1 failures encountered")
+ testPipelineCreate(t, func(strategy Strategy) CreatePipeline {
+ return NewPipeline(logrus.StandardLogger(), strategy)
+ })
}
func TestPipeline_Restore(t *testing.T) {
@@ -75,6 +49,12 @@ func TestPipeline_Restore(t *testing.T) {
require.EqualError(t, err, "pipeline: 1 failures encountered")
}
+func TestParallelCreatePipeline(t *testing.T) {
+ testPipelineCreate(t, func(strategy Strategy) CreatePipeline {
+ return NewParallelCreatePipeline(NewPipeline(logrus.StandardLogger(), strategy), 2)
+ })
+}
+
type MockStrategy struct {
CreateFunc func(context.Context, *CreateRequest) error
RestoreFunc func(context.Context, *RestoreRequest) error
@@ -93,3 +73,37 @@ func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error {
}
return nil
}
+
+func testPipelineCreate(t *testing.T, init func(Strategy) CreatePipeline) {
+ t.Run("strategy errors", func(t *testing.T) {
+ strategy := MockStrategy{
+ CreateFunc: func(_ context.Context, req *CreateRequest) error {
+ switch req.Repository.StorageName {
+ case "normal":
+ return nil
+ case "skip":
+ return ErrSkipped
+ case "error":
+ return assert.AnError
+ }
+ require.Failf(t, "unexpected call to Create", "StorageName = %q", req.Repository.StorageName)
+ return nil
+ },
+ }
+ p := init(strategy)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ requests := []CreateRequest{
+ {Repository: &gitalypb.Repository{StorageName: "normal"}},
+ {Repository: &gitalypb.Repository{StorageName: "skip"}},
+ {Repository: &gitalypb.Repository{StorageName: "error"}},
+ }
+ for i := range requests {
+ p.Create(ctx, &requests[i])
+ }
+ err := p.Done()
+ require.EqualError(t, err, "pipeline: 1 failures encountered")
+ })
+}