diff options
author | John Cai <jcai@gitlab.com> | 2022-03-29 23:24:59 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-03-29 23:24:59 +0300 |
commit | e3996e10e7efd5342a3a6e1c83c562408b0e639d (patch) | |
tree | 8b8dbf2b644773f82b406c664e6e2683bd3b62db | |
parent | 51da8bc17059e4ccd39873e4f3def935341472b8 (diff) | |
parent | 3e0d775e16fbf8919a3a0508faee8da9abc96d7a (diff) |
Merge branch 'jc-dont-call-optimize-repo' into 'master'
maintenance: Call Optimize directly on repository manager
Closes #3990
See merge request gitlab-org/gitaly!4407
-rw-r--r-- | cmd/gitaly/main.go | 11 | ||||
-rw-r--r-- | internal/git/housekeeping/manager.go | 2 | ||||
-rw-r--r-- | internal/git/housekeeping/optimize_repository.go | 48 | ||||
-rw-r--r-- | internal/git/housekeeping/optimize_repository_ext_test.go | 26 | ||||
-rw-r--r-- | internal/git/housekeeping/optimize_repository_test.go | 84 | ||||
-rw-r--r-- | internal/gitaly/maintenance/optimize.go | 114 | ||||
-rw-r--r-- | internal/gitaly/maintenance/optimize_test.go | 31 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory.go | 62 |
8 files changed, 186 insertions, 192 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index a0e231cb1..4bd8e234b 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -20,12 +20,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v14/internal/git2go" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/linguist" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/rubyserver" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" @@ -315,7 +318,13 @@ func run(cfg config.Cfg) error { return fmt.Errorf("unable to start the bootstrap: %v", err) } - shutdownWorkers, err := gitalyServerFactory.StartWorkers(ctx, glog.Default(), cfg) + shutdownWorkers, err := maintenance.StartWorkers( + ctx, + glog.Default(), + maintenance.DailyOptimizationWorker(cfg, maintenance.OptimizerFunc(func(ctx context.Context, repo repository.GitRepo) error { + return housekeepingManager.OptimizeRepository(ctx, localrepo.New(locator, gitCmdFactory, catfileCache, repo)) + })), + ) if err != nil { return fmt.Errorf("initialize auxiliary workers: %v", err) } diff --git a/internal/git/housekeeping/manager.go b/internal/git/housekeeping/manager.go index 552c153c9..c34aa1330 100644 --- a/internal/git/housekeeping/manager.go +++ b/internal/git/housekeeping/manager.go @@ -41,7 +41,7 @@ func NewManager(promCfg gitalycfgprom.Config, txManager transaction.Manager) *Re Name: "gitaly_housekeeping_tasks_total", Help: "Total number of housekeeping tasks performed in the repository", }, - []string{"housekeeping_task"}, + []string{"housekeeping_task", "status"}, ), tasksLatency: prometheus.NewHistogramVec( prometheus.HistogramOpts{ diff --git a/internal/git/housekeeping/optimize_repository.go b/internal/git/housekeeping/optimize_repository.go index 09d923c06..f4fc8419e 100644 --- a/internal/git/housekeeping/optimize_repository.go +++ b/internal/git/housekeeping/optimize_repository.go @@ -41,31 +41,18 @@ func (m *RepositoryManager) OptimizeRepository(ctx context.Context, repo *localr func optimizeRepository(ctx context.Context, m *RepositoryManager, repo *localrepo.Repo) error { totalTimer := prometheus.NewTimer(m.tasksLatency.WithLabelValues("total")) + totalStatus := "failure" - optimizations := struct { - PackedObjectsIncremental bool `json:"packed_objects_incremental"` - PackedObjectsFull bool `json:"packed_objects_full"` - PrunedObjects bool `json:"pruned_objects"` - PackedRefs bool `json:"packed_refs"` - WrittenBitmap bool `json:"written_bitmap"` - }{} + optimizations := make(map[string]string) defer func() { totalTimer.ObserveDuration() ctxlogrus.Extract(ctx).WithField("optimizations", optimizations).Info("optimized repository") - for task, executed := range map[string]bool{ - "packed_objects_incremental": optimizations.PackedObjectsIncremental, - "packed_objects_full": optimizations.PackedObjectsFull, - "pruned_objects": optimizations.PrunedObjects, - "packed_refs": optimizations.PackedRefs, - "written_bitmap": optimizations.WrittenBitmap, - } { - if executed { - m.tasksTotal.WithLabelValues(task).Add(1) - } + for task, status := range optimizations { + m.tasksTotal.WithLabelValues(task, status).Inc() } - m.tasksTotal.WithLabelValues("total").Add(1) + m.tasksTotal.WithLabelValues("total", totalStatus).Add(1) }() timer := prometheus.NewTimer(m.tasksLatency.WithLabelValues("clean-stale-data")) @@ -83,30 +70,45 @@ func optimizeRepository(ctx context.Context, m *RepositoryManager, repo *localre timer = prometheus.NewTimer(m.tasksLatency.WithLabelValues("repack")) didRepack, repackCfg, err := repackIfNeeded(ctx, repo) if err != nil { + optimizations["packed_objects_full"] = "failure" + optimizations["packed_objects_incremental"] = "failure" + optimizations["written_bitmap"] = "failure" return fmt.Errorf("could not repack: %w", err) } if didRepack { - optimizations.PackedObjectsIncremental = !repackCfg.FullRepack - optimizations.PackedObjectsFull = repackCfg.FullRepack - optimizations.WrittenBitmap = repackCfg.WriteBitmap + if repackCfg.FullRepack { + optimizations["packed_objects_full"] = "success" + } else { + optimizations["packed_objects_incremental"] = "success" + } + if repackCfg.WriteBitmap { + optimizations["written_bitmap"] = "success" + } } + timer.ObserveDuration() timer = prometheus.NewTimer(m.tasksLatency.WithLabelValues("prune")) didPrune, err := pruneIfNeeded(ctx, repo) if err != nil { + optimizations["pruned_objects"] = "failure" return fmt.Errorf("could not prune: %w", err) + } else if didPrune { + optimizations["pruned_objects"] = "success" } - optimizations.PrunedObjects = didPrune timer.ObserveDuration() timer = prometheus.NewTimer(m.tasksLatency.WithLabelValues("pack-refs")) didPackRefs, err := packRefsIfNeeded(ctx, repo) if err != nil { + optimizations["packed_refs"] = "failure" return fmt.Errorf("could not pack refs: %w", err) + } else if didPackRefs { + optimizations["packed_refs"] = "success" } - optimizations.PackedRefs = didPackRefs + timer.ObserveDuration() + totalStatus = "success" return nil } diff --git a/internal/git/housekeeping/optimize_repository_ext_test.go b/internal/git/housekeeping/optimize_repository_ext_test.go index b494edba4..723408ec0 100644 --- a/internal/git/housekeeping/optimize_repository_ext_test.go +++ b/internal/git/housekeeping/optimize_repository_ext_test.go @@ -119,22 +119,16 @@ func TestPruneIfNeeded(t *testing.T) { ctx := ctxlogrus.ToContext(ctx, logrus.NewEntry(logger)) require.NoError(t, housekeeping.NewManager(cfg.Prometheus, nil).OptimizeRepository(ctx, repo)) - require.Equal(t, - struct { - PackedObjectsIncremental bool `json:"packed_objects_incremental"` - PackedObjectsFull bool `json:"packed_objects_full"` - PrunedObjects bool `json:"pruned_objects"` - PackedRefs bool `json:"packed_refs"` - WrittenBitmap bool `json:"written_bitmap"` - }{ - PackedObjectsIncremental: false, - PackedObjectsFull: true, - PrunedObjects: tc.expectedPrune, - PackedRefs: false, - WrittenBitmap: true, - }, - hook.Entries[len(hook.Entries)-1].Data["optimizations"], - ) + expectedLogEntry := map[string]string{ + "packed_objects_full": "success", + "written_bitmap": "success", + } + + if tc.expectedPrune { + expectedLogEntry["pruned_objects"] = "success" + } + + require.Equal(t, expectedLogEntry, hook.Entries[len(hook.Entries)-1].Data["optimizations"]) }) } } diff --git a/internal/git/housekeeping/optimize_repository_test.go b/internal/git/housekeeping/optimize_repository_test.go index e8bd6f0dd..086e3572e 100644 --- a/internal/git/housekeeping/optimize_repository_test.go +++ b/internal/git/housekeeping/optimize_repository_test.go @@ -1,6 +1,7 @@ package housekeeping import ( + "bytes" "context" "fmt" "io" @@ -503,10 +504,10 @@ func TestOptimizeRepository(t *testing.T) { txManager := transaction.NewManager(cfg, backchannel.NewRegistry()) for _, tc := range []struct { - desc string - setup func(t *testing.T) *gitalypb.Repository - expectedErr error - expectedOptimizations map[string]float64 + desc string + setup func(t *testing.T) *gitalypb.Repository + expectedErr error + expectedMetrics string }{ { desc: "empty repository tries to write bitmap", @@ -514,10 +515,12 @@ func TestOptimizeRepository(t *testing.T) { repo, _ := gittest.InitRepo(t, cfg, cfg.Storages[0]) return repo }, - expectedOptimizations: map[string]float64{ - "packed_objects_full": 1, - "written_bitmap": 1, - }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="packed_objects_full", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="written_bitmap", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, { desc: "repository without bitmap repacks objects", @@ -525,10 +528,12 @@ func TestOptimizeRepository(t *testing.T) { repo, _ := gittest.CloneRepo(t, cfg, cfg.Storages[0]) return repo }, - expectedOptimizations: map[string]float64{ - "packed_objects_full": 1, - "written_bitmap": 1, - }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="packed_objects_full", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="written_bitmap", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, { desc: "repository without commit-graph repacks objects", @@ -537,10 +542,12 @@ func TestOptimizeRepository(t *testing.T) { gittest.Exec(t, cfg, "-C", repoPath, "repack", "-A", "--write-bitmap-index") return repo }, - expectedOptimizations: map[string]float64{ - "packed_objects_full": 1, - "written_bitmap": 1, - }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="packed_objects_full", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="written_bitmap", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, { desc: "well-packed repository does not optimize", @@ -550,6 +557,10 @@ func TestOptimizeRepository(t *testing.T) { gittest.Exec(t, cfg, "-C", repoPath, "commit-graph", "write", "--split", "--changed-paths") return repo }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, { desc: "recent loose objects don't get pruned", @@ -575,9 +586,11 @@ func TestOptimizeRepository(t *testing.T) { return repo }, - expectedOptimizations: map[string]float64{ - "packed_objects_incremental": 1, - }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="packed_objects_incremental", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, { desc: "old loose objects get pruned", @@ -600,10 +613,12 @@ func TestOptimizeRepository(t *testing.T) { return repo }, - expectedOptimizations: map[string]float64{ - "packed_objects_incremental": 1, - "pruned_objects": 1, - }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="packed_objects_incremental", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="pruned_objects",status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, { desc: "loose refs get packed", @@ -619,9 +634,11 @@ func TestOptimizeRepository(t *testing.T) { return repo }, - expectedOptimizations: map[string]float64{ - "packed_refs": 1, - }, + expectedMetrics: `# HELP gitaly_housekeeping_tasks_total Total number of housekeeping tasks performed in the repository +# TYPE gitaly_housekeeping_tasks_total counter +gitaly_housekeeping_tasks_total{housekeeping_task="packed_refs", status="success"} 1 +gitaly_housekeeping_tasks_total{housekeeping_task="total", status="success"} 1 +`, }, } { t.Run(tc.desc, func(t *testing.T) { @@ -635,16 +652,11 @@ func TestOptimizeRepository(t *testing.T) { err := manager.OptimizeRepository(ctx, repo) require.Equal(t, tc.expectedErr, err) - for _, metric := range []string{ - "packed_objects_incremental", - "packed_objects_full", - "pruned_objects", - "packed_refs", - "written_bitmap", - } { - value := testutil.ToFloat64(manager.tasksTotal.WithLabelValues(metric)) - require.Equal(t, tc.expectedOptimizations[metric], value, metric) - } + assert.NoError(t, testutil.CollectAndCompare( + manager.tasksTotal, + bytes.NewBufferString(tc.expectedMetrics), + "gitaly_housekeeping_tasks_total", + )) }) } } diff --git a/internal/gitaly/maintenance/optimize.go b/internal/gitaly/maintenance/optimize.go index d7231fb24..647a597ad 100644 --- a/internal/gitaly/maintenance/optimize.go +++ b/internal/gitaly/maintenance/optimize.go @@ -3,30 +3,61 @@ package maintenance import ( "context" "errors" + "fmt" "math/rand" "os" "path/filepath" "time" - "github.com/prometheus/client_golang/prometheus" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" - "google.golang.org/grpc" ) -var repoOptimizationHistogram = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "gitaly_daily_maintenance_repo_optimization_seconds", - Help: "How many seconds each repo takes to successfully optimize during daily maintenance", - Buckets: []float64{0.01, 0.1, 1.0, 10.0, 100}, - }, -) +// WorkerFunc is a function that does a unit of work meant to run in the background +type WorkerFunc func(context.Context, logrus.FieldLogger) error + +// StartWorkers will start any background workers and returns a function that +// can be used to shut down the background workers. +func StartWorkers( + ctx context.Context, + l logrus.FieldLogger, + workers ...WorkerFunc, +) (func(), error) { + errQ := make(chan error) + + ctx, cancel := context.WithCancel(ctx) + + for _, worker := range workers { + worker := worker + go func() { + errQ <- worker(ctx, l) + }() + } + + shutdown := func() { + cancel() + + // give the worker 5 seconds to shutdown gracefully + timeout := 5 * time.Second + + var err error + select { + case err = <-errQ: + break + case <-time.After(timeout): + err = fmt.Errorf("timed out after %s", timeout) + } + if err != nil && err != context.Canceled { + l.WithError(err).Error("maintenance worker shutdown") + } + } -func init() { - prometheus.MustRegister(repoOptimizationHistogram) + return shutdown, nil } func shuffledStoragesCopy(randSrc *rand.Rand, storages []config.Storage) []config.Storage { @@ -38,7 +69,32 @@ func shuffledStoragesCopy(randSrc *rand.Rand, storages []config.Storage) []confi // Optimizer knows how to optimize a repository type Optimizer interface { - OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest, ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error) + OptimizeRepository(context.Context, repository.GitRepo) error +} + +// OptimizerFunc is an adapter to allow the use of an ordinary function as an Optimizer +type OptimizerFunc func(context.Context, repository.GitRepo) error + +// OptimizeRepository calls o(ctx, repo) +func (o OptimizerFunc) OptimizeRepository(ctx context.Context, repo repository.GitRepo) error { + return o(ctx, repo) +} + +// DailyOptimizationWorker creates a worker that runs repository maintenance daily +func DailyOptimizationWorker(cfg config.Cfg, optimizer Optimizer) WorkerFunc { + return func(ctx context.Context, l logrus.FieldLogger) error { + return NewDailyWorker().StartDaily( + ctx, + l, + cfg.DailyMaintenance, + OptimizeReposRandomly( + cfg.Storages, + optimizer, + helper.NewTimerTicker(1*time.Second), + rand.New(rand.NewSource(time.Now().UnixNano())), + ), + ) + } } func optimizeRepoAtPath(ctx context.Context, l logrus.FieldLogger, s config.Storage, absPath string, o Optimizer) error { @@ -52,28 +108,26 @@ func optimizeRepoAtPath(ctx context.Context, l logrus.FieldLogger, s config.Stor RelativePath: relPath, } - optimizeReq := &gitalypb.OptimizeRepositoryRequest{ - Repository: repo, - } - - // In order to prevent RPC cancellation because of the parent context cancellation - // (job execution duration passed) we suppress cancellation of the parent and add a - // new time limit to make sure it won't block forever. - // It also helps us to be protected over repositories that are taking too long to complete - // a request, so no any other repository can be optimized. - ctx, cancel := context.WithTimeout(helper.SuppressCancellation(ctx), 5*time.Minute) + start := time.Now() + logEntry := l.WithFields(map[string]interface{}{ + "relative_path": relPath, + "storage": s.Name, + "source": "maintenance.daily", + "start_time": start.UTC(), + }) + + ctx, cancel := context.WithTimeout(ctxlogrus.ToContext(ctx, logEntry), 5*time.Minute) defer cancel() - start := time.Now() - if _, err := o.OptimizeRepository(ctx, optimizeReq); err != nil { - l.WithFields(map[string]interface{}{ - "relative_path": relPath, - "storage": s.Name, - }).WithError(err). - Errorf("maintenance: repo optimization failure") + err = o.OptimizeRepository(ctx, repo) + logEntry = logEntry.WithField("time_ms", time.Since(start).Milliseconds()) + + if err != nil { + logEntry.WithError(err).Errorf("maintenance: repo optimization failure") + return err } - repoOptimizationHistogram.Observe(time.Since(start).Seconds()) + logEntry.Info("maintenance: repo optimization succeeded") return nil } diff --git a/internal/gitaly/maintenance/optimize_test.go b/internal/gitaly/maintenance/optimize_test.go index 418c1eb18..4b7f1efd7 100644 --- a/internal/gitaly/maintenance/optimize_test.go +++ b/internal/gitaly/maintenance/optimize_test.go @@ -9,51 +9,36 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping" - "gitlab.com/gitlab-org/gitaly/v14/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" + repo "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" - "google.golang.org/grpc" ) type mockOptimizer struct { t testing.TB - actual []*gitalypb.Repository + actual []repo.GitRepo cfg config.Cfg } -func (mo *mockOptimizer) OptimizeRepository(ctx context.Context, req *gitalypb.OptimizeRepositoryRequest, _ ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error) { - mo.actual = append(mo.actual, req.Repository) +func (mo *mockOptimizer) OptimizeRepository(ctx context.Context, repository repo.GitRepo) error { + mo.actual = append(mo.actual, repository) l := config.NewLocator(mo.cfg) gitCmdFactory := gittest.NewCommandFactory(mo.t, mo.cfg) catfileCache := catfile.NewCache(mo.cfg) mo.t.Cleanup(catfileCache.Stop) - git2goExecutor := git2go.NewExecutor(mo.cfg, gitCmdFactory, l) txManager := transaction.NewManager(mo.cfg, backchannel.NewRegistry()) housekeepingManager := housekeeping.NewManager(mo.cfg.Prometheus, txManager) - connsPool := client.NewPool() - mo.t.Cleanup(func() { testhelper.MustClose(mo.t, connsPool) }) - - resp, err := repository.NewServer(mo.cfg, nil, l, - txManager, - gitCmdFactory, - catfileCache, - connsPool, - git2goExecutor, - housekeepingManager, - ).OptimizeRepository(ctx, req) - assert.NoError(mo.t, err) - return resp, err + return housekeepingManager.OptimizeRepository(ctx, localrepo.New(l, gitCmdFactory, catfileCache, repository)) } func TestOptimizeReposRandomly(t *testing.T) { @@ -131,14 +116,14 @@ type mockOptimizerCancel struct { startedAt time.Time } -func (m mockOptimizerCancel) OptimizeRepository(ctx context.Context, _ *gitalypb.OptimizeRepositoryRequest, _ ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error) { +func (m mockOptimizerCancel) OptimizeRepository(ctx context.Context, _ repo.GitRepo) error { timeline, ok := ctx.Deadline() if assert.True(m.t, ok) { assert.True(m.t, timeline.After(m.startedAt), m.startedAt) future := m.startedAt.Add(10 * time.Minute) assert.True(m.t, timeline.Before(future), future) } - return &gitalypb.OptimizeRepositoryResponse{}, nil + return nil } func TestOptimizeReposRandomly_cancellationOverride(t *testing.T) { diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 279ad9f70..defea0c3b 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -1,22 +1,13 @@ package server import ( - "context" - "fmt" - "math/rand" "sync" - "time" "github.com/sirupsen/logrus" - gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" - "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" - "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -49,59 +40,6 @@ func NewGitalyServerFactory( } } -// StartWorkers will start any auxiliary background workers that are allowed -// to fail without stopping the rest of the server. -func (s *GitalyServerFactory) StartWorkers(ctx context.Context, l logrus.FieldLogger, cfg config.Cfg) (func(), error) { - var opts []grpc.DialOption - if cfg.Auth.Token != "" { - opts = append(opts, grpc.WithPerRPCCredentials( - gitalyauth.RPCCredentialsV2(cfg.Auth.Token), - )) - } - - cc, err := client.Dial("unix:"+cfg.GitalyInternalSocketPath(), opts) - if err != nil { - return nil, err - } - - errQ := make(chan error) - - ctx, cancel := context.WithCancel(ctx) - go func() { - errQ <- maintenance.NewDailyWorker().StartDaily( - ctx, - l, - cfg.DailyMaintenance, - maintenance.OptimizeReposRandomly( - cfg.Storages, - gitalypb.NewRepositoryServiceClient(cc), - helper.NewTimerTicker(1*time.Second), - rand.New(rand.NewSource(time.Now().UnixNano())), - ), - ) - }() - - shutdown := func() { - cancel() - - // give the worker 5 seconds to shutdown gracefully - timeout := 5 * time.Second - - var err error - select { - case err = <-errQ: - break - case <-time.After(timeout): - err = fmt.Errorf("timed out after %s", timeout) - } - if err != nil && err != context.Canceled { - l.WithError(err).Error("maintenance worker shutdown") - } - } - - return shutdown, nil -} - // Stop immediately stops all servers created by the GitalyServerFactory. func (s *GitalyServerFactory) Stop() { for _, servers := range [][]*grpc.Server{ |