diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-30 10:43:08 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-07-30 10:43:08 +0300 |
commit | a8520a1568f0c0515eef6931c01b3fa8e55e7985 (patch) | |
tree | 202c5a4d8cd7fd263de37e415134b35d3ae6a478 | |
parent | 9b1bdfc5d16fe86d42a7ccde30a29d6a71267537 (diff) | |
parent | dcc71730c2422d5a949979e2517a3207460d5582 (diff) |
Merge branch 'smh-remove-importer' into 'master'
Remove the repository importer
See merge request gitlab-org/gitaly!3721
-rw-r--r-- | cmd/praefect/main.go | 29 | ||||
-rw-r--r-- | internal/praefect/importer/importer.go | 216 | ||||
-rw-r--r-- | internal/praefect/importer/importer_test.go | 291 |
3 files changed, 0 insertions, 536 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index d7b7fb4b5..234d475c1 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -79,7 +79,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/importer" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes/tracker" @@ -417,34 +416,6 @@ func run(cfgs []starter.Config, conf config.Config) error { }) } - if db != nil && nodeManager != nil { - go func() { - virtualStorages := conf.VirtualStorageNames() - finished := make(map[string]bool, len(virtualStorages)) - for _, virtualStorage := range virtualStorages { - finished[virtualStorage] = true - } - - for result := range importer.New(nodeManager, virtualStorages, db).Run(ctx) { - if result.Error != nil { - logger.WithFields(logrus.Fields{ - "virtual_storage": result.VirtualStorage, - logrus.ErrorKey: result.Error, - }).Error("importing repositories to database failed") - finished[result.VirtualStorage] = false - continue - } - - logger.WithFields(logrus.Fields{ - "virtual_storage": result.VirtualStorage, - "relative_paths": result.RelativePaths, - }).Info("imported repositories to database") - } - - logger.WithField("virtual_storages", finished).Info("repository importer finished") - }() - } - if err := b.Start(); err != nil { return fmt.Errorf("unable to start the bootstrap: %v", err) } diff --git a/internal/praefect/importer/importer.go b/internal/praefect/importer/importer.go deleted file mode 100644 index 7da752970..000000000 --- a/internal/praefect/importer/importer.go +++ /dev/null @@ -1,216 +0,0 @@ -package importer - -import ( - "context" - "database/sql" - "errors" - "fmt" - "io" - "sync" - "time" - - "github.com/lib/pq" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" -) - -const batchSize = 25 - -// Importer creates database records for repositories which are missing from the database -// but are present on the virtual storage's primary's disk. -type Importer struct { - nm nodes.Manager - virtualStorages []string - db glsql.Querier -} - -// New creates a new Importer. -func New(nm nodes.Manager, virtualStorages []string, db glsql.Querier) *Importer { - return &Importer{ - nm: nm, - virtualStorages: virtualStorages, - db: db, - } -} - -// Result is a partial result of the import. VirtualStorage is set in each Result, -// along with either Error or RelativePaths. -type Result struct { - // Error is set if the import was aborted by an error. - Error error - // VirtualStorage indicates which virtual storage this result relates to. - VirtualStorage string - // RelativePaths includes the relative paths of repositories successfully imported - // in a batch. - RelativePaths []string -} - -// Run walks the repositories on primary nodes of each virtual storage and creates database records for every -// repository on the primary's disk that is missing from the database. Run only performs the import for virtual -// storages that have not had the import successfully completed before. The returned channel must be consumed in -// order to release the goroutines created by Run. -func (imp *Importer) Run(ctx context.Context) <-chan Result { - var wg sync.WaitGroup - wg.Add(len(imp.virtualStorages)) - - output := make(chan Result) - for _, virtualStorage := range imp.virtualStorages { - go func(virtualStorage string) { - defer wg.Done() - if err := imp.importVirtualStorage(ctx, virtualStorage, output); err != nil { - output <- Result{ - VirtualStorage: virtualStorage, - Error: fmt.Errorf("importing virtual storage: %w", err), - } - } - }(virtualStorage) - } - - go func() { - wg.Wait() - close(output) - }() - - return output -} - -// importVirtualStorage walks the virtual storage's primary's disk and creates database records for any repositories -// which are missing from the primary. -func (imp *Importer) importVirtualStorage(ctx context.Context, virtualStorage string, output chan<- Result) error { - if migrated, err := imp.isAlreadyCompleted(ctx, virtualStorage); err != nil { - return fmt.Errorf("check if already completed: %w", err) - } else if migrated { - return nil - } - - shard, err := imp.nm.GetShard(ctx, virtualStorage) - if err != nil { - return fmt.Errorf("get shard: %w", err) - } - - client := gitalypb.NewInternalGitalyClient(shard.Primary.GetConnection()) - stream, err := client.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: shard.Primary.GetStorage()}) - if err != nil { - return fmt.Errorf("open stream: %w", err) - } - - relativePaths := make([]string, 0, batchSize) - for { - // The importer sleeps here for a short duration as a crude way to rate limit - // to reduce the pressure on the available resources. - // 100 milliseconds gives us maximum rate of 250 imported repositories per second in - // 10 database calls. - time.Sleep(100 * time.Millisecond) - - resp, err := stream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - break - } - - return fmt.Errorf("receive: %w", err) - } - - relativePaths = append(relativePaths, resp.RelativePath) - if len(relativePaths) == batchSize { - if err := imp.storeBatch(ctx, virtualStorage, shard.Primary.GetStorage(), relativePaths, output); err != nil { - return fmt.Errorf("store batch: %w", err) - } - - relativePaths = relativePaths[:0] - } - } - - // store the final batch after finishing walking repositories - if len(relativePaths) > 0 { - if err := imp.storeBatch(ctx, virtualStorage, shard.Primary.GetStorage(), relativePaths, output); err != nil { - return fmt.Errorf("store final batch: %w", err) - } - } - - if err := imp.markCompleted(ctx, virtualStorage); err != nil { - return fmt.Errorf("mark completed: %w", err) - } - - return nil -} - -// isAlreadyCompleted checks if the import has already been run successfully to finish. If so, -// the import is skipped. -func (imp *Importer) isAlreadyCompleted(ctx context.Context, virtualStorage string) (bool, error) { - var alreadyMigrated bool - if err := imp.db.QueryRowContext(ctx, ` -SELECT repositories_imported -FROM virtual_storages -WHERE virtual_storage = $1 - `, virtualStorage).Scan(&alreadyMigrated); err != nil { - if !errors.Is(err, sql.ErrNoRows) { - return false, fmt.Errorf("scan: %w", err) - } - - return false, nil - } - - return alreadyMigrated, nil -} - -// markCompleted marks the virtual storage's repository import as completed so it won't be attempted -// again after successful completion. -func (imp *Importer) markCompleted(ctx context.Context, virtualStorage string) error { - _, err := imp.db.ExecContext(ctx, ` -INSERT INTO virtual_storages (virtual_storage, repositories_imported) -VALUES ($1, true) -ON CONFLICT (virtual_storage) - DO UPDATE SET repositories_imported = true - `, virtualStorage) - return err -} - -// storeBatch stores a batch of relative paths found on the primary in to the database. Records are only added -// if there is no existing record of the database in the `repositories` table. -func (imp *Importer) storeBatch(ctx context.Context, virtualStorage, primary string, relativePaths []string, output chan<- Result) error { - rows, err := imp.db.QueryContext(ctx, ` -WITH imported_repositories AS ( - INSERT INTO repositories (virtual_storage, relative_path, generation) - SELECT $1 AS virtual_storage, unnest($2::text[]) AS relative_path, 0 AS generation - ON CONFLICT DO NOTHING - RETURNING virtual_storage, relative_path, generation -), primary_records AS ( - INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation) - SELECT virtual_storage, relative_path, $3 AS storage, generation - FROM imported_repositories - ON CONFLICT DO NOTHING - RETURNING relative_path -) - -SELECT relative_path -FROM primary_records`, virtualStorage, pq.StringArray(relativePaths), primary) - if err != nil { - return fmt.Errorf("query: %w", err) - } - defer rows.Close() - - imported := make([]string, 0, len(relativePaths)) - for rows.Next() { - var relativePath string - if err := rows.Scan(&relativePath); err != nil { - return fmt.Errorf("scan: %w", err) - } - - imported = append(imported, relativePath) - } - - if err := rows.Err(); err != nil { - return fmt.Errorf("iterating rows: %w", err) - } - - if len(imported) > 0 { - output <- Result{ - VirtualStorage: virtualStorage, - RelativePaths: imported, - } - } - - return nil -} diff --git a/internal/praefect/importer/importer_test.go b/internal/praefect/importer/importer_test.go deleted file mode 100644 index 394e8e719..000000000 --- a/internal/praefect/importer/importer_test.go +++ /dev/null @@ -1,291 +0,0 @@ -// +build postgres - -package importer - -import ( - "fmt" - "io/ioutil" - "net" - "os" - "path/filepath" - "sort" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/client" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/internalgitaly" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" - "google.golang.org/grpc" -) - -func TestRepositoryImporter_Run(t *testing.T) { - defer glsql.Clean() - - for _, tc := range []struct { - desc string - existingRecords map[string][]string - alreadyCompleted map[string]bool - storages map[string]map[string][]string - expectedErrors map[string]string - imported map[string][]string - }{ - { - desc: "empty", - storages: map[string]map[string][]string{ - "virtual-storage": {"primary": {}}, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{}, - }, - { - desc: "single repo imported", - storages: map[string]map[string][]string{ - "virtual-storage": { - "primary": {"repository-1"}, - }, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage": {"repository-1"}, - }, - }, - { - desc: "nested directories not imported", - storages: map[string]map[string][]string{ - "virtual-storage": { - "primary": {"parent-repository", filepath.Join("parent-repository", "nested-repository")}, - }, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage": {"parent-repository"}, - }, - }, - { - desc: "multi folder hierarchies imported", - storages: map[string]map[string][]string{ - "virtual-storage": { - "primary": {filepath.Join("empty-parent-folder", "repository-1")}, - }, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage": {filepath.Join("empty-parent-folder", "repository-1")}, - }, - }, - { - desc: "multiple virtual storages imported", - alreadyCompleted: map[string]bool{ - "virtual-storage-2": false, - }, - storages: map[string]map[string][]string{ - "virtual-storage-1": {"primary": {"repository-1"}}, - "virtual-storage-2": {"primary": {"repository-2"}}, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage-1": {"repository-1"}, - "virtual-storage-2": {"repository-2"}, - }, - }, - { - desc: "secondaries ignored", - storages: map[string]map[string][]string{ - "virtual-storage": { - "primary": {"repository-1"}, - "secondary": {"repository-2"}, - }, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage": {"repository-1"}, - }, - }, - { - desc: "storages bigger than batch size work", - storages: map[string]map[string][]string{ - "virtual-storage": { - "primary": func() []string { - repos := make([]string, 2*batchSize+1) - for i := range repos { - repos[i] = fmt.Sprintf("repository-%d", i) - } - return repos - }(), - }, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage": func() []string { - repos := make([]string, 2*batchSize+1) - for i := range repos { - repos[i] = fmt.Sprintf("repository-%d", i) - } - sort.Strings(repos) - return repos - }(), - }, - }, - { - desc: "importing skipped when already perfomed", - alreadyCompleted: map[string]bool{"virtual-storage": true}, - storages: map[string]map[string][]string{ - "virtual-storage": { - "primary": {"unimported-repository"}, - }, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{}, - }, - { - desc: "errors dont cancel jobs for other virtual storages", - storages: map[string]map[string][]string{ - "erroring-virtual-storage": { - "primary": {"repository-1"}, - }, - "successful-virtual-storage": { - "primary": {"repository-2"}, - }, - }, - expectedErrors: map[string]string{ - "erroring-virtual-storage": fmt.Sprintf("importing virtual storage: get shard: %v", assert.AnError), - }, - imported: map[string][]string{ - "successful-virtual-storage": {"repository-2"}, - }, - }, - { - desc: "repositories with existing records are ignored", - existingRecords: map[string][]string{"virtual-storage": {"already-existing"}}, - storages: map[string]map[string][]string{ - "virtual-storage": {"primary": {"already-existing", "imported"}}, - }, - expectedErrors: map[string]string{}, - imported: map[string][]string{ - "virtual-storage": {"imported"}, - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := testhelper.Context() - defer cancel() - - db := glsql.GetDB(t, "importer") - - srv := grpc.NewServer() - defer srv.Stop() - - tmp, err := ioutil.TempDir("", "praefect-importer") - require.NoError(t, err) - defer os.RemoveAll(tmp) - - for virtualStorage, completed := range tc.alreadyCompleted { - _, err := db.ExecContext(ctx, ` - INSERT INTO virtual_storages (virtual_storage, repositories_imported) - VALUES ($1, $2) - `, virtualStorage, completed) - require.NoError(t, err) - } - - var configuredStorages []config.Storage - - // storage names are prefixed with the virtual storage to reuse the single gitaly server - // without directory name collisions - storageName := func(virtualStorage, storage string) string { - return fmt.Sprintf("%s-%s", virtualStorage, storage) - } - - // create the repositories on the storages - for virtualStorage, storages := range tc.storages { - for storage, relativePaths := range storages { - storagePath := filepath.Join(tmp, virtualStorage, storage) - for _, relativePath := range relativePaths { - repoPath := filepath.Join(storagePath, relativePath) - require.NoError(t, os.MkdirAll(repoPath, os.ModePerm)) - // WalkFiles checks these files for determining git repositories, we create them - // here instead of creating a full repo - for _, filePath := range []string{"objects", "refs", "HEAD"} { - require.NoError(t, os.Mkdir(filepath.Join(repoPath, filePath), os.ModePerm)) - } - } - - configuredStorages = append(configuredStorages, config.Storage{ - Name: storageName(virtualStorage, storage), - Path: storagePath, - }) - } - } - - gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(configuredStorages)) - - socketPath := filepath.Join(tmp, "socket") - - ln, err := net.Listen("unix", socketPath) - require.NoError(t, err) - defer ln.Close() - - go srv.Serve(ln) - - conn, err := client.Dial("unix://"+socketPath, nil) - require.NoError(t, err) - - virtualStorages := make([]string, 0, len(tc.storages)) - for vs := range tc.storages { - virtualStorages = append(virtualStorages, vs) - } - - rs := datastore.NewPostgresRepositoryStore(db, nil) - for virtualStorage, relativePaths := range tc.existingRecords { - for _, relativePath := range relativePaths { - require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, "any-storage", 0)) - } - } - - importer := New(&nodes.MockManager{ - GetShardFunc: func(virtualStorage string) (nodes.Shard, error) { - if msg := tc.expectedErrors[virtualStorage]; msg != "" { - return nodes.Shard{}, assert.AnError - } - - return nodes.Shard{ - Primary: &nodes.MockNode{ - GetStorageMethod: func() string { return storageName(virtualStorage, "primary") }, - Conn: conn, - }, - }, nil - }, - }, virtualStorages, db) - - actualErrors := map[string]string{} - imported := map[string][]string{} - for result := range importer.Run(ctx) { - if result.Error != nil { - actualErrors[result.VirtualStorage] = result.Error.Error() - continue - } - - imported[result.VirtualStorage] = append(imported[result.VirtualStorage], result.RelativePaths...) - } - - require.Equal(t, tc.expectedErrors, actualErrors) - require.Equal(t, tc.imported, imported) - - for virtualStorage := range tc.storages { - expectedCompleted := true - if _, ok := tc.expectedErrors[virtualStorage]; ok { - expectedCompleted = false - } - - actualCompleted, err := importer.isAlreadyCompleted(ctx, virtualStorage) - require.NoError(t, err) - require.Equal(t, expectedCompleted, actualCompleted, virtualStorage) - } - }) - } -} |