Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Liu <jliu@gitlab.com>2023-12-12 06:12:04 +0300
committerJames Liu <jliu@gitlab.com>2023-12-20 01:27:07 +0300
commitf33c47e7bf554ecc9cc08e0a0a9397ea88dc5aaa (patch)
tree0c6793749156a6cde6af771ac3f42260f69a0fa3
parent73920c7baa9ace55ba51d555e309897fd331c1b4 (diff)
backup: Track repos that have been processed
Adds a map to the Pipeline to track repos that have been restored or backed up. A mutex is used to synchronise access to the map, as entries are appended by goroutines operating in the workers. The signature of Done() is modified to return the map, and is intentionally ignored in the actual backup and restore logic for now. A subsequent commit will utilise the map for restore operations.
-rw-r--r--internal/backup/pipeline.go17
-rw-r--r--internal/backup/pipeline_test.go37
-rw-r--r--internal/cli/gitalybackup/create.go2
-rw-r--r--internal/cli/gitalybackup/restore.go3
4 files changed, 49 insertions, 10 deletions
diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 5672493ac..8000f61af 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -181,6 +181,9 @@ type Pipeline struct {
pipelineError error
cmdErrors *commandErrors
+
+ processedRepos map[string][]*gitalypb.Repository
+ processedReposMu sync.Mutex
}
// NewPipeline creates a pipeline that executes backup and restore jobs.
@@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
done: make(chan struct{}),
workersByStorage: make(map[string]chan *contextCommand),
cmdErrors: &commandErrors{},
+ processedRepos: make(map[string][]*gitalypb.Repository),
}
for _, opt := range opts {
@@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
}
// Done waits for any in progress jobs to complete then reports any accumulated errors
-func (p *Pipeline) Done() error {
+func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) {
close(p.done)
p.workerWg.Wait()
if p.pipelineError != nil {
- return fmt.Errorf("pipeline: %w", p.pipelineError)
+ return nil, fmt.Errorf("pipeline: %w", p.pipelineError)
}
if len(p.cmdErrors.errs) > 0 {
- return fmt.Errorf("pipeline: %w", p.cmdErrors)
+ return nil, fmt.Errorf("pipeline: %w", p.cmdErrors)
}
- return nil
+ return p.processedRepos, nil
}
// getWorker finds the channel associated with a storage. When no channel is
@@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) {
return
}
+ storageName := cmd.Repository().StorageName
+ p.processedReposMu.Lock()
+ p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository())
+ p.processedReposMu.Unlock()
+
log.Info(fmt.Sprintf("completed %s", cmd.Name()))
}
diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go
index 04c539a5f..8545c905e 100644
--- a/internal/backup/pipeline_test.go
+++ b/internal/backup/pipeline_test.go
@@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}}))
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}}))
}
- require.NoError(t, p.Done())
+ _, err = p.Done()
+ require.NoError(t, err)
})
}
})
@@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) {
p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}}))
- require.EqualError(t, p.Done(), "pipeline: context canceled")
+ _, err = p.Done()
+ require.EqualError(t, err, "pipeline: context canceled")
})
}
@@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
require.Equal(t, tc.level, logEntry.Level)
}
- err := p.Done()
+ _, err := p.Done()
if tc.level == logrus.ErrorLevel {
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
@@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) {
for _, cmd := range commands {
p.Handle(ctx, cmd)
}
- err := p.Done()
+ _, err := p.Done()
require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n")
})
}
@@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) {
})
}
}
+
+func TestPipelineProcessedRepos(t *testing.T) {
+ strategy := MockStrategy{}
+
+ repos := map[string][]*gitalypb.Repository{
+ "storage1": {
+ {RelativePath: "a.git", StorageName: "storage1"},
+ {RelativePath: "b.git", StorageName: "storage1"},
+ },
+ "storage2": {{RelativePath: "c.git", StorageName: "storage2"}},
+ "storage3": {{RelativePath: "d.git", StorageName: "storage3"}},
+ }
+
+ p, err := NewPipeline(testhelper.SharedLogger(t))
+ require.NoError(t, err)
+
+ ctx := testhelper.Context(t)
+ for _, v := range repos {
+ for _, repo := range v {
+ p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo}))
+ }
+ }
+
+ processedRepos, err := p.Done()
+ require.NoError(t, err)
+ require.EqualValues(t, repos, processedRepos)
+}
diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go
index 4ad653490..be7e75cdb 100644
--- a/internal/cli/gitalybackup/create.go
+++ b/internal/cli/gitalybackup/create.go
@@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("create: %w", err)
}
return nil
diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index de9e2cd3d..7a304f2ed 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
decoder := json.NewDecoder(stdin)
for {
+
var req restoreRequest
if err := decoder.Decode(&req); errors.Is(err, io.EOF) {
break
@@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin
}))
}
- if err := pipeline.Done(); err != nil {
+ if _, err := pipeline.Done(); err != nil {
return fmt.Errorf("restore: %w", err)
}
return nil