diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-20 18:40:16 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2021-05-20 18:40:16 +0300 |
commit | cb17899da94f9a53676d71190c42cc05a368734e (patch) | |
tree | 86796cda8c1980a2f59c5f7f8eb6cafdf6233a71 | |
parent | 65badb06b1364494cfdb226cab1bba77a9785ecb (diff) | |
parent | 2bc73b4f1668a372fb8cb39e1ca8720ab74e5490 (diff) |
Merge branch 'parallel_backup' into 'master'
Add option to run backups in parallel
See merge request gitlab-org/gitaly!3509
-rw-r--r-- | cmd/gitaly-backup/create.go | 36 | ||||
-rw-r--r-- | cmd/gitaly-backup/create_test.go | 2 | ||||
-rw-r--r-- | cmd/gitaly-backup/restore.go | 28 | ||||
-rw-r--r-- | cmd/gitaly-backup/restore_test.go | 2 | ||||
-rw-r--r-- | internal/backup/backup.go | 57 | ||||
-rw-r--r-- | internal/backup/backup_test.go | 15 | ||||
-rw-r--r-- | internal/backup/pipeline.go | 167 | ||||
-rw-r--r-- | internal/backup/pipeline_test.go | 109 |
8 files changed, 347 insertions, 69 deletions
diff --git a/cmd/gitaly-backup/create.go b/cmd/gitaly-backup/create.go index 49aac2be8..ea80eb32a 100644 --- a/cmd/gitaly-backup/create.go +++ b/cmd/gitaly-backup/create.go @@ -3,10 +3,10 @@ package main import ( "context" "encoding/json" - "errors" "flag" "fmt" "io" + "runtime" log "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/backup" @@ -23,16 +23,23 @@ type serverRepository struct { type createSubcommand struct { backupPath string + parallel int } func (cmd *createSubcommand) Flags(fs *flag.FlagSet) { fs.StringVar(&cmd.backupPath, "path", "", "repository backup path") + fs.IntVar(&cmd.parallel, "parallel", runtime.NumCPU(), "maximum number of parallel backups") } func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error { fsBackup := backup.NewFilesystem(cmd.backupPath) - var failed int + var pipeline backup.CreatePipeline + pipeline = backup.NewPipeline(log.StandardLogger(), fsBackup) + if cmd.parallel > 0 { + pipeline = backup.NewParallelCreatePipeline(pipeline, cmd.parallel) + } + decoder := json.NewDecoder(stdin) for { var sr serverRepository @@ -41,32 +48,19 @@ func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io } else if err != nil { return fmt.Errorf("create: %w", err) } - repoLog := log.WithFields(log.Fields{ - "storage_name": sr.StorageName, - "relative_path": sr.RelativePath, - "gl_project_path": sr.GlProjectPath, - }) repo := gitalypb.Repository{ StorageName: sr.StorageName, RelativePath: sr.RelativePath, GlProjectPath: sr.GlProjectPath, } - repoLog.Info("started backup") - if err := fsBackup.BackupRepository(ctx, sr.ServerInfo, &repo); err != nil { - if errors.Is(err, backup.ErrSkipped) { - repoLog.Warn("skipped backup") - } else { - repoLog.WithError(err).Error("backup failed") - failed++ - } - continue - } - - repoLog.Info("completed backup") + pipeline.Create(ctx, &backup.CreateRequest{ + Server: sr.ServerInfo, + Repository: &repo, + }) } - if failed > 0 { - return fmt.Errorf("create: %d failures encountered", failed) + if err := pipeline.Done(); err != nil { + return fmt.Errorf("create: %w", err) } return nil } diff --git a/cmd/gitaly-backup/create_test.go b/cmd/gitaly-backup/create_test.go index ba0a537c7..d26d37b11 100644 --- a/cmd/gitaly-backup/create_test.go +++ b/cmd/gitaly-backup/create_test.go @@ -58,7 +58,7 @@ func TestCreateSubcommand(t *testing.T) { require.NoError(t, fs.Parse([]string{"-path", path})) require.EqualError(t, cmd.Run(context.Background(), &stdin, ioutil.Discard), - "create: 1 failures encountered") + "create: pipeline: 1 failures encountered") for _, repo := range repos { bundlePath := filepath.Join(path, repo.RelativePath+".bundle") diff --git a/cmd/gitaly-backup/restore.go b/cmd/gitaly-backup/restore.go index 619192a8a..a7b8a3e10 100644 --- a/cmd/gitaly-backup/restore.go +++ b/cmd/gitaly-backup/restore.go @@ -32,8 +32,8 @@ func (cmd *restoreSubcommand) Flags(fs *flag.FlagSet) { func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error { fsBackup := backup.NewFilesystem(cmd.backupPath) + pipeline := backup.NewPipeline(log.StandardLogger(), fsBackup) - var failed int decoder := json.NewDecoder(stdin) for { var req restoreRequest @@ -43,32 +43,20 @@ func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout i return fmt.Errorf("restore: %w", err) } - repoLog := log.WithFields(log.Fields{ - "storage_name": req.StorageName, - "relative_path": req.RelativePath, - "gl_project_path": req.GlProjectPath, - }) repo := gitalypb.Repository{ StorageName: req.StorageName, RelativePath: req.RelativePath, GlProjectPath: req.GlProjectPath, } - repoLog.Info("started restore") - if err := fsBackup.RestoreRepository(ctx, req.ServerInfo, &repo, req.AlwaysCreate); err != nil { - if errors.Is(err, backup.ErrSkipped) { - repoLog.WithError(err).Warn("skipped restore") - } else { - repoLog.WithError(err).Error("restore failed") - failed++ - } - continue - } - - repoLog.Info("completed restore") + pipeline.Restore(ctx, &backup.RestoreRequest{ + Server: req.ServerInfo, + Repository: &repo, + AlwaysCreate: req.AlwaysCreate, + }) } - if failed > 0 { - return fmt.Errorf("restore: %d failures encountered", failed) + if err := pipeline.Done(); err != nil { + return fmt.Errorf("restore: %w", err) } return nil } diff --git a/cmd/gitaly-backup/restore_test.go b/cmd/gitaly-backup/restore_test.go index 583e15c7c..015cce792 100644 --- a/cmd/gitaly-backup/restore_test.go +++ b/cmd/gitaly-backup/restore_test.go @@ -65,7 +65,7 @@ func TestRestoreSubcommand(t *testing.T) { require.NoError(t, fs.Parse([]string{"-path", path})) require.EqualError(t, cmd.Run(context.Background(), &stdin, ioutil.Discard), - "restore: 1 failures encountered") + "restore: pipeline: 1 failures encountered") for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, repo.RelativePath) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index 46445f9a2..f08166fe8 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -34,56 +34,69 @@ func NewFilesystem(path string) *Filesystem { } } -// BackupRepository creates a repository backup on a local filesystem -func (fs *Filesystem) BackupRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository) error { - if isEmpty, err := fs.isEmpty(ctx, server, repo); err != nil { - return fmt.Errorf("backup: %w", err) +// CreateRequest is the request to create a backup +type CreateRequest struct { + Server storage.ServerInfo + Repository *gitalypb.Repository +} + +// Create creates a repository backup on a local filesystem +func (fs *Filesystem) Create(ctx context.Context, req *CreateRequest) error { + if isEmpty, err := fs.isEmpty(ctx, req.Server, req.Repository); err != nil { + return fmt.Errorf("filesystem: %w", err) } else if isEmpty { return ErrSkipped } - backupPath := strings.TrimSuffix(filepath.Join(fs.path, repo.RelativePath), ".git") + backupPath := strings.TrimSuffix(filepath.Join(fs.path, req.Repository.RelativePath), ".git") bundlePath := backupPath + ".bundle" customHooksPath := filepath.Join(backupPath, "custom_hooks.tar") if err := os.MkdirAll(backupPath, 0700); err != nil { - return fmt.Errorf("backup: %w", err) + return fmt.Errorf("filesystem: %w", err) } - if err := fs.writeBundle(ctx, bundlePath, server, repo); err != nil { - return fmt.Errorf("backup: write bundle: %w", err) + if err := fs.writeBundle(ctx, bundlePath, req.Server, req.Repository); err != nil { + return fmt.Errorf("filesystem: write bundle: %w", err) } - if err := fs.writeCustomHooks(ctx, customHooksPath, server, repo); err != nil { - return fmt.Errorf("backup: write custom hooks: %w", err) + if err := fs.writeCustomHooks(ctx, customHooksPath, req.Server, req.Repository); err != nil { + return fmt.Errorf("filesystem: write custom hooks: %w", err) } return nil } -// RestoreRepository restores a repository from a backup on a local filesystem -func (fs *Filesystem) RestoreRepository(ctx context.Context, server storage.ServerInfo, repo *gitalypb.Repository, alwaysCreate bool) error { - backupPath := strings.TrimSuffix(filepath.Join(fs.path, repo.RelativePath), ".git") +// RestoreRequest is the request to restore from a backup +type RestoreRequest struct { + Server storage.ServerInfo + Repository *gitalypb.Repository + AlwaysCreate bool +} + +// Restore restores a repository from a backup on a local filesystem +func (fs *Filesystem) Restore(ctx context.Context, req *RestoreRequest) error { + backupPath := strings.TrimSuffix(filepath.Join(fs.path, req.Repository.RelativePath), ".git") bundlePath := backupPath + ".bundle" customHooksPath := filepath.Join(backupPath, "custom_hooks.tar") - if err := fs.removeRepository(ctx, server, repo); err != nil { - return fmt.Errorf("restore: %w", err) + if err := fs.removeRepository(ctx, req.Server, req.Repository); err != nil { + return fmt.Errorf("filesystem: %w", err) } - if err := fs.restoreBundle(ctx, bundlePath, server, repo); err != nil { + if err := fs.restoreBundle(ctx, bundlePath, req.Server, req.Repository); err != nil { // For compatibility with existing backups we need to always create the // repository even if there's no bundle for project repositories // (not wiki or snippet repositories). Gitaly does not know which // repository is which type so here we accept a parameter to tell us // to employ this behaviour. - if alwaysCreate && errors.Is(err, ErrSkipped) { - if err := fs.createRepository(ctx, server, repo); err != nil { - return fmt.Errorf("restore: %w", err) + if req.AlwaysCreate && errors.Is(err, ErrSkipped) { + if err := fs.createRepository(ctx, req.Server, req.Repository); err != nil { + return fmt.Errorf("filesystem: %w", err) } } else { - return fmt.Errorf("restore: %w", err) + return fmt.Errorf("filesystem: %w", err) } } - if err := fs.restoreCustomHooks(ctx, customHooksPath, server, repo); err != nil { - return fmt.Errorf("restore: %w", err) + if err := fs.restoreCustomHooks(ctx, customHooksPath, req.Server, req.Repository); err != nil { + return fmt.Errorf("filesystem: %w", err) } return nil } diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index adeae46bb..26d1a1034 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -17,7 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) -func TestFilesystem_BackupRepository(t *testing.T) { +func TestFilesystem_Create(t *testing.T) { cfg := testcfg.Build(t) gitalyAddr := testserver.RunGitalyServer(t, cfg, nil, setup.RegisterAll) @@ -76,7 +76,10 @@ func TestFilesystem_BackupRepository(t *testing.T) { defer cancel() fsBackup := NewFilesystem(path) - err := fsBackup.BackupRepository(ctx, storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token}, tc.repo) + err := fsBackup.Create(ctx, &CreateRequest{ + Server: storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token}, + Repository: tc.repo, + }) if tc.err == nil { require.NoError(t, err) } else { @@ -109,7 +112,7 @@ func TestFilesystem_BackupRepository(t *testing.T) { } } -func TestFilesystem_RestoreRepository(t *testing.T) { +func TestFilesystem_Restore(t *testing.T) { cfg := testcfg.Build(t) testhelper.ConfigureGitalyHooksBin(t, cfg) @@ -175,7 +178,11 @@ func TestFilesystem_RestoreRepository(t *testing.T) { defer cancel() fsBackup := NewFilesystem(path) - err := fsBackup.RestoreRepository(ctx, storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token}, tc.repo, tc.alwaysCreate) + err := fsBackup.Restore(ctx, &RestoreRequest{ + Server: storage.ServerInfo{Address: gitalyAddr, Token: cfg.Auth.Token}, + Repository: tc.repo, + AlwaysCreate: tc.alwaysCreate, + }) if tc.expectedErrAs != nil { require.True(t, errors.Is(err, tc.expectedErrAs), err.Error()) } else { diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go new file mode 100644 index 000000000..329faa0b5 --- /dev/null +++ b/internal/backup/pipeline.go @@ -0,0 +1,167 @@ +package backup + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +// Strategy used to create/restore backups +type Strategy interface { + Create(context.Context, *CreateRequest) error + Restore(context.Context, *RestoreRequest) error +} + +// CreatePipeline is a pipeline that only handles creating backups +type CreatePipeline interface { + Create(context.Context, *CreateRequest) + Done() error +} + +// Pipeline handles a series of requests to create/restore backups. Pipeline +// encapsulates error handling for the caller. +type Pipeline struct { + log logrus.FieldLogger + strategy Strategy + failed int64 +} + +// NewPipeline creates a new pipeline +func NewPipeline(log logrus.FieldLogger, strategy Strategy) *Pipeline { + return &Pipeline{ + log: log, + strategy: strategy, + } +} + +// Create requests that a repository backup be created +func (p *Pipeline) Create(ctx context.Context, req *CreateRequest) { + repoLog := p.repoLogger(req.Repository) + repoLog.Info("started backup") + + if err := p.strategy.Create(ctx, req); err != nil { + if errors.Is(err, ErrSkipped) { + repoLog.WithError(err).Warn("skipped backup") + } else { + repoLog.WithError(err).Error("backup failed") + atomic.AddInt64(&p.failed, 1) + } + return + } + + repoLog.Info("completed backup") +} + +// Restore requests that a repository be restored from backup +func (p *Pipeline) Restore(ctx context.Context, req *RestoreRequest) { + repoLog := p.repoLogger(req.Repository) + repoLog.Info("started restore") + + if err := p.strategy.Restore(ctx, req); err != nil { + if errors.Is(err, ErrSkipped) { + repoLog.WithError(err).Warn("skipped restore") + } else { + repoLog.WithError(err).Error("restore failed") + atomic.AddInt64(&p.failed, 1) + } + return + } + + repoLog.Info("completed restore") +} + +// Done indicates that the pipeline is complete and returns any accumulated errors +func (p *Pipeline) Done() error { + if p.failed > 0 { + return fmt.Errorf("pipeline: %d failures encountered", p.failed) + } + return nil +} + +func (p *Pipeline) repoLogger(repo *gitalypb.Repository) logrus.FieldLogger { + return p.log.WithFields(logrus.Fields{ + "storage_name": repo.StorageName, + "relative_path": repo.RelativePath, + "gl_project_path": repo.GlProjectPath, + }) +} + +// ParallelCreatePipeline is a pipeline that creates backups in parallel +type ParallelCreatePipeline struct { + next CreatePipeline + n int + + workersOnce sync.Once + wg sync.WaitGroup + done chan struct{} + requests chan *CreateRequest + + mu sync.Mutex + err error +} + +// NewParallelCreatePipeline creates a new ParallelCreatePipeline where `next` +// is the pipeline called to create the backups and `n` is the number of +// parallel backups that will run. +func NewParallelCreatePipeline(next CreatePipeline, n int) *ParallelCreatePipeline { + return &ParallelCreatePipeline{ + next: next, + n: n, + done: make(chan struct{}), + requests: make(chan *CreateRequest), + } +} + +// Create queues a call to `next.Create` which will be run in parallel +func (p *ParallelCreatePipeline) Create(ctx context.Context, req *CreateRequest) { + p.workersOnce.Do(p.startWorkers) + + select { + case <-ctx.Done(): + p.setErr(ctx.Err()) + case p.requests <- req: + } +} + +// Done waits for any in progress calls to `Create` to complete then reports any accumulated errors +func (p *ParallelCreatePipeline) Done() error { + close(p.done) + p.wg.Wait() + if err := p.next.Done(); err != nil { + return err + } + return p.err +} + +func (p *ParallelCreatePipeline) startWorkers() { + for i := 0; i < p.n; i++ { + p.wg.Add(1) + go p.worker() + } +} + +func (p *ParallelCreatePipeline) worker() { + defer p.wg.Done() + for { + select { + case <-p.done: + return + case req := <-p.requests: + p.next.Create(context.TODO(), req) + } + } +} + +func (p *ParallelCreatePipeline) setErr(err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.err != nil { + return + } + p.err = err +} diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go new file mode 100644 index 000000000..f23a57d53 --- /dev/null +++ b/internal/backup/pipeline_test.go @@ -0,0 +1,109 @@ +package backup + +import ( + "context" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +func TestPipeline_Create(t *testing.T) { + testPipelineCreate(t, func(strategy Strategy) CreatePipeline { + return NewPipeline(logrus.StandardLogger(), strategy) + }) +} + +func TestPipeline_Restore(t *testing.T) { + strategy := MockStrategy{ + RestoreFunc: func(_ context.Context, req *RestoreRequest) error { + switch req.Repository.StorageName { + case "normal": + return nil + case "skip": + return ErrSkipped + case "error": + return assert.AnError + } + require.Failf(t, "unexpected call to Restore", "StorageName = %q", req.Repository.StorageName) + return nil + }, + } + p := NewPipeline(logrus.StandardLogger(), strategy) + + ctx, cancel := testhelper.Context() + defer cancel() + + requests := []RestoreRequest{ + {Repository: &gitalypb.Repository{StorageName: "normal"}}, + {Repository: &gitalypb.Repository{StorageName: "skip"}}, + {Repository: &gitalypb.Repository{StorageName: "error"}}, + } + for _, req := range requests { + p.Restore(ctx, &req) + } + err := p.Done() + require.EqualError(t, err, "pipeline: 1 failures encountered") +} + +func TestParallelCreatePipeline(t *testing.T) { + testPipelineCreate(t, func(strategy Strategy) CreatePipeline { + return NewParallelCreatePipeline(NewPipeline(logrus.StandardLogger(), strategy), 2) + }) +} + +type MockStrategy struct { + CreateFunc func(context.Context, *CreateRequest) error + RestoreFunc func(context.Context, *RestoreRequest) error +} + +func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { + if s.CreateFunc != nil { + return s.CreateFunc(ctx, req) + } + return nil +} + +func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error { + if s.RestoreFunc != nil { + return s.RestoreFunc(ctx, req) + } + return nil +} + +func testPipelineCreate(t *testing.T, init func(Strategy) CreatePipeline) { + t.Run("strategy errors", func(t *testing.T) { + strategy := MockStrategy{ + CreateFunc: func(_ context.Context, req *CreateRequest) error { + switch req.Repository.StorageName { + case "normal": + return nil + case "skip": + return ErrSkipped + case "error": + return assert.AnError + } + require.Failf(t, "unexpected call to Create", "StorageName = %q", req.Repository.StorageName) + return nil + }, + } + p := init(strategy) + + ctx, cancel := testhelper.Context() + defer cancel() + + requests := []CreateRequest{ + {Repository: &gitalypb.Repository{StorageName: "normal"}}, + {Repository: &gitalypb.Repository{StorageName: "skip"}}, + {Repository: &gitalypb.Repository{StorageName: "error"}}, + } + for i := range requests { + p.Create(ctx, &requests[i]) + } + err := p.Done() + require.EqualError(t, err, "pipeline: 1 failures encountered") + }) +} |