diff options
author | John Cai <jcai@gitlab.com> | 2022-03-18 22:49:10 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-03-25 15:37:13 +0300 |
commit | a3439fb3ecb50b4b92b28e919212e270a2f3ae99 (patch) | |
tree | 23533b2f690df9c58d3fe6d78501f04c2d55327f | |
parent | e9c85a7455f1934bbebf589db4e9600a45c8f1bb (diff) |
server: Pass in generic WorkerFunc into StartWorkers
StartWorkers() is meant to start backgorund workers. Currently however,
its only used for repository optimization. In the future, there may be
more workers that we would want to add to this method.
However, the current method makes that a bit difficult. If we have
worker specific logic in the function body, then anytime that logic
needs some other resource it will need to be passed in via function
arguments.
This is true even if we want to change the implementation of an existing
worker. In the next commit, we will change the implementation of
OptimizeRepository, which requires a catfile cache, and a locator. Since
StartWorkers() doesn't have access to these, they will need to be passed
in.
Instead, introduce a new type Worker, which is just a function that
takes a context and logger. This way, we decouple the implementation of
the worker from StartWorkers().
-rw-r--r-- | cmd/gitaly/main.go | 7 | ||||
-rw-r--r-- | internal/gitaly/maintenance/optimize.go | 34 | ||||
-rw-r--r-- | internal/gitaly/server/server_factory.go | 47 |
3 files changed, 55 insertions, 33 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index a0e231cb1..364efb76f 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -26,6 +26,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/linguist" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/rubyserver" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" @@ -315,7 +316,11 @@ func run(cfg config.Cfg) error { return fmt.Errorf("unable to start the bootstrap: %v", err) } - shutdownWorkers, err := gitalyServerFactory.StartWorkers(ctx, glog.Default(), cfg) + shutdownWorkers, err := gitalyServerFactory.StartWorkers( + ctx, + glog.Default(), + maintenance.DailyOptimizationWorker(cfg), + ) if err != nil { return fmt.Errorf("initialize auxiliary workers: %v", err) } diff --git a/internal/gitaly/maintenance/optimize.go b/internal/gitaly/maintenance/optimize.go index d7231fb24..d137784ca 100644 --- a/internal/gitaly/maintenance/optimize.go +++ b/internal/gitaly/maintenance/optimize.go @@ -10,7 +10,11 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -41,6 +45,36 @@ type Optimizer interface { OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest, ...grpc.CallOption) (*gitalypb.OptimizeRepositoryResponse, error) } + +// DailyOptimizationWorker creates a worker that runs repository maintenance daily +func DailyOptimizationWorker(cfg config.Cfg) server.WorkerFunc { + return func(ctx context.Context, l logrus.FieldLogger) error { + var opts []grpc.DialOption + if cfg.Auth.Token != "" { + opts = append(opts, grpc.WithPerRPCCredentials( + gitalyauth.RPCCredentialsV2(cfg.Auth.Token), + )) + } + + cc, err := client.Dial("unix:"+cfg.GitalyInternalSocketPath(), opts) + if err != nil { + return err + } + + return NewDailyWorker().StartDaily( + ctx, + l, + cfg.DailyMaintenance, + OptimizeReposRandomly( + cfg.Storages, + gitalypb.NewRepositoryServiceClient(cc), + helper.NewTimerTicker(1*time.Second), + rand.New(rand.NewSource(time.Now().UnixNano())), + ), + ) + } +} + func optimizeRepoAtPath(ctx context.Context, l logrus.FieldLogger, s config.Storage, absPath string, o Optimizer) error { relPath, err := filepath.Rel(s.Path, absPath) if err != nil { diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index 279ad9f70..ff3405c86 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -3,20 +3,14 @@ package server import ( "context" "fmt" - "math/rand" "sync" "time" "github.com/sirupsen/logrus" - gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" - "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/internal/cache" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/maintenance" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/limithandler" - "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -49,37 +43,26 @@ func NewGitalyServerFactory( } } +// WorkerFunc is a function that does a unit of work meant to run in the background +type WorkerFunc func(context.Context, logrus.FieldLogger) error + // StartWorkers will start any auxiliary background workers that are allowed // to fail without stopping the rest of the server. -func (s *GitalyServerFactory) StartWorkers(ctx context.Context, l logrus.FieldLogger, cfg config.Cfg) (func(), error) { - var opts []grpc.DialOption - if cfg.Auth.Token != "" { - opts = append(opts, grpc.WithPerRPCCredentials( - gitalyauth.RPCCredentialsV2(cfg.Auth.Token), - )) - } - - cc, err := client.Dial("unix:"+cfg.GitalyInternalSocketPath(), opts) - if err != nil { - return nil, err - } - +func (s *GitalyServerFactory) StartWorkers( + ctx context.Context, + l logrus.FieldLogger, + workers ...WorkerFunc, +) (func(), error) { errQ := make(chan error) ctx, cancel := context.WithCancel(ctx) - go func() { - errQ <- maintenance.NewDailyWorker().StartDaily( - ctx, - l, - cfg.DailyMaintenance, - maintenance.OptimizeReposRandomly( - cfg.Storages, - gitalypb.NewRepositoryServiceClient(cc), - helper.NewTimerTicker(1*time.Second), - rand.New(rand.NewSource(time.Now().UnixNano())), - ), - ) - }() + + for _, worker := range workers { + worker := worker + go func() { + errQ <- worker(ctx, l) + }() + } shutdown := func() { cancel() |