Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2021-05-20 18:40:16 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2021-05-20 18:40:16 +0300
commitcb17899da94f9a53676d71190c42cc05a368734e (patch)
tree86796cda8c1980a2f59c5f7f8eb6cafdf6233a71
parent65badb06b1364494cfdb226cab1bba77a9785ecb (diff)
parent2bc73b4f1668a372fb8cb39e1ca8720ab74e5490 (diff)
Merge branch 'parallel_backup' into 'master'
Add option to run backups in parallel See merge request gitlab-org/gitaly!3509
-rw-r--r--cmd/gitaly-backup/create.go36
-rw-r--r--cmd/gitaly-backup/create_test.go2
-rw-r--r--cmd/gitaly-backup/restore.go28
-rw-r--r--cmd/gitaly-backup/restore_test.go2
-rw-r--r--internal/backup/backup.go57
-rw-r--r--internal/backup/backup_test.go15
-rw-r--r--internal/backup/pipeline.go167
-rw-r--r--internal/backup/pipeline_test.go109
8 files changed, 347 insertions, 69 deletions
diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go
index 49aac2be8..ea80eb32a 100644
--- a/cmd/gitaly-backup/create.go
+++ b/cmd/gitaly-backup/create.go
@@ -3,10 +3,10 @@ package main
import (
"context"
"encoding/json"
- "errors"
"flag"
"fmt"
"io"
+ "runtime"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/backup"
@@ -23,16 +23,23 @@ type serverRepository struct {
type createSubcommand struct {
backupPath string
+ parallel int
}
func (cmd *createSubcommand) Flags(fs *flag.FlagSet) {
fs.StringVar(&cmd.backupPath, "path", "", "repository backup path")
+ fs.IntVar(&cmd.parallel, "parallel", runtime.NumCPU(), "maximum number of parallel backups")
}
func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
fsBackup := backup.NewFilesystem(cmd.backupPath)
- var failed int
+ var pipeline backup.CreatePipeline
+ pipeline = backup.NewPipeline(log.StandardLogger(), fsBackup)
+ if cmd.parallel > 0 {
+ pipeline = backup.NewParallelCreatePipeline(pipeline, cmd.parallel)
+ }
+
decoder := json.NewDecoder(stdin)
for {
var sr serverRepository
@@ -41,32 +48,19 @@ func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io
} else if err != nil {
return fmt.Errorf("create: %w", err)
}
- repoLog := log.WithFields(log.Fields{
- "storage_name": sr.StorageName,
- "relative_path": sr.RelativePath,
- "gl_project_path": sr.GlProjectPath,
- })
repo := gitalypb.Repository{
StorageName: sr.StorageName,
RelativePath: sr.RelativePath,
GlProjectPath: sr.GlProjectPath,
}
- repoLog.Info("started backup")
- if err := fsBackup.BackupRepository(ctx, sr.ServerInfo, &repo); err != nil {
- if errors.Is(err, backup.ErrSkipped) {
- repoLog.Warn("skipped backup")
- } else {
- repoLog.WithError(err).Error("backup failed")
- failed++
- }
- continue
- }
-
- repoLog.Info("completed backup")
+ pipeline.Create(ctx, &backup.CreateRequest{
+ Server: sr.ServerInfo,
+ Repository: &repo,
+ })
}
- if failed > 0 {
- return fmt.Errorf("create: %d failures encountered", failed)
+ if err := pipeline.Done(); err != nil {
+ return fmt.Errorf("create: %w", err)
}
return nil
}
diff --git a/cmd/gitaly-backup/create_test.go b/cmd/gitaly-backup/create_test.go
index ba0a537c7..d26d37b11 100644
--- a/cmd/gitaly-backup/create_test.go
+++ b/cmd/gitaly-backup/create_test.go
@@ -58,7 +58,7 @@ func TestCreateSubcommand(t *testing.T) {
require.NoError(t, fs.Parse([]string{"-path", path}))
require.EqualError(t,
cmd.Run(context.Background(), &stdin, ioutil.Discard),
- "create: 1 failures encountered")
+ "create: pipeline: 1 failures encountered")
for _, repo := range repos {
bundlePath := filepath.Join(path, repo.RelativePath+".bundle")
diff --git a/cmd/gitaly-backup/restore.go b/cmd/gitaly-backup/restore.go
index 619192a8a..a7b8a3e10 100644
--- a/cmd/gitaly-backup/restore.go
+++ b/cmd/gitaly-backup/restore.go
@@ -32,8 +32,8 @@ func (cmd *restoreSubcommand) Flags(fs *flag.FlagSet) {
func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
fsBackup := backup.NewFilesystem(cmd.backupPath)
+ pipeline := backup.NewPipeline(log.StandardLogger(), fsBackup)
- var failed int
decoder := json.NewDecoder(stdin)
for {
var req restoreRequest
@@ -43,32 +43,20 @@ func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout i
return fmt.Errorf("restore: %w", err)
}
- repoLog := log.WithFields(log.Fields{
- "storage_name": req.StorageName,
- "relative_path": req.RelativePath,
- "gl_project_path": req.GlProjectPath,
- })
repo := gitalypb.Repository{
StorageName: req.StorageName,
RelativePath: req.RelativePath,
GlProjectPath: req.GlProjectPath,
}
- repoLog.Info("started restore")
- if err := fsBackup.RestoreRepository(ctx, req.ServerInfo, &repo, req.AlwaysCreate); err != nil {
- if errors.Is(err, backup.ErrSkipped) {
- repoLog.WithError(err).Warn("skipped restore")
- } else {
- repoLog.WithError(err).Error("restore failed")
- failed++
- }
- continue
- }
-
- repoLog.Info("completed restore")
+ pipeline.Restore(ctx, &backup.RestoreRequest{
+ Server: req.ServerInfo,
+ Repository: &repo,
+ AlwaysCreate: req.AlwaysCreate,
+ })
}
- if failed > 0 {
- return fmt.Errorf("restore: %d failures encountered", failed)
+ if err := pipeline.Done(); err != nil {
+ return fmt.Errorf("restore: %w", err)
}
return nil
}
diff --git a/cmd/gitaly-backup/restore_test.go b/cmd/gitaly-backup/restore_test.go
index 583e15c7c..015cce792 100644
--- a/cmd/gitaly-backup/restore_test.go
+++ b/cmd/gitaly-backup/restore_test.go
@@ -65,7 +65,7 @@ func TestRestoreSubcommand(t *testing.T) {
require.NoError(t, fs.Parse([]string{"-path", path}))
require.EqualError(t,
cmd.Run(context.Background(), &stdin, ioutil.Discard),
- "restore: 1 failures encountered")
+ "restore: pipeline: 1 failures encountered")
for _, repo := range repos {
repoPath := filepath.Join(cfg.Storages[0].Path, repo.RelativePath)
diff --git a/internal/backup/backup.go b/internal/backup/backup.go
index 46445f9a2..f08166fe8 100644
--- a/internal/backup/backup.go
+++ b/internal/backup/backup.go
@@ -34,56 +34,69 @@ func NewFilesystem(path string) *Filesystem {
}
}
-// BackupRepository creates a repository backup on a local filesystem
-func (fs *Filesystem) BackupRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) error {
- if isEmpty, err := fs.isEmpty(ctx, server, repo); err != nil {
- return fmt.Errorf("backup: %w", err)
+// CreateRequest is the request to create a backup
+type CreateRequest struct {
+ Server storage.ServerInfo
+ Repository *gitalypb.Repository
+}
+
+// Create creates a repository backup on a local filesystem
+func (fs *Filesystem) Create(ctx context.Context, req *CreateRequest) error {
+ if isEmpty, err := fs.isEmpty(ctx, req.Server, req.Repository); err != nil {
+ return fmt.Errorf("filesystem: %w", err)
} else if isEmpty {
return ErrSkipped
}
- backupPath := strings.TrimSuffix(filepath.Join(fs.path, repo.RelativePath), ".git")
+ backupPath := strings.TrimSuffix(filepath.Join(fs.path, req.Repository.RelativePath), ".git")
bundlePath := backupPath + ".bundle"
customHooksPath := filepath.Join(backupPath, "custom_hooks.tar")
if err := os.MkdirAll(backupPath, 0700); err != nil {
- return fmt.Errorf("backup: %w", err)
+ return fmt.Errorf("filesystem: %w", err)
}
- if err := fs.writeBundle(ctx, bundlePath, server, repo); err != nil {
- return fmt.Errorf("backup: write bundle: %w", err)
+ if err := fs.writeBundle(ctx, bundlePath, req.Server, req.Repository); err != nil {
+ return fmt.Errorf("filesystem: write bundle: %w", err)
}
- if err := fs.writeCustomHooks(ctx, customHooksPath, server, repo); err != nil {
- return fmt.Errorf("backup: write custom hooks: %w", err)
+ if err := fs.writeCustomHooks(ctx, customHooksPath, req.Server, req.Repository); err != nil {
+ return fmt.Errorf("filesystem: write custom hooks: %w", err)
}
return nil
}
-// RestoreRepository restores a repository from a backup on a local filesystem
-func (fs *Filesystem) RestoreRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository, alwaysCreate bool) error {
- backupPath := strings.TrimSuffix(filepath.Join(fs.path, repo.RelativePath), ".git")
+// RestoreRequest is the request to restore from a backup
+type RestoreRequest struct {
+ Server storage.ServerInfo
+ Repository *gitalypb.Repository
+ AlwaysCreate bool
+}
+
+// Restore restores a repository from a backup on a local filesystem
+func (fs *Filesystem) Restore(ctx context.Context, req *RestoreRequest) error {
+ backupPath := strings.TrimSuffix(filepath.Join(fs.path, req.Repository.RelativePath), ".git")
bundlePath := backupPath + ".bundle"
customHooksPath := filepath.Join(backupPath, "custom_hooks.tar")
- if err := fs.removeRepository(ctx, server, repo); err != nil {
- return fmt.Errorf("restore: %w", err)
+ if err := fs.removeRepository(ctx, req.Server, req.Repository); err != nil {
+ return fmt.Errorf("filesystem: %w", err)
}
- if err := fs.restoreBundle(ctx, bundlePath, server, repo); err != nil {
+ if err := fs.restoreBundle(ctx, bundlePath, req.Server, req.Repository); err != nil {
// For compatibility with existing backups we need to always create the
// repository even if there's no bundle for project repositories
// (not wiki or snippet repositories). Gitaly does not know which
// repository is which type so here we accept a parameter to tell us
// to employ this behaviour.
- if alwaysCreate && errors.Is(err, ErrSkipped) {
- if err := fs.createRepository(ctx, server, repo); err != nil {
- return fmt.Errorf("restore: %w", err)
+ if req.AlwaysCreate && errors.Is(err, ErrSkipped) {
+ if err := fs.createRepository(ctx, req.Server, req.Repository); err != nil {
+ return fmt.Errorf("filesystem: %w", err)
}
} else {
- return fmt.Errorf("restore: %w", err)
+ return fmt.Errorf("filesystem: %w", err)
}
}
- if err := fs.restoreCustomHooks(ctx, customHooksPath, server, repo); err != nil {
- return fmt.Errorf("restore: %w", err)
+ if err := fs.restoreCustomHooks(ctx, customHooksPath, req.Server, req.Repository); err != nil {
+ return fmt.Errorf("filesystem: %w", err)
}
return nil
}
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index adeae46bb..26d1a1034 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -17,7 +17,7 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-func TestFilesystem_BackupRepository(t *testing.T) {
+func TestFilesystem_Create(t *testing.T) {
cfg := testcfg.Build(t)
gitalyAddr := testserver.RunGitalyServer(t, cfg, nil, setup.RegisterAll)
@@ -76,7 +76,10 @@ func TestFilesystem_BackupRepository(t *testing.T) {
defer cancel()
fsBackup := NewFilesystem(path)
- err := fsBackup.BackupRepository(ctx, storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token}, tc.repo)
+ err := fsBackup.Create(ctx, &CreateRequest{
+ Server: storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token},
+ Repository: tc.repo,
+ })
if tc.err == nil {
require.NoError(t, err)
} else {
@@ -109,7 +112,7 @@ func TestFilesystem_BackupRepository(t *testing.T) {
}
}
-func TestFilesystem_RestoreRepository(t *testing.T) {
+func TestFilesystem_Restore(t *testing.T) {
cfg := testcfg.Build(t)
testhelper.ConfigureGitalyHooksBin(t, cfg)
@@ -175,7 +178,11 @@ func TestFilesystem_RestoreRepository(t *testing.T) {
defer cancel()
fsBackup := NewFilesystem(path)
- err := fsBackup.RestoreRepository(ctx, storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token}, tc.repo, tc.alwaysCreate)
+ err := fsBackup.Restore(ctx, &RestoreRequest{
+ Server: storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token},
+ Repository: tc.repo,
+ AlwaysCreate: tc.alwaysCreate,
+ })
if tc.expectedErrAs != nil {
require.True(t, errors.Is(err, tc.expectedErrAs), err.Error())
} else {
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
new file mode 100644
index 000000000..329faa0b5
--- /dev/null
+++ b/internal/backup/pipeline.go
@@ -0,0 +1,167 @@
+package backup
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+// Strategy used to create/restore backups
+type Strategy interface {
+ Create(context.Context, *CreateRequest) error
+ Restore(context.Context, *RestoreRequest) error
+}
+
+// CreatePipeline is a pipeline that only handles creating backups
+type CreatePipeline interface {
+ Create(context.Context, *CreateRequest)
+ Done() error
+}
+
+// Pipeline handles a series of requests to create/restore backups. Pipeline
+// encapsulates error handling for the caller.
+type Pipeline struct {
+ log logrus.FieldLogger
+ strategy Strategy
+ failed int64
+}
+
+// NewPipeline creates a new pipeline
+func NewPipeline(log logrus.FieldLogger, strategy Strategy) *Pipeline {
+ return &Pipeline{
+ log: log,
+ strategy: strategy,
+ }
+}
+
+// Create requests that a repository backup be created
+func (p *Pipeline) Create(ctx context.Context, req *CreateRequest) {
+ repoLog := p.repoLogger(req.Repository)
+ repoLog.Info("started backup")
+
+ if err := p.strategy.Create(ctx, req); err != nil {
+ if errors.Is(err, ErrSkipped) {
+ repoLog.WithError(err).Warn("skipped backup")
+ } else {
+ repoLog.WithError(err).Error("backup failed")
+ atomic.AddInt64(&p.failed, 1)
+ }
+ return
+ }
+
+ repoLog.Info("completed backup")
+}
+
+// Restore requests that a repository be restored from backup
+func (p *Pipeline) Restore(ctx context.Context, req *RestoreRequest) {
+ repoLog := p.repoLogger(req.Repository)
+ repoLog.Info("started restore")
+
+ if err := p.strategy.Restore(ctx, req); err != nil {
+ if errors.Is(err, ErrSkipped) {
+ repoLog.WithError(err).Warn("skipped restore")
+ } else {
+ repoLog.WithError(err).Error("restore failed")
+ atomic.AddInt64(&p.failed, 1)
+ }
+ return
+ }
+
+ repoLog.Info("completed restore")
+}
+
+// Done indicates that the pipeline is complete and returns any accumulated errors
+func (p *Pipeline) Done() error {
+ if p.failed > 0 {
+ return fmt.Errorf("pipeline: %d failures encountered", p.failed)
+ }
+ return nil
+}
+
+func (p *Pipeline) repoLogger(repo *gitalypb.Repository) logrus.FieldLogger {
+ return p.log.WithFields(logrus.Fields{
+ "storage_name": repo.StorageName,
+ "relative_path": repo.RelativePath,
+ "gl_project_path": repo.GlProjectPath,
+ })
+}
+
+// ParallelCreatePipeline is a pipeline that creates backups in parallel
+type ParallelCreatePipeline struct {
+ next CreatePipeline
+ n int
+
+ workersOnce sync.Once
+ wg sync.WaitGroup
+ done chan struct{}
+ requests chan *CreateRequest
+
+ mu sync.Mutex
+ err error
+}
+
+// NewParallelCreatePipeline creates a new ParallelCreatePipeline where `next`
+// is the pipeline called to create the backups and `n` is the number of
+// parallel backups that will run.
+func NewParallelCreatePipeline(next CreatePipeline, n int) *ParallelCreatePipeline {
+ return &ParallelCreatePipeline{
+ next: next,
+ n: n,
+ done: make(chan struct{}),
+ requests: make(chan *CreateRequest),
+ }
+}
+
+// Create queues a call to `next.Create` which will be run in parallel
+func (p *ParallelCreatePipeline) Create(ctx context.Context, req *CreateRequest) {
+ p.workersOnce.Do(p.startWorkers)
+
+ select {
+ case <-ctx.Done():
+ p.setErr(ctx.Err())
+ case p.requests <- req:
+ }
+}
+
+// Done waits for any in progress calls to `Create` to complete then reports any accumulated errors
+func (p *ParallelCreatePipeline) Done() error {
+ close(p.done)
+ p.wg.Wait()
+ if err := p.next.Done(); err != nil {
+ return err
+ }
+ return p.err
+}
+
+func (p *ParallelCreatePipeline) startWorkers() {
+ for i := 0; i < p.n; i++ {
+ p.wg.Add(1)
+ go p.worker()
+ }
+}
+
+func (p *ParallelCreatePipeline) worker() {
+ defer p.wg.Done()
+ for {
+ select {
+ case <-p.done:
+ return
+ case req := <-p.requests:
+ p.next.Create(context.TODO(), req)
+ }
+ }
+}
+
+func (p *ParallelCreatePipeline) setErr(err error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.err != nil {
+ return
+ }
+ p.err = err
+}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
new file mode 100644
index 000000000..f23a57d53
--- /dev/null
+++ b/internal/backup/pipeline_test.go
@@ -0,0 +1,109 @@
+package backup
+
+import (
+ "context"
+ "testing"
+
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+func TestPipeline_Create(t *testing.T) {
+ testPipelineCreate(t, func(strategy Strategy) CreatePipeline {
+ return NewPipeline(logrus.StandardLogger(), strategy)
+ })
+}
+
+func TestPipeline_Restore(t *testing.T) {
+ strategy := MockStrategy{
+ RestoreFunc: func(_ context.Context, req *RestoreRequest) error {
+ switch req.Repository.StorageName {
+ case "normal":
+ return nil
+ case "skip":
+ return ErrSkipped
+ case "error":
+ return assert.AnError
+ }
+ require.Failf(t, "unexpected call to Restore", "StorageName = %q", req.Repository.StorageName)
+ return nil
+ },
+ }
+ p := NewPipeline(logrus.StandardLogger(), strategy)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ requests := []RestoreRequest{
+ {Repository: &gitalypb.Repository{StorageName: "normal"}},
+ {Repository: &gitalypb.Repository{StorageName: "skip"}},
+ {Repository: &gitalypb.Repository{StorageName: "error"}},
+ }
+ for _, req := range requests {
+ p.Restore(ctx, &req)
+ }
+ err := p.Done()
+ require.EqualError(t, err, "pipeline: 1 failures encountered")
+}
+
+func TestParallelCreatePipeline(t *testing.T) {
+ testPipelineCreate(t, func(strategy Strategy) CreatePipeline {
+ return NewParallelCreatePipeline(NewPipeline(logrus.StandardLogger(), strategy), 2)
+ })
+}
+
+type MockStrategy struct {
+ CreateFunc func(context.Context, *CreateRequest) error
+ RestoreFunc func(context.Context, *RestoreRequest) error
+}
+
+func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error {
+ if s.CreateFunc != nil {
+ return s.CreateFunc(ctx, req)
+ }
+ return nil
+}
+
+func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error {
+ if s.RestoreFunc != nil {
+ return s.RestoreFunc(ctx, req)
+ }
+ return nil
+}
+
+func testPipelineCreate(t *testing.T, init func(Strategy) CreatePipeline) {
+ t.Run("strategy errors", func(t *testing.T) {
+ strategy := MockStrategy{
+ CreateFunc: func(_ context.Context, req *CreateRequest) error {
+ switch req.Repository.StorageName {
+ case "normal":
+ return nil
+ case "skip":
+ return ErrSkipped
+ case "error":
+ return assert.AnError
+ }
+ require.Failf(t, "unexpected call to Create", "StorageName = %q", req.Repository.StorageName)
+ return nil
+ },
+ }
+ p := init(strategy)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ requests := []CreateRequest{
+ {Repository: &gitalypb.Repository{StorageName: "normal"}},
+ {Repository: &gitalypb.Repository{StorageName: "skip"}},
+ {Repository: &gitalypb.Repository{StorageName: "error"}},
+ }
+ for i := range requests {
+ p.Create(ctx, &requests[i])
+ }
+ err := p.Done()
+ require.EqualError(t, err, "pipeline: 1 failures encountered")
+ })
+}