Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-07-30 10:43:08 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-07-30 10:43:08 +0300
commita8520a1568f0c0515eef6931c01b3fa8e55e7985 (patch)
tree202c5a4d8cd7fd263de37e415134b35d3ae6a478
parent9b1bdfc5d16fe86d42a7ccde30a29d6a71267537 (diff)
parentdcc71730c2422d5a949979e2517a3207460d5582 (diff)
Merge branch 'smh-remove-importer' into 'master'
Remove the repository importer See merge request gitlab-org/gitaly!3721
-rw-r--r--cmd/praefect/main.go29
-rw-r--r--internal/praefect/importer/importer.go216
-rw-r--r--internal/praefect/importer/importer_test.go291
3 files changed, 0 insertions, 536 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index d7b7fb4b5..234d475c1 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -79,7 +79,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/importer"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes/tracker"
@@ -417,34 +416,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
})
}
- if db != nil && nodeManager != nil {
- go func() {
- virtualStorages := conf.VirtualStorageNames()
- finished := make(map[string]bool, len(virtualStorages))
- for _, virtualStorage := range virtualStorages {
- finished[virtualStorage] = true
- }
-
- for result := range importer.New(nodeManager, virtualStorages, db).Run(ctx) {
- if result.Error != nil {
- logger.WithFields(logrus.Fields{
- "virtual_storage": result.VirtualStorage,
- logrus.ErrorKey: result.Error,
- }).Error("importing repositories to database failed")
- finished[result.VirtualStorage] = false
- continue
- }
-
- logger.WithFields(logrus.Fields{
- "virtual_storage": result.VirtualStorage,
- "relative_paths": result.RelativePaths,
- }).Info("imported repositories to database")
- }
-
- logger.WithField("virtual_storages", finished).Info("repository importer finished")
- }()
- }
-
if err := b.Start(); err != nil {
return fmt.Errorf("unable to start the bootstrap: %v", err)
}
diff --git a/internal/praefect/importer/importer.go b/internal/praefect/importer/importer.go
deleted file mode 100644
index 7da752970..000000000
--- a/internal/praefect/importer/importer.go
+++ /dev/null
@@ -1,216 +0,0 @@
-package importer
-
-import (
- "context"
- "database/sql"
- "errors"
- "fmt"
- "io"
- "sync"
- "time"
-
- "github.com/lib/pq"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
- "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
-)
-
-const batchSize = 25
-
-// Importer creates database records for repositories which are missing from the database
-// but are present on the virtual storage's primary's disk.
-type Importer struct {
- nm nodes.Manager
- virtualStorages []string
- db glsql.Querier
-}
-
-// New creates a new Importer.
-func New(nm nodes.Manager, virtualStorages []string, db glsql.Querier) *Importer {
- return &Importer{
- nm: nm,
- virtualStorages: virtualStorages,
- db: db,
- }
-}
-
-// Result is a partial result of the import. VirtualStorage is set in each Result,
-// along with either Error or RelativePaths.
-type Result struct {
- // Error is set if the import was aborted by an error.
- Error error
- // VirtualStorage indicates which virtual storage this result relates to.
- VirtualStorage string
- // RelativePaths includes the relative paths of repositories successfully imported
- // in a batch.
- RelativePaths []string
-}
-
-// Run walks the repositories on primary nodes of each virtual storage and creates database records for every
-// repository on the primary's disk that is missing from the database. Run only performs the import for virtual
-// storages that have not had the import successfully completed before. The returned channel must be consumed in
-// order to release the goroutines created by Run.
-func (imp *Importer) Run(ctx context.Context) <-chan Result {
- var wg sync.WaitGroup
- wg.Add(len(imp.virtualStorages))
-
- output := make(chan Result)
- for _, virtualStorage := range imp.virtualStorages {
- go func(virtualStorage string) {
- defer wg.Done()
- if err := imp.importVirtualStorage(ctx, virtualStorage, output); err != nil {
- output <- Result{
- VirtualStorage: virtualStorage,
- Error: fmt.Errorf("importing virtual storage: %w", err),
- }
- }
- }(virtualStorage)
- }
-
- go func() {
- wg.Wait()
- close(output)
- }()
-
- return output
-}
-
-// importVirtualStorage walks the virtual storage's primary's disk and creates database records for any repositories
-// which are missing from the primary.
-func (imp *Importer) importVirtualStorage(ctx context.Context, virtualStorage string, output chan<- Result) error {
- if migrated, err := imp.isAlreadyCompleted(ctx, virtualStorage); err != nil {
- return fmt.Errorf("check if already completed: %w", err)
- } else if migrated {
- return nil
- }
-
- shard, err := imp.nm.GetShard(ctx, virtualStorage)
- if err != nil {
- return fmt.Errorf("get shard: %w", err)
- }
-
- client := gitalypb.NewInternalGitalyClient(shard.Primary.GetConnection())
- stream, err := client.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: shard.Primary.GetStorage()})
- if err != nil {
- return fmt.Errorf("open stream: %w", err)
- }
-
- relativePaths := make([]string, 0, batchSize)
- for {
- // The importer sleeps here for a short duration as a crude way to rate limit
- // to reduce the pressure on the available resources.
- // 100 milliseconds gives us maximum rate of 250 imported repositories per second in
- // 10 database calls.
- time.Sleep(100 * time.Millisecond)
-
- resp, err := stream.Recv()
- if err != nil {
- if errors.Is(err, io.EOF) {
- break
- }
-
- return fmt.Errorf("receive: %w", err)
- }
-
- relativePaths = append(relativePaths, resp.RelativePath)
- if len(relativePaths) == batchSize {
- if err := imp.storeBatch(ctx, virtualStorage, shard.Primary.GetStorage(), relativePaths, output); err != nil {
- return fmt.Errorf("store batch: %w", err)
- }
-
- relativePaths = relativePaths[:0]
- }
- }
-
- // store the final batch after finishing walking repositories
- if len(relativePaths) > 0 {
- if err := imp.storeBatch(ctx, virtualStorage, shard.Primary.GetStorage(), relativePaths, output); err != nil {
- return fmt.Errorf("store final batch: %w", err)
- }
- }
-
- if err := imp.markCompleted(ctx, virtualStorage); err != nil {
- return fmt.Errorf("mark completed: %w", err)
- }
-
- return nil
-}
-
-// isAlreadyCompleted checks if the import has already been run successfully to finish. If so,
-// the import is skipped.
-func (imp *Importer) isAlreadyCompleted(ctx context.Context, virtualStorage string) (bool, error) {
- var alreadyMigrated bool
- if err := imp.db.QueryRowContext(ctx, `
-SELECT repositories_imported
-FROM virtual_storages
-WHERE virtual_storage = $1
- `, virtualStorage).Scan(&alreadyMigrated); err != nil {
- if !errors.Is(err, sql.ErrNoRows) {
- return false, fmt.Errorf("scan: %w", err)
- }
-
- return false, nil
- }
-
- return alreadyMigrated, nil
-}
-
-// markCompleted marks the virtual storage's repository import as completed so it won't be attempted
-// again after successful completion.
-func (imp *Importer) markCompleted(ctx context.Context, virtualStorage string) error {
- _, err := imp.db.ExecContext(ctx, `
-INSERT INTO virtual_storages (virtual_storage, repositories_imported)
-VALUES ($1, true)
-ON CONFLICT (virtual_storage)
- DO UPDATE SET repositories_imported = true
- `, virtualStorage)
- return err
-}
-
-// storeBatch stores a batch of relative paths found on the primary in to the database. Records are only added
-// if there is no existing record of the database in the `repositories` table.
-func (imp *Importer) storeBatch(ctx context.Context, virtualStorage, primary string, relativePaths []string, output chan<- Result) error {
- rows, err := imp.db.QueryContext(ctx, `
-WITH imported_repositories AS (
- INSERT INTO repositories (virtual_storage, relative_path, generation)
- SELECT $1 AS virtual_storage, unnest($2::text[]) AS relative_path, 0 AS generation
- ON CONFLICT DO NOTHING
- RETURNING virtual_storage, relative_path, generation
-), primary_records AS (
- INSERT INTO storage_repositories (virtual_storage, relative_path, storage, generation)
- SELECT virtual_storage, relative_path, $3 AS storage, generation
- FROM imported_repositories
- ON CONFLICT DO NOTHING
- RETURNING relative_path
-)
-
-SELECT relative_path
-FROM primary_records`, virtualStorage, pq.StringArray(relativePaths), primary)
- if err != nil {
- return fmt.Errorf("query: %w", err)
- }
- defer rows.Close()
-
- imported := make([]string, 0, len(relativePaths))
- for rows.Next() {
- var relativePath string
- if err := rows.Scan(&relativePath); err != nil {
- return fmt.Errorf("scan: %w", err)
- }
-
- imported = append(imported, relativePath)
- }
-
- if err := rows.Err(); err != nil {
- return fmt.Errorf("iterating rows: %w", err)
- }
-
- if len(imported) > 0 {
- output <- Result{
- VirtualStorage: virtualStorage,
- RelativePaths: imported,
- }
- }
-
- return nil
-}
diff --git a/internal/praefect/importer/importer_test.go b/internal/praefect/importer/importer_test.go
deleted file mode 100644
index 394e8e719..000000000
--- a/internal/praefect/importer/importer_test.go
+++ /dev/null
@@ -1,291 +0,0 @@
-// +build postgres
-
-package importer
-
-import (
- "fmt"
- "io/ioutil"
- "net"
- "os"
- "path/filepath"
- "sort"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v14/client"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
- "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/internalgitaly"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
- "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
- "google.golang.org/grpc"
-)
-
-func TestRepositoryImporter_Run(t *testing.T) {
- defer glsql.Clean()
-
- for _, tc := range []struct {
- desc string
- existingRecords map[string][]string
- alreadyCompleted map[string]bool
- storages map[string]map[string][]string
- expectedErrors map[string]string
- imported map[string][]string
- }{
- {
- desc: "empty",
- storages: map[string]map[string][]string{
- "virtual-storage": {"primary": {}},
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{},
- },
- {
- desc: "single repo imported",
- storages: map[string]map[string][]string{
- "virtual-storage": {
- "primary": {"repository-1"},
- },
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage": {"repository-1"},
- },
- },
- {
- desc: "nested directories not imported",
- storages: map[string]map[string][]string{
- "virtual-storage": {
- "primary": {"parent-repository", filepath.Join("parent-repository", "nested-repository")},
- },
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage": {"parent-repository"},
- },
- },
- {
- desc: "multi folder hierarchies imported",
- storages: map[string]map[string][]string{
- "virtual-storage": {
- "primary": {filepath.Join("empty-parent-folder", "repository-1")},
- },
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage": {filepath.Join("empty-parent-folder", "repository-1")},
- },
- },
- {
- desc: "multiple virtual storages imported",
- alreadyCompleted: map[string]bool{
- "virtual-storage-2": false,
- },
- storages: map[string]map[string][]string{
- "virtual-storage-1": {"primary": {"repository-1"}},
- "virtual-storage-2": {"primary": {"repository-2"}},
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage-1": {"repository-1"},
- "virtual-storage-2": {"repository-2"},
- },
- },
- {
- desc: "secondaries ignored",
- storages: map[string]map[string][]string{
- "virtual-storage": {
- "primary": {"repository-1"},
- "secondary": {"repository-2"},
- },
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage": {"repository-1"},
- },
- },
- {
- desc: "storages bigger than batch size work",
- storages: map[string]map[string][]string{
- "virtual-storage": {
- "primary": func() []string {
- repos := make([]string, 2*batchSize+1)
- for i := range repos {
- repos[i] = fmt.Sprintf("repository-%d", i)
- }
- return repos
- }(),
- },
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage": func() []string {
- repos := make([]string, 2*batchSize+1)
- for i := range repos {
- repos[i] = fmt.Sprintf("repository-%d", i)
- }
- sort.Strings(repos)
- return repos
- }(),
- },
- },
- {
- desc: "importing skipped when already perfomed",
- alreadyCompleted: map[string]bool{"virtual-storage": true},
- storages: map[string]map[string][]string{
- "virtual-storage": {
- "primary": {"unimported-repository"},
- },
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{},
- },
- {
- desc: "errors dont cancel jobs for other virtual storages",
- storages: map[string]map[string][]string{
- "erroring-virtual-storage": {
- "primary": {"repository-1"},
- },
- "successful-virtual-storage": {
- "primary": {"repository-2"},
- },
- },
- expectedErrors: map[string]string{
- "erroring-virtual-storage": fmt.Sprintf("importing virtual storage: get shard: %v", assert.AnError),
- },
- imported: map[string][]string{
- "successful-virtual-storage": {"repository-2"},
- },
- },
- {
- desc: "repositories with existing records are ignored",
- existingRecords: map[string][]string{"virtual-storage": {"already-existing"}},
- storages: map[string]map[string][]string{
- "virtual-storage": {"primary": {"already-existing", "imported"}},
- },
- expectedErrors: map[string]string{},
- imported: map[string][]string{
- "virtual-storage": {"imported"},
- },
- },
- } {
- t.Run(tc.desc, func(t *testing.T) {
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- db := glsql.GetDB(t, "importer")
-
- srv := grpc.NewServer()
- defer srv.Stop()
-
- tmp, err := ioutil.TempDir("", "praefect-importer")
- require.NoError(t, err)
- defer os.RemoveAll(tmp)
-
- for virtualStorage, completed := range tc.alreadyCompleted {
- _, err := db.ExecContext(ctx, `
- INSERT INTO virtual_storages (virtual_storage, repositories_imported)
- VALUES ($1, $2)
- `, virtualStorage, completed)
- require.NoError(t, err)
- }
-
- var configuredStorages []config.Storage
-
- // storage names are prefixed with the virtual storage to reuse the single gitaly server
- // without directory name collisions
- storageName := func(virtualStorage, storage string) string {
- return fmt.Sprintf("%s-%s", virtualStorage, storage)
- }
-
- // create the repositories on the storages
- for virtualStorage, storages := range tc.storages {
- for storage, relativePaths := range storages {
- storagePath := filepath.Join(tmp, virtualStorage, storage)
- for _, relativePath := range relativePaths {
- repoPath := filepath.Join(storagePath, relativePath)
- require.NoError(t, os.MkdirAll(repoPath, os.ModePerm))
- // WalkFiles checks these files for determining git repositories, we create them
- // here instead of creating a full repo
- for _, filePath := range []string{"objects", "refs", "HEAD"} {
- require.NoError(t, os.Mkdir(filepath.Join(repoPath, filePath), os.ModePerm))
- }
- }
-
- configuredStorages = append(configuredStorages, config.Storage{
- Name: storageName(virtualStorage, storage),
- Path: storagePath,
- })
- }
- }
-
- gitalypb.RegisterInternalGitalyServer(srv, internalgitaly.NewServer(configuredStorages))
-
- socketPath := filepath.Join(tmp, "socket")
-
- ln, err := net.Listen("unix", socketPath)
- require.NoError(t, err)
- defer ln.Close()
-
- go srv.Serve(ln)
-
- conn, err := client.Dial("unix://"+socketPath, nil)
- require.NoError(t, err)
-
- virtualStorages := make([]string, 0, len(tc.storages))
- for vs := range tc.storages {
- virtualStorages = append(virtualStorages, vs)
- }
-
- rs := datastore.NewPostgresRepositoryStore(db, nil)
- for virtualStorage, relativePaths := range tc.existingRecords {
- for _, relativePath := range relativePaths {
- require.NoError(t, rs.SetGeneration(ctx, virtualStorage, relativePath, "any-storage", 0))
- }
- }
-
- importer := New(&nodes.MockManager{
- GetShardFunc: func(virtualStorage string) (nodes.Shard, error) {
- if msg := tc.expectedErrors[virtualStorage]; msg != "" {
- return nodes.Shard{}, assert.AnError
- }
-
- return nodes.Shard{
- Primary: &nodes.MockNode{
- GetStorageMethod: func() string { return storageName(virtualStorage, "primary") },
- Conn: conn,
- },
- }, nil
- },
- }, virtualStorages, db)
-
- actualErrors := map[string]string{}
- imported := map[string][]string{}
- for result := range importer.Run(ctx) {
- if result.Error != nil {
- actualErrors[result.VirtualStorage] = result.Error.Error()
- continue
- }
-
- imported[result.VirtualStorage] = append(imported[result.VirtualStorage], result.RelativePaths...)
- }
-
- require.Equal(t, tc.expectedErrors, actualErrors)
- require.Equal(t, tc.imported, imported)
-
- for virtualStorage := range tc.storages {
- expectedCompleted := true
- if _, ok := tc.expectedErrors[virtualStorage]; ok {
- expectedCompleted = false
- }
-
- actualCompleted, err := importer.isAlreadyCompleted(ctx, virtualStorage)
- require.NoError(t, err)
- require.Equal(t, expectedCompleted, actualCompleted, virtualStorage)
- }
- })
- }
-}