Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWill Chandler <wchandler@gitlab.com>2022-07-28 21:38:48 +0300
committerWill Chandler <wchandler@gitlab.com>2022-08-31 23:23:22 +0300
commit69420b517a18582cfefeab4297c0ac2456b96de8 (patch)
tree571a2da1d778f64536102d785befa8ff859d009e
parentf01d4c36519c3249c7fdfc9973acc618e6f4940f (diff)
praefect: Add 'track-repositories' subcommandwc-add-many-repos
A somewhat common way to migrate to Gitaly Cluster it to copy the contents of an existing storage onto one node of the cluster and manually track each repository. This is convenient for instances where the migration must happen as a single 'big bang' with some downtime, vs the zero-downtime, but much slower method of migrating via the API. Currently admins can script the `track-repository` to run in a loop when ingesting large numbers of repositories, but this is inconvenient and error-prone. To better support this use case, a way to import repositories in-bulk into Praefect is needed. This commit add a new `track-repositories` subcommand that takes a JSON file as input and adds each entry in a single execution. Changelog: added
-rw-r--r--cmd/praefect/subcmd.go1
-rw-r--r--cmd/praefect/subcmd_track_repositories.go214
-rw-r--r--cmd/praefect/subcmd_track_repositories_test.go357
3 files changed, 572 insertions, 0 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 78b3bcca4..53df26eee 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -42,6 +42,7 @@ var subcommands = map[string]subcmd{
setReplicationFactorCmdName: newSetReplicatioFactorSubcommand(os.Stdout),
removeRepositoryCmdName: newRemoveRepository(logger, os.Stdout),
trackRepositoryCmdName: newTrackRepository(logger, os.Stdout),
+ trackRepositoriesCmdName: newTrackRepositories(logger, os.Stdout),
listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
checkCmdName: newCheckSubcommand(os.Stdout, service.AllChecks()...),
metadataCmdName: newMetadataSubcommand(os.Stdout),
diff --git a/cmd/praefect/subcmd_track_repositories.go b/cmd/praefect/subcmd_track_repositories.go
new file mode 100644
index 000000000..44bdb8c71
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repositories.go
@@ -0,0 +1,214 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "os"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/labkit/correlation"
+)
+
+const (
+ trackRepositoriesCmdName = "track-repositories"
+ paramInputPath = "input-path"
+)
+
+type invalidRequest struct {
+ reqNum int
+ path string
+ errs []error
+}
+
+type dupPathError struct {
+ path string
+ reqNums []int
+}
+
+func (d *dupPathError) Error() string {
+ return fmt.Sprintf("duplicate entries for relative_path, item #: %v", d.reqNums)
+}
+
+type trackRepositories struct {
+ w io.Writer
+ logger logrus.FieldLogger
+ inputPath string
+ replicateImmediately bool
+}
+
+func newTrackRepositories(logger logrus.FieldLogger, w io.Writer) *trackRepositories {
+ return &trackRepositories{w: w, logger: logger}
+}
+
+func (cmd *trackRepositories) FlagSet() *flag.FlagSet {
+ fs := flag.NewFlagSet(trackRepositoryCmdName, flag.ExitOnError)
+ fs.BoolVar(&cmd.replicateImmediately, "replicate-immediately", false, "kick off a replication immediately")
+ fs.StringVar(&cmd.inputPath, paramInputPath, "", "path to file with details of repositories to track")
+ fs.Usage = func() {
+ printfErr("Description:\n" +
+ " This command allows bulk requests for repositories to be tracked by Praefect.\n" +
+ " The -input-path flag must be the path of a file containing the details of the repositories\n" +
+ " to track as a list of newline-delimited JSON objects. Each entry must contain the\n" +
+ " following keys:\n\n" +
+ " relative_path - The relative path of the repository on-disk.\n" +
+ " virtual_storage - The Praefect virtual storage name.\n" +
+ " authoritative_storage - Which storage to consider as the canonical copy of the repository.\n\n" +
+ " If -replicate-immediately is used, the command will attempt to replicate the repositories\n" +
+ " to the secondaries. Otherwise, replication jobs will be created and will be executed\n" +
+ " eventually by Praefect itself.\n")
+ fs.PrintDefaults()
+ }
+ return fs
+}
+
+func (cmd trackRepositories) Exec(flags *flag.FlagSet, cfg config.Config) error {
+ switch {
+ case flags.NArg() > 0:
+ return unexpectedPositionalArgsError{Command: flags.Name()}
+ }
+
+ ctx := correlation.ContextWithCorrelation(context.Background(), correlation.SafeRandomID())
+ logger = cmd.logger.WithField("correlation_id", correlation.ExtractFromContext(ctx))
+
+ openDBCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+ db, err := glsql.OpenDB(openDBCtx, cfg.DB)
+ if err != nil {
+ return fmt.Errorf("connect to database: %w", err)
+ }
+ defer func() { _ = db.Close() }()
+ store := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+
+ f, err := os.Open(cmd.inputPath)
+ if err != nil {
+ return fmt.Errorf("open input: %w", err)
+ }
+ defer f.Close()
+
+ d := json.NewDecoder(f)
+ d.DisallowUnknownFields()
+
+ fmt.Fprintf(cmd.w, "Validating repository information in %q\n", cmd.inputPath)
+
+ var requests []trackRepositoryRequest
+ var repoNum int
+ var repoErrs []invalidRequest
+ pathLines := make(map[string][]int)
+
+ // Read in and validate all requests from input file before executing. This prevents us from
+ // partially executing a file, which makes it difficult to tell which repos were actually
+ // tracked.
+ for d.More() {
+ repoNum++
+
+ request := trackRepositoryRequest{}
+ badReq := invalidRequest{reqNum: repoNum}
+
+ if err := d.Decode(&request); err != nil {
+ badReq.errs = append(badReq.errs, err)
+ repoErrs = append(repoErrs, badReq)
+
+ // Invalid request, nothing to validate.
+ continue
+ }
+
+ if request.RelativePath == "" {
+ badReq.errs = append(badReq.errs, requiredParameterError(paramRelativePath))
+ }
+ badReq.path = request.RelativePath
+
+ if request.VirtualStorage == "" {
+ badReq.errs = append(badReq.errs, requiredParameterError(paramVirtualStorage))
+ }
+ if request.AuthoritativeStorage == "" {
+ badReq.errs = append(badReq.errs, requiredParameterError(paramAuthoritativeStorage))
+ }
+ if len(badReq.errs) > 0 {
+ repoErrs = append(repoErrs, badReq)
+
+ // Incomplete request, no further validation possible.
+ continue
+ }
+
+ // Repo paths are globally unique, any attempt to add the same path multiple virtual storages
+ // is invalid and must be rejected.
+ prevLines, exists := pathLines[request.RelativePath]
+ if exists {
+ badReq.errs = append(badReq.errs, &dupPathError{path: request.RelativePath})
+ repoErrs = append(repoErrs, badReq)
+
+ prevLines = append(prevLines, repoNum)
+ pathLines[request.RelativePath] = prevLines
+
+ // We've already checked this path, no need to run further checks.
+ continue
+ }
+ pathLines[request.RelativePath] = []int{repoNum}
+
+ repoInDB, err := store.RepositoryExists(ctx, request.VirtualStorage, request.RelativePath)
+ if err != nil {
+ // Bail out if we're having trouble contacting the DB, nothing is going to work if this fails.
+ return fmt.Errorf("checking database: %w", err)
+ }
+ if repoInDB {
+ badReq.errs = append(badReq.errs, fmt.Errorf("repository is already tracked by Praefect"))
+ repoErrs = append(repoErrs, badReq)
+ // Repo already in Praefect DB, we can skip it.
+ continue
+ }
+
+ authoritativeRepoExists, err := request.authoritativeRepositoryExists(ctx, cfg, cmd.w, request.AuthoritativeStorage)
+ if err != nil {
+ badReq.errs = append(badReq.errs, fmt.Errorf("checking repository on disk: %w", err))
+ } else if !authoritativeRepoExists {
+ badReq.errs = append(badReq.errs, fmt.Errorf("not a valid git repository"))
+ }
+
+ if len(badReq.errs) > 0 {
+ repoErrs = append(repoErrs, badReq)
+ continue
+ }
+ requests = append(requests, request)
+ }
+
+ if len(repoErrs) > 0 {
+ printInvalidRequests(cmd.w, repoErrs, pathLines, cmd.inputPath)
+ return fmt.Errorf("invalid entries found, aborting")
+ }
+ if len(requests) == 0 {
+ return fmt.Errorf("no repository information found in %q", cmd.inputPath)
+ }
+
+ fmt.Fprintf(cmd.w, "All repository details are correctly formatted\n")
+ fmt.Fprintf(cmd.w, "Tracking %v repositories in Praefect DB...\n", repoNum)
+ for _, request := range requests {
+ if err := request.execRequest(ctx, db, cfg, cmd.w, logger, cmd.replicateImmediately); err != nil {
+ return fmt.Errorf("tracking repository %q: %w", request.RelativePath, err)
+ }
+ }
+
+ return nil
+}
+
+func printInvalidRequests(w io.Writer, repoErrs []invalidRequest, pathLines map[string][]int, inputPath string) {
+ fmt.Fprintf(w, "Found %v invalid request(s) in %q:\n", len(repoErrs), inputPath)
+
+ for _, l := range repoErrs {
+ fmt.Fprintf(w, " item #: %v, relative_path: %q\n", l.reqNum, l.path)
+ for _, err := range l.errs {
+ if dup, ok := err.(*dupPathError); ok {
+ // The complete set of duplicate reqNums won't be known until input is
+ // fully processed, fetch them now.
+ err = &dupPathError{path: dup.path, reqNums: pathLines[dup.path]}
+ }
+ fmt.Fprintf(w, " %v\n", err)
+ }
+ }
+}
diff --git a/cmd/praefect/subcmd_track_repositories_test.go b/cmd/praefect/subcmd_track_repositories_test.go
new file mode 100644
index 000000000..cd8e97a1b
--- /dev/null
+++ b/cmd/praefect/subcmd_track_repositories_test.go
@@ -0,0 +1,357 @@
+//go:build !gitaly_test_sha256
+
+package main
+
+import (
+ "bytes"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+)
+
+func TestAddRepositories_FlagSet(t *testing.T) {
+ t.Parallel()
+ cmd := &trackRepositories{}
+ fs := cmd.FlagSet()
+ require.NoError(t, fs.Parse([]string{"--input-path", "/dev/stdin", "--replicate-immediately", "true"}))
+ require.Equal(t, "/dev/stdin", cmd.inputPath)
+ require.Equal(t, true, cmd.replicateImmediately)
+}
+
+func TestAddRepositories_Exec_invalidInput(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+ g1Addr := g1Srv.Address()
+
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ AllowLegacyElectors: true,
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+ ctx := testhelper.Context(t)
+ inputFile := "input_file"
+
+ trackRepo := func(relativePath string) error {
+ repositoryID, err := rs.ReserveRepositoryID(ctx, virtualStorageName, relativePath)
+ if err != nil {
+ return err
+ }
+ return rs.CreateRepository(
+ ctx,
+ repositoryID,
+ virtualStorageName,
+ relativePath,
+ relativePath,
+ g1Cfg.Storages[0].Name,
+ nil,
+ nil,
+ false,
+ false,
+ )
+ }
+
+ invalidEntryErr := "invalid entries found, aborting"
+
+ testCases := []struct {
+ input string
+ desc string
+ expectedOutput string
+ expectedError string
+ trackedPath string
+ }{
+ {
+ input: "",
+ desc: "empty input",
+ expectedError: "no repository information found",
+ },
+ {
+ input: `{"foo":"bar"}`,
+ desc: "unexpected key in JSON",
+ expectedOutput: `json: unknown field "foo"`,
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`,
+ desc: "missing path",
+ expectedOutput: `"repository" is a required parameter`,
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"virtual_storage":"foo","relative_path":"bar","authoritative_storage":"gitaly-1"}`,
+ desc: "invalid virtual storage",
+ expectedOutput: `virtual storage "foo" not found`,
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"relative_path":"not_a_repo","virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`,
+ desc: "repo does not exist",
+ expectedOutput: "not a valid git repository",
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"virtual_storage":"praefect","relative_path":"duplicate","authoritative_storage":"gitaly-1"}
+{"virtual_storage":"praefect","relative_path":"duplicate","authoritative_storage":"gitaly-1"}`,
+ desc: "duplicate path",
+ expectedOutput: "duplicate entries for relative_path",
+ expectedError: invalidEntryErr,
+ },
+ {
+ input: `{"relative_path":"already_tracked","virtual_storage":"praefect","authoritative_storage":"gitaly-1"}`,
+ desc: "repo is already tracked",
+ expectedOutput: "repository is already tracked by Praefect",
+ expectedError: invalidEntryErr,
+ trackedPath: "already_tracked",
+ },
+ }
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ tempDir := testhelper.TempDir(t)
+ f, err := os.Create(filepath.Join(tempDir, inputFile))
+ require.NoError(t, err)
+ _, err = f.Write([]byte(tc.input))
+ require.NoError(t, err)
+ require.NoError(t, f.Close())
+
+ var stdout bytes.Buffer
+
+ if tc.trackedPath != "" {
+ require.NoError(t, trackRepo(tc.trackedPath))
+ }
+
+ addReposCmd := &trackRepositories{
+ inputPath: filepath.Join(tempDir, inputFile),
+ replicateImmediately: true,
+ logger: logger,
+ w: &stdout,
+ }
+ err = addReposCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf)
+ require.Error(t, err)
+
+ if tc.expectedOutput != "" {
+ require.Contains(t, stdout.String(), tc.expectedOutput)
+ }
+ require.Contains(t, err.Error(), tc.expectedError)
+ })
+ }
+}
+
+func TestAddRepositories_Exec(t *testing.T) {
+ t.Parallel()
+ g1Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-1"))
+ g2Cfg := testcfg.Build(t, testcfg.WithStorages("gitaly-2"))
+ testcfg.BuildGitalyHooks(t, g2Cfg)
+ testcfg.BuildGitalySSH(t, g2Cfg)
+
+ g1Srv := testserver.StartGitalyServer(t, g1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ g2Srv := testserver.StartGitalyServer(t, g2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+ defer g2Srv.Shutdown()
+ defer g1Srv.Shutdown()
+
+ g1Addr := g1Srv.Address()
+
+ db := testdb.New(t)
+ dbConf := testdb.GetConfig(t, db.Name)
+
+ virtualStorageName := "praefect"
+ conf := config.Config{
+ AllowLegacyElectors: true,
+ SocketPath: testhelper.GetTemporaryGitalySocketFileName(t),
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorageName,
+ Nodes: []*config.Node{
+ {Storage: g1Cfg.Storages[0].Name, Address: g1Addr},
+ {Storage: g2Cfg.Storages[0].Name, Address: g2Srv.Address()},
+ },
+ DefaultReplicationFactor: 2,
+ },
+ },
+ DB: dbConf,
+ Failover: config.Failover{
+ Enabled: true,
+ ElectionStrategy: config.ElectionStrategyPerRepository,
+ },
+ }
+
+ gitalyCC, err := client.Dial(g1Addr, nil)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, gitalyCC.Close()) }()
+ ctx := testhelper.Context(t)
+
+ gitaly1RepositoryClient := gitalypb.NewRepositoryServiceClient(gitalyCC)
+
+ createRepoThroughGitaly1 := func(relativePath string) error {
+ _, err := gitaly1RepositoryClient.CreateRepository(
+ ctx,
+ &gitalypb.CreateRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: g1Cfg.Storages[0].Name,
+ RelativePath: relativePath,
+ },
+ })
+ return err
+ }
+
+ authoritativeStorage := g1Cfg.Storages[0].Name
+ logger := testhelper.NewDiscardingLogger(t)
+
+ t.Run("ok", func(t *testing.T) {
+ testCases := []struct {
+ relativePaths []string
+ desc string
+ replicateImmediately bool
+ expectedOutput string
+ }{
+ {
+ relativePaths: []string{"path/to/test/repo1", "path/to/test/repo2"},
+ desc: "immediate replication",
+ replicateImmediately: true,
+ expectedOutput: "Finished replicating repository to \"gitaly-2\".\n",
+ },
+ {
+ relativePaths: []string{"path/to/test/repo3", "path/to/test/repo4"},
+ desc: "no immediate replication",
+ replicateImmediately: false,
+ expectedOutput: "Added replication job to replicate repository to \"gitaly-2\".\n",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ nodeMgr, err := nodes.NewManager(
+ testhelper.NewDiscardingLogEntry(t),
+ conf,
+ db.DB,
+ nil,
+ promtest.NewMockHistogramVec(),
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ nil,
+ nil,
+ )
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ defer nodeMgr.Stop()
+
+ tempDir := testhelper.TempDir(t)
+ input, err := os.Create(filepath.Join(tempDir, "input"))
+ require.NoError(t, err)
+
+ repoDS := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+ for _, path := range tc.relativePaths {
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, path)
+ require.NoError(t, err)
+ require.False(t, exists)
+
+ // create the repo on Gitaly without Praefect knowing
+ require.NoError(t, createRepoThroughGitaly1(path))
+ require.DirExists(t, filepath.Join(g1Cfg.Storages[0].Path, path))
+ require.NoDirExists(t, filepath.Join(g2Cfg.Storages[0].Path, path))
+
+ // Write repo details to input file
+ repoEntry, err := json.Marshal(trackRepositoryRequest{RelativePath: path, VirtualStorage: virtualStorageName, AuthoritativeStorage: authoritativeStorage})
+ require.NoError(t, err)
+ fmt.Fprintf(input, string(repoEntry)+"\n")
+ }
+
+ var stdout bytes.Buffer
+
+ addRepoCmd := &trackRepositories{
+ inputPath: filepath.Join(tempDir, "input"),
+ replicateImmediately: tc.replicateImmediately,
+ logger: logger,
+ w: &stdout,
+ }
+
+ require.NoError(t, addRepoCmd.Exec(flag.NewFlagSet("", flag.PanicOnError), conf))
+ assert.Contains(t, stdout.String(), tc.expectedOutput)
+
+ as := datastore.NewAssignmentStore(db, conf.StorageNames())
+
+ for _, path := range tc.relativePaths {
+ repositoryID, err := repoDS.GetRepositoryID(ctx, virtualStorageName, path)
+ require.NoError(t, err)
+
+ assignments, err := as.GetHostAssignments(ctx, virtualStorageName, repositoryID)
+ require.NoError(t, err)
+ require.Len(t, assignments, 2)
+ assert.Contains(t, assignments, g1Cfg.Storages[0].Name)
+ assert.Contains(t, assignments, g2Cfg.Storages[0].Name)
+
+ exists, err := repoDS.RepositoryExists(ctx, virtualStorageName, path)
+ require.NoError(t, err)
+ assert.True(t, exists)
+
+ if !tc.replicateImmediately {
+ queue := datastore.NewPostgresReplicationEventQueue(db)
+ events, err := queue.Dequeue(ctx, virtualStorageName, g2Cfg.Storages[0].Name, 1)
+ require.NoError(t, err)
+ assert.Len(t, events, 1)
+ assert.Equal(t, path, events[0].Job.RelativePath)
+ } else {
+ require.DirExists(t, filepath.Join(g2Cfg.Storages[0].Path, path))
+ }
+ }
+ })
+ }
+ })
+}