author | James Fargher <proglottis@gmail.com> | 2020-08-26 04:00:44 +0300
---|---|---
committer | James Fargher <proglottis@gmail.com> | 2020-08-26 04:00:44 +0300
commit | 89a32f8fc45d7725cbf55f65952e6ac6be627ddd (patch) |
tree | c82f53800b2d40049f7fd688eb7a991f3d1a0951 |
parent | e4ff30e44b6ac21f33290bbe7a9cbbd42f98d4d1 (diff) |
parent | 965cac68f3fd24e7a57b3927e5f88d8227bfad2b (diff) |
Merge branch 'po-nightly-maintenance' into 'master'
Daily maintenance scheduler
See merge request gitlab-org/gitaly!2423
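At a glance, the change introduces a `maintenance.DailyWorker` that runs a `StoragesJob` once a day within a bounded window, an `OptimizeReposRandomly` job that walks the configured storages and calls `OptimizeRepository` on every repo it finds, and a `StartWorkers` hook in the server factory that ties the two to the new `[daily_maintenance]` config block. A minimal sketch of that wiring against the post-merge tree (the schedule values and the standalone `main` are illustrative; the real wiring lives in `StartWorkers` in the last file of the diff):

```go
package main

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/client"
	"gitlab.com/gitlab-org/gitaly/internal/config"
	"gitlab.com/gitlab-org/gitaly/internal/maintenance"
	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)

func main() {
	// Dial gitaly's own internal socket, as StartWorkers does below.
	cc, err := client.Dial("unix://"+config.GitalyInternalSocketPath(), nil)
	if err != nil {
		logrus.WithError(err).Fatal("dial internal socket")
	}

	// The job: optimize repos on the enabled storages, in random order.
	job := maintenance.OptimizeReposRandomly(
		config.Config.Storages,
		gitalypb.NewRepositoryServiceClient(cc),
	)

	// Fire daily at 23:30 local time, for at most 45 minutes.
	schedule := config.DailyJob{
		Hour:     23,
		Minute:   30,
		Duration: config.Duration(45 * time.Minute),
		Storages: []string{"default"},
	}

	// Blocks until the context is cancelled; StartWorkers runs this
	// in a goroutine instead.
	err = maintenance.NewDailyWorker().StartDaily(
		context.Background(), logrus.StandardLogger(), schedule, job,
	)
	logrus.WithError(err).Info("daily maintenance stopped")
}
```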
-rw-r--r-- | changelogs/unreleased/po-nightly-maintenance.yml | 5
-rw-r--r-- | cmd/gitaly/main.go | 9
-rw-r--r-- | config.toml.example | 8
-rw-r--r-- | internal/config/config.go | 36
-rw-r--r-- | internal/config/config_test.go | 66
-rw-r--r-- | internal/maintenance/daily.go | 75
-rw-r--r-- | internal/maintenance/daily_test.go | 86
-rw-r--r-- | internal/maintenance/optimize.go | 149
-rw-r--r-- | internal/maintenance/optimize_test.go | 87
-rw-r--r-- | internal/server/server_factory.go | 59
10 files changed, 580 insertions, 0 deletions
diff --git a/changelogs/unreleased/po-nightly-maintenance.yml b/changelogs/unreleased/po-nightly-maintenance.yml
new file mode 100644
index 000000000..d60496e30
--- /dev/null
+++ b/changelogs/unreleased/po-nightly-maintenance.yml
@@ -0,0 +1,5 @@
+---
+title: Daily maintenance scheduler
+merge_request: 2423
+author:
+type: added
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go
index 51d58fadd..ddd7ebfa8 100644
--- a/cmd/gitaly/main.go
+++ b/cmd/gitaly/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"os"
@@ -12,6 +13,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/internal/config"
 	"gitlab.com/gitlab-org/gitaly/internal/config/sentry"
 	"gitlab.com/gitlab-org/gitaly/internal/git"
+	glog "gitlab.com/gitlab-org/gitaly/internal/log"
 	"gitlab.com/gitlab-org/gitaly/internal/server"
 	"gitlab.com/gitlab-org/gitaly/internal/service/hook"
 	"gitlab.com/gitlab-org/gitaly/internal/storage"
@@ -163,5 +165,12 @@ func run(b *bootstrap.Bootstrap) error {
 		return fmt.Errorf("initialize gitaly-ruby: %v", err)
 	}
 
+	ctx := context.Background()
+	shutdownWorkers, err := servers.StartWorkers(ctx, glog.Default(), config.Config)
+	if err != nil {
+		return fmt.Errorf("initialize auxiliary workers: %v", err)
+	}
+	defer shutdownWorkers()
+
 	return b.Wait(config.Config.GracefulRestartTimeout.Duration())
 }
diff --git a/config.toml.example b/config.toml.example
index 1332f6f9d..c8d408320 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -106,3 +106,11 @@ self_signed_cert = false
 # [[concurrency]]
 # rpc = "/gitaly.RepositoryService/GarbageCollect"
 # max_per_repo = 1
+
+# Daily maintenance designates time slots to run daily to optimize and maintain
+# enabled storages.
+# [daily_maintenance]
+# start_hour = 23
+# start_minute = 30
+# duration = "45m"
+# storages = ["default"]
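A note on the `duration` value: judging by the load error asserted in the config tests further down ("time: invalid duration meow"), the config `Duration` type appears to delegate to Go's `time.ParseDuration`, so any Go duration string should be accepted. A quick standalone illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "45m" as used in the example config above.
	d, err := time.ParseDuration("45m")
	fmt.Println(d, err) // 45m0s <nil>

	// Bad values surface as load errors; the exact message format varies
	// by Go version ("time: invalid duration meow" on Go < 1.15).
	_, err = time.ParseDuration("meow")
	fmt.Println(err)
}
```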
diff --git a/internal/config/config.go b/internal/config/config.go
index c43d4b6a8..36d4abb3d 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -29,6 +29,14 @@ var (
 	hooks []func(Cfg) error
 )
 
+// DailyJob enables a daily task to be scheduled for specific storages
+type DailyJob struct {
+	Hour     uint     `toml:"start_hour"`
+	Minute   uint     `toml:"start_minute"`
+	Duration Duration `toml:"duration"`
+	Storages []string `toml:"storages"`
+}
+
 // Cfg is a container for all config derived from config.toml.
 type Cfg struct {
 	SocketPath string `toml:"socket_path" split_words:"true"`
@@ -49,6 +57,7 @@ type Cfg struct {
 	Concurrency            []Concurrency `toml:"concurrency"`
 	GracefulRestartTimeout Duration      `toml:"graceful_restart_timeout"`
 	InternalSocketDir      string        `toml:"internal_socket_dir"`
+	DailyMaintenance       DailyJob      `toml:"daily_maintenance"`
 }
 
 // TLS configuration
@@ -156,6 +165,7 @@ func Validate() error {
 		validateBinDir(),
 		validateInternalSocketDir(),
 		validateHooks(),
+		validateMaintenance(),
 	} {
 		if err != nil {
 			return err
@@ -463,3 +473,29 @@ func trySocketCreation(dir string) error {
 
 	return l.Close()
 }
+
+func validateMaintenance() error {
+	dm := Config.DailyMaintenance
+
+	sNames := map[string]struct{}{}
+	for _, s := range Config.Storages {
+		sNames[s.Name] = struct{}{}
+	}
+	for _, sName := range dm.Storages {
+		if _, ok := sNames[sName]; !ok {
+			return fmt.Errorf("daily maintenance specified storage %q does not exist in configuration", sName)
+		}
+	}
+
+	if dm.Hour > 23 {
+		return fmt.Errorf("daily maintenance specified hour '%d' outside range (0-23)", dm.Hour)
+	}
+	if dm.Minute > 59 {
+		return fmt.Errorf("daily maintenance specified minute '%d' outside range (0-59)", dm.Minute)
+	}
+	if dm.Duration.Duration() > 24*time.Hour {
+		return fmt.Errorf("daily maintenance specified duration %s must be less than 24 hours", dm.Duration.Duration())
+	}
+
+	return nil
+}
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
index 383ffb61d..400f68f20 100644
--- a/internal/config/config_test.go
+++ b/internal/config/config_test.go
@@ -2,6 +2,7 @@ package config
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -766,3 +767,68 @@ func TestInternalSocketDir(t *testing.T) {
 
 	require.NoError(t, trySocketCreation(socketDir))
 }
+
+func TestLoadDailyMaintenance(t *testing.T) {
+	for _, tt := range []struct {
+		name        string
+		rawCfg      string
+		expect      DailyJob
+		loadErr     error
+		validateErr error
+	}{
+		{
+			name: "success",
+			rawCfg: `[[storage]]
+name = "default"
+path = "/"
+
+[daily_maintenance]
+start_hour = 11
+start_minute = 23
+duration = "45m"
+storages = ["default"]
+`,
+			expect: DailyJob{
+				Hour:     11,
+				Minute:   23,
+				Duration: Duration(45 * time.Minute),
+				Storages: []string{"default"},
+			},
+		},
+		{
+			rawCfg: `[daily_maintenance]
+			start_hour = 24`,
+			expect: DailyJob{
+				Hour: 24,
+			},
+			validateErr: errors.New("daily maintenance specified hour '24' outside range (0-23)"),
+		}, {
+			rawCfg: `[daily_maintenance]
+			start_hour = 60`,
+			expect: DailyJob{
+				Hour: 60,
+			},
+			validateErr: errors.New("daily maintenance specified hour '60' outside range (0-23)"),
+		},
+		{
+			rawCfg: `[daily_maintenance]
+			duration = "meow"`,
+			expect:  DailyJob{},
+			loadErr: errors.New("load toml: time: invalid duration meow"),
+		}, {
+			rawCfg: `[daily_maintenance]
+			storages = ["default"]`,
+			expect: DailyJob{
+				Storages: []string{"default"},
+			},
+			validateErr: errors.New(`daily maintenance specified storage "default" does not exist in configuration`),
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			tmpFile := configFileReader(tt.rawCfg)
+			require.Equal(t, tt.loadErr, Load(tmpFile))
+			require.Equal(t, tt.expect, Config.DailyMaintenance)
+			require.Equal(t, tt.validateErr, validateMaintenance())
+		})
+	}
+}
"gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/dontpanic" +) + +// StoragesJob runs a job on storages. The string slice param indicates which +// storages are currently enabled for the feature. +type StoragesJob func(context.Context, logrus.FieldLogger, []string) error + +// DailyWorker allows for a storage job to be executed on a daily schedule +type DailyWorker struct { + // clock allows the time telling to be overridden deterministically in unit tests + clock func() time.Time + // timer allows the timing of tasks to be overridden deterministically in unit tests + timer func(time.Duration) <-chan time.Time +} + +// NewDailyWorker returns an initialized daily worker +func NewDailyWorker() DailyWorker { + return DailyWorker{ + clock: time.Now, + timer: time.After, + } +} + +func (dw DailyWorker) nextTime(hour, minute int) time.Time { + n := dw.clock() + next := time.Date(n.Year(), n.Month(), n.Day(), hour, minute, 0, 0, n.Location()) + if next.Equal(n) || next.Before(n) { + next = next.AddDate(0, 0, 1) + } + return next +} + +// StartDaily will run the provided job every day at the specified time for the +// specified duration. Only the specified storages wil be worked on. +func (dw DailyWorker) StartDaily(ctx context.Context, l logrus.FieldLogger, schedule config.DailyJob, job StoragesJob) error { + if schedule.Duration == 0 || len(schedule.Storages) == 0 { + return nil + } + + for { + nt := dw.nextTime(int(schedule.Hour), int(schedule.Minute)) + l.WithField("scheduled", nt).Info("maintenance: daily scheduled") + + var start time.Time + + select { + case <-ctx.Done(): + return ctx.Err() + case start = <-dw.timer(nt.Sub(dw.clock())): + l.WithField("max_duration", schedule.Duration.Duration()). + Info("maintenance: daily starting") + } + + var jobErr error + dontpanic.Try(func() { + ctx, cancel := context.WithTimeout(ctx, schedule.Duration.Duration()) + defer cancel() + + jobErr = job(ctx, l, schedule.Storages) + }) + + l.WithError(jobErr). + WithField("max_duration", schedule.Duration.Duration()). + WithField("actual_duration", time.Since(start)). 
+ Info("maintenance: daily completed") + } +} diff --git a/internal/maintenance/daily_test.go b/internal/maintenance/daily_test.go new file mode 100644 index 000000000..1f9d0303f --- /dev/null +++ b/internal/maintenance/daily_test.go @@ -0,0 +1,86 @@ +package maintenance + +import ( + "context" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestStartDaily(t *testing.T) { + dw := NewDailyWorker() + + clockQ := make(chan time.Time) + dw.clock = func() time.Time { + return <-clockQ + } + + timerQ := make(chan time.Time) + durationQ := make(chan time.Duration) + dw.timer = func(d time.Duration) <-chan time.Time { + durationQ <- d + return timerQ + } + + storagesQ := make(chan []string) + fn := func(_ context.Context, _ logrus.FieldLogger, s []string) error { + storagesQ <- s + return nil + } + + errQ := make(chan error) + s := config.DailyJob{ + Hour: 1, + Duration: config.Duration(time.Hour), + Storages: []string{"meow"}, + } + ctx, cancel := context.WithCancel(context.Background()) + go func() { errQ <- dw.StartDaily(ctx, testhelper.DiscardTestEntry(t), s, fn) }() + + startTime := time.Date(1999, 3, 31, 0, 0, 0, 0, time.Local) + for _, tt := range []struct { + name string + now time.Time + expectDuration time.Duration + }{ + { + name: "next job in an hour", + now: startTime, + expectDuration: time.Hour, + }, + { + name: "next job tomorrow", + now: startTime.Add(time.Hour), + expectDuration: 24 * time.Hour, + }, + { + name: "next job tomorrow", + now: startTime.Add(25 * time.Hour), + expectDuration: 24 * time.Hour, + }, + { + name: "next job in less than 24 hours", + now: startTime.Add(25 * time.Hour).Add(time.Minute), + expectDuration: 24*time.Hour - time.Minute, + }, + } { + t.Run(tt.name, func(t *testing.T) { + clockQ <- tt.now // start time + clockQ <- tt.now // time used to compute timer + require.Equal(t, tt.expectDuration, <-durationQ) // wait time until job + timerQ <- tt.now.Add(tt.expectDuration) // trigger the job + require.Equal(t, s.Storages, <-storagesQ) // fn was invoked + }) + } + + // abort daily task + cancel() + clockQ <- startTime // mock artifact; this value doesn't matter + clockQ <- startTime // mock artifact; this value doesn't matter + <-durationQ // mock artifact; this value doesn't matter + require.Equal(t, context.Canceled, <-errQ) +} diff --git a/internal/maintenance/optimize.go b/internal/maintenance/optimize.go new file mode 100644 index 000000000..fc5cbe366 --- /dev/null +++ b/internal/maintenance/optimize.go @@ -0,0 +1,149 @@ +package maintenance + +import ( + "context" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/storage" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +var repoOptimizationHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitaly_daily_maintenance_repo_optimization_seconds", + Help: "How many seconds each repo takes to successfully optimize during daily maintenance", + Buckets: []float64{0.01, 0.1, 1.0, 10.0, 100}, + }, +) + +func init() { + prometheus.MustRegister(repoOptimizationHistogram) +} + +func shuffledStoragesCopy(randSrc *rand.Rand, storages []config.Storage) []config.Storage { + shuffled := make([]config.Storage, len(storages)) + 
diff --git a/internal/maintenance/optimize.go b/internal/maintenance/optimize.go
new file mode 100644
index 000000000..fc5cbe366
--- /dev/null
+++ b/internal/maintenance/optimize.go
@@ -0,0 +1,149 @@
+package maintenance
+
+import (
+	"context"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+	"gitlab.com/gitlab-org/gitaly/internal/config"
+	"gitlab.com/gitlab-org/gitaly/internal/storage"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+	"google.golang.org/grpc"
+)
+
+var repoOptimizationHistogram = prometheus.NewHistogram(
+	prometheus.HistogramOpts{
+		Name:    "gitaly_daily_maintenance_repo_optimization_seconds",
+		Help:    "How many seconds each repo takes to successfully optimize during daily maintenance",
+		Buckets: []float64{0.01, 0.1, 1.0, 10.0, 100},
+	},
+)
+
+func init() {
+	prometheus.MustRegister(repoOptimizationHistogram)
+}
+
+func shuffledStoragesCopy(randSrc *rand.Rand, storages []config.Storage) []config.Storage {
+	shuffled := make([]config.Storage, len(storages))
+	copy(shuffled, storages)
+	randSrc.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
+	return shuffled
+}
+
+func shuffleFileInfos(randSrc *rand.Rand, s []os.FileInfo) {
+	randSrc.Shuffle(len(s), func(i, j int) { s[i], s[j] = s[j], s[i] })
+}
+
+// Optimizer knows how to optimize a repository
+type Optimizer interface {
+	OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest, ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error)
+}
+
+func optimizeRepoAtPath(ctx context.Context, l logrus.FieldLogger, s config.Storage, absPath string, o Optimizer) error {
+	relPath, err := filepath.Rel(s.Path, absPath)
+	if err != nil {
+		return err
+	}
+
+	repo := &gitalypb.Repository{
+		StorageName:  s.Name,
+		RelativePath: relPath,
+	}
+
+	optimizeReq := &gitalypb.OptimizeRepositoryRequest{
+		Repository: repo,
+	}
+
+	start := time.Now()
+	if _, err := o.OptimizeRepository(ctx, optimizeReq); err != nil {
+		l.WithFields(map[string]interface{}{
+			"relative_path": relPath,
+			"storage":       s.Name,
+		}).WithError(err).
+			Errorf("maintenance: repo optimization failure")
+	}
+	repoOptimizationHistogram.Observe(time.Since(start).Seconds())
+
+	return nil
+}
+
+func walkReposShuffled(ctx context.Context, randSrc *rand.Rand, l logrus.FieldLogger, path string, s config.Storage, o Optimizer) error {
+	entries, err := ioutil.ReadDir(path)
+	switch {
+	case os.IsNotExist(err):
+		return nil // race condition: someone deleted it
+	case err != nil:
+		return err
+	}
+
+	shuffleFileInfos(randSrc, entries)
+
+	for _, e := range entries {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		if !e.IsDir() {
+			continue
+		}
+
+		absPath := filepath.Join(path, e.Name())
+		if !storage.IsGitDirectory(absPath) {
+			if err := walkReposShuffled(ctx, randSrc, l, absPath, s, o); err != nil {
+				return err
+			}
+			continue
+		}
+
+		if err := optimizeRepoAtPath(ctx, l, s, absPath, o); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// OptimizeReposRandomly returns a function to walk through each storage and
+// attempt to optimize any repos encountered.
+//
+// Only storage paths that map to an enabled storage name will be walked.
+// Any storage paths shared by multiple storages will only be walked once.
+//
+// Any errors during the optimization will be logged. Any other errors will be
+// returned and cause the walk to end prematurely.
+func OptimizeReposRandomly(storages []config.Storage, optimizer Optimizer) StoragesJob {
+	return func(ctx context.Context, l logrus.FieldLogger, enabledStorageNames []string) error {
+		enabledNames := map[string]struct{}{}
+		for _, sName := range enabledStorageNames {
+			enabledNames[sName] = struct{}{}
+		}
+
+		visitedPaths := map[string]bool{}
+
+		randSrc := rand.New(rand.NewSource(time.Now().UnixNano()))
+		for _, storage := range shuffledStoragesCopy(randSrc, storages) {
+			if _, ok := enabledNames[storage.Name]; !ok {
+				continue // storage not enabled
+			}
+			if visitedPaths[storage.Path] {
+				continue // already visited
+			}
+			visitedPaths[storage.Path] = true
+
+			l.WithField("storage_path", storage.Path).
+				Info("maintenance: optimizing repos in storage")
+
+			if err := walkReposShuffled(ctx, randSrc, l, storage.Path, storage, optimizer); err != nil {
+				l.WithError(err).
+					WithField("storage_path", storage.Path).
+					Errorf("maintenance: unable to completely walk storage")
+			}
+		}
+		return nil
+	}
}
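One detail that is easy to miss in `OptimizeReposRandomly`: storages are visited in random order, but a filesystem path shared by several configured storages is only walked once, under whichever storage name the shuffle puts first. A small standalone illustration of that dedupe (the `Storage` struct here is a stand-in for `config.Storage`):

```go
package main

import (
	"fmt"
	"math/rand"
)

// Storage is a stand-in for config.Storage.
type Storage struct{ Name, Path string }

func main() {
	storages := []Storage{
		{Name: "default", Path: "/repos/a"},
		{Name: "other", Path: "/repos/b"},
		{Name: "alias", Path: "/repos/a"}, // shares a path with "default"
	}

	// Fixed seed for a reproducible demo; the real code seeds from the clock.
	randSrc := rand.New(rand.NewSource(1))
	shuffled := make([]Storage, len(storages))
	copy(shuffled, storages)
	randSrc.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })

	visited := map[string]bool{}
	for _, s := range shuffled {
		if visited[s.Path] {
			continue // same path already walked under another name
		}
		visited[s.Path] = true
		fmt.Printf("would walk %s as storage %q\n", s.Path, s.Name)
	}
	// Only two of the three storages are walked; which name "wins" for
	// /repos/a depends on the shuffle.
}
```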
+ Errorf("maintenance: unable to completely walk storage") + } + } + return nil + } +} diff --git a/internal/maintenance/optimize_test.go b/internal/maintenance/optimize_test.go new file mode 100644 index 000000000..b367381a1 --- /dev/null +++ b/internal/maintenance/optimize_test.go @@ -0,0 +1,87 @@ +package maintenance + +import ( + "context" + "path/filepath" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/service/repository" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type mockOptimizer struct { + t testing.TB + actual []*gitalypb.Repository + storages []config.Storage +} + +func (mo *mockOptimizer) OptimizeRepository(ctx context.Context, req *gitalypb.OptimizeRepositoryRequest, _ ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error) { + mo.actual = append(mo.actual, req.Repository) + l := config.NewLocator(config.Cfg{Storages: mo.storages}) + resp, err := repository.NewServer(nil, l, "").OptimizeRepository(ctx, req) + assert.NoError(mo.t, err) + return resp, err +} + +func TestOptimizeReposRandomly(t *testing.T) { + oldStorages := config.Config.Storages + defer func() { config.Config.Storages = oldStorages }() + + storages := []config.Storage{} + + for i := 0; i < 3; i++ { + tempDir, cleanup := testhelper.TempDir(t) + defer cleanup() + + storages = append(storages, config.Storage{ + Name: strconv.Itoa(i), + Path: tempDir, + }) + + testhelper.MustRunCommand(t, nil, "git", "init", "--bare", filepath.Join(tempDir, "a")) + testhelper.MustRunCommand(t, nil, "git", "init", "--bare", filepath.Join(tempDir, "b")) + } + + config.Config.Storages = storages + + mo := &mockOptimizer{ + t: t, + storages: storages, + } + walker := OptimizeReposRandomly(storages, mo) + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), []string{"0", "1"})) + + expect := []*gitalypb.Repository{ + {RelativePath: "a", StorageName: storages[0].Name}, + {RelativePath: "a", StorageName: storages[1].Name}, + {RelativePath: "b", StorageName: storages[0].Name}, + {RelativePath: "b", StorageName: storages[1].Name}, + } + require.ElementsMatch(t, expect, mo.actual) + + // repeat storage paths should not impact repos visited + storages = append(storages, config.Storage{ + Name: "duplicate", + Path: storages[0].Path, + }) + + mo = &mockOptimizer{ + t: t, + storages: storages, + } + config.Config.Storages = storages + + walker = OptimizeReposRandomly(storages, mo) + require.NoError(t, walker(ctx, testhelper.DiscardTestEntry(t), []string{"0", "1", "duplicate"})) + require.Equal(t, len(expect), len(mo.actual)) +} diff --git a/internal/server/server_factory.go b/internal/server/server_factory.go index 36fbde37c..cb9c629af 100644 --- a/internal/server/server_factory.go +++ b/internal/server/server_factory.go @@ -1,12 +1,20 @@ package server import ( + "context" + "fmt" "net" "sync" + "time" + "github.com/sirupsen/logrus" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" + "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/maintenance" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/service/hook" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -29,6 +37,57 @@ func (s 
diff --git a/internal/server/server_factory.go b/internal/server/server_factory.go
index 36fbde37c..cb9c629af 100644
--- a/internal/server/server_factory.go
+++ b/internal/server/server_factory.go
@@ -1,12 +1,20 @@
 package server
 
 import (
+	"context"
+	"fmt"
 	"net"
 	"sync"
+	"time"
 
+	"github.com/sirupsen/logrus"
+	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
+	"gitlab.com/gitlab-org/gitaly/client"
 	"gitlab.com/gitlab-org/gitaly/internal/config"
+	"gitlab.com/gitlab-org/gitaly/internal/maintenance"
 	"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
 	"gitlab.com/gitlab-org/gitaly/internal/service/hook"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 	"google.golang.org/grpc"
 )
 
@@ -29,6 +37,57 @@ func (s *GitalyServerFactory) StartRuby() error {
 	return s.ruby.Start()
 }
 
+// StartWorkers will start any auxiliary background workers that are allowed
+// to fail without stopping the rest of the server.
+func (s *GitalyServerFactory) StartWorkers(ctx context.Context, l logrus.FieldLogger, cfg config.Cfg) (func(), error) {
+	var opts []grpc.DialOption
+	if cfg.Auth.Token != "" {
+		opts = append(opts, grpc.WithPerRPCCredentials(
+			gitalyauth.RPCCredentialsV2(cfg.Auth.Token),
+		))
+	}
+
+	cc, err := client.Dial("unix://"+config.GitalyInternalSocketPath(), opts)
+	if err != nil {
+		return nil, err
+	}
+
+	errQ := make(chan error)
+
+	ctx, cancel := context.WithCancel(ctx)
+	go func() {
+		errQ <- maintenance.NewDailyWorker().StartDaily(
+			ctx,
+			l,
+			cfg.DailyMaintenance,
+			maintenance.OptimizeReposRandomly(
+				cfg.Storages,
+				gitalypb.NewRepositoryServiceClient(cc),
+			),
+		)
+	}()
+
+	shutdown := func() {
+		cancel()
+
+		// give the worker 5 seconds to shutdown gracefully
+		timeout := 5 * time.Second
+
+		var err error
+		select {
+		case err = <-errQ:
+			break
+		case <-time.After(timeout):
+			err = fmt.Errorf("timed out after %s", timeout)
+		}
+		if err != nil && err != context.Canceled {
+			l.WithError(err).Error("maintenance worker shutdown")
+		}
+	}
+
+	return shutdown, nil
+}
+
 // Stop stops all servers started by calling Serve and the gitaly-ruby server.
 func (s *GitalyServerFactory) Stop() {
 	for _, srv := range s.all() {
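The `shutdown` closure returned by `StartWorkers` follows a common Go pattern: cancel the worker's context, then wait a bounded time for the worker to acknowledge, treating `context.Canceled` as the expected outcome. Stripped of the Gitaly specifics, the pattern looks roughly like this (a generic sketch, not Gitaly code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// startWorker launches a cancellable background loop and returns a shutdown
// function mirroring the StartWorkers pattern above: cancel, then wait up to
// a timeout for the worker to exit.
func startWorker(ctx context.Context) func() {
	errQ := make(chan error)
	ctx, cancel := context.WithCancel(ctx)

	go func() {
		<-ctx.Done() // stand-in for StartDaily's select loop
		errQ <- ctx.Err()
	}()

	return func() {
		cancel()

		timeout := 5 * time.Second
		var err error
		select {
		case err = <-errQ:
		case <-time.After(timeout):
			err = fmt.Errorf("timed out after %s", timeout)
		}
		// context.Canceled is the expected result of a deliberate shutdown,
		// so only anything else is worth reporting.
		if err != nil && !errors.Is(err, context.Canceled) {
			fmt.Println("worker shutdown error:", err)
		}
	}
}

func main() {
	shutdown := startWorker(context.Background())
	shutdown() // prints nothing: the worker exits with context.Canceled
}
```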