diff options
author | James Liu <jliu@gitlab.com> | 2023-10-03 05:59:13 +0300 |
---|---|---|
committer | James Liu <jliu@gitlab.com> | 2023-10-03 05:59:13 +0300 |
commit | f9e6780bdf315bf44fb1d9acafb76abe0ae66b73 (patch) | |
tree | 8eda11529b47e174e3c683c786c14fe0c9601bf5 | |
parent | bf95af83bec749ea8ee080b2f36f1f862da33ee8 (diff) | |
parent | 9e0e0c444e6845662ad8ed182d7be27c6a188e89 (diff) |
Merge branch 'pks-log-replace-ctxlogrus-pt1' into 'master'
log: Replace ctxlogrus (pt.1)
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6429
Merged-by: James Liu <jliu@gitlab.com>
Approved-by: James Fargher <jfargher@gitlab.com>
Approved-by: James Liu <jliu@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
77 files changed, 266 insertions, 142 deletions
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index e436c7d48..2a57189d8 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -410,6 +410,7 @@ func run(cfg config.Cfg, logger log.Logger) error { } setup.RegisterAll(srv, &service.Dependencies{ + Logger: logger, Cfg: cfg, GitalyHookManager: hookManager, TransactionManager: transactionManager, diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go index b6c67e616..59f56b7c5 100644 --- a/internal/cli/praefect/serve.go +++ b/internal/cli/praefect/serve.go @@ -299,7 +299,7 @@ func server( // before the router is ready with the health status of the nodes. <-healthManager.Updated() - elector := nodes.NewPerRepositoryElector(db) + elector := nodes.NewPerRepositoryElector(logger, db) primaryGetter = elector assignmentStore = datastore.NewAssignmentStore(db, conf.StorageNames()) @@ -368,6 +368,7 @@ func server( var ( // top level server dependencies coordinator = praefect.NewCoordinator( + logger, queue, rs, router, diff --git a/internal/cli/praefect/subcmd_accept_dataloss_test.go b/internal/cli/praefect/subcmd_accept_dataloss_test.go index 33b9c0ba5..d2d4ae2de 100644 --- a/internal/cli/praefect/subcmd_accept_dataloss_test.go +++ b/internal/cli/praefect/subcmd_accept_dataloss_test.go @@ -53,7 +53,7 @@ func TestAcceptDatalossSubcommand(t *testing.T) { require.NoError(t, rs.SetGeneration(ctx, 1, storage, repo, generation)) } - ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, rs, nil, nil, nil))}) + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(info.NewServer(conf, testhelper.NewLogger(t), rs, nil, nil, nil))}) defer clean() conf.SocketPath = ln.Addr().String() diff --git a/internal/cli/praefect/subcmd_dataloss_test.go b/internal/cli/praefect/subcmd_dataloss_test.go index 30a3c49de..0e2e76f76 100644 --- a/internal/cli/praefect/subcmd_dataloss_test.go +++ b/internal/cli/praefect/subcmd_dataloss_test.go @@ -72,7 +72,7 @@ func TestDatalossSubcommand(t *testing.T) { require.NoError(t, gs.SetGeneration(ctx, 2, "gitaly-3", "repository-2", 0)) ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(cfg, gs, nil, nil, nil)), + registerPraefectInfoServer(info.NewServer(cfg, testhelper.NewLogger(t), gs, nil, nil, nil)), }) defer clean() cfg.SocketPath = ln.Addr().String() diff --git a/internal/cli/praefect/subcmd_metadata_test.go b/internal/cli/praefect/subcmd_metadata_test.go index 9c8265e62..77dc107bc 100644 --- a/internal/cli/praefect/subcmd_metadata_test.go +++ b/internal/cli/praefect/subcmd_metadata_test.go @@ -104,7 +104,7 @@ func TestMetadataSubcommand(t *testing.T) { }) ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(config.Config{}, rs, nil, nil, nil)), + registerPraefectInfoServer(info.NewServer(config.Config{}, testhelper.NewLogger(t), rs, nil, nil, nil)), }) t.Cleanup(clean) diff --git a/internal/cli/praefect/subcmd_set_replication_factor_test.go b/internal/cli/praefect/subcmd_set_replication_factor_test.go index c1d33f8b6..d2f3615cf 100644 --- a/internal/cli/praefect/subcmd_set_replication_factor_test.go +++ b/internal/cli/praefect/subcmd_set_replication_factor_test.go @@ -96,7 +96,7 @@ func TestSetReplicationFactorSubcommand(t *testing.T) { ) ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer( - info.NewServer(config.Config{}, nil, store, nil, nil), + info.NewServer(config.Config{}, testhelper.NewLogger(t), nil, store, nil, nil), )}) defer clean() diff --git a/internal/cli/praefect/subcmd_verify_test.go b/internal/cli/praefect/subcmd_verify_test.go index 9746f4d05..a2f63a356 100644 --- a/internal/cli/praefect/subcmd_verify_test.go +++ b/internal/cli/praefect/subcmd_verify_test.go @@ -168,7 +168,7 @@ func TestVerifySubcommand(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(db, nil) ln, clean := listenAndServe(t, []svcRegistrar{ - registerPraefectInfoServer(info.NewServer(config.Config{}, rs, nil, nil, nil)), + registerPraefectInfoServer(info.NewServer(config.Config{}, testhelper.NewLogger(t), rs, nil, nil, nil)), }) defer clean() diff --git a/internal/git/housekeeping/optimize_repository.go b/internal/git/housekeeping/optimize_repository.go index 999ea6629..3bd9a7ff7 100644 --- a/internal/git/housekeeping/optimize_repository.go +++ b/internal/git/housekeeping/optimize_repository.go @@ -75,7 +75,7 @@ func (m *RepositoryManager) OptimizeRepository( if err != nil { return fmt.Errorf("deriving repository info: %w", err) } - m.reportRepositoryInfo(ctx, repositoryInfo) + m.reportRepositoryInfo(ctx, logger, repositoryInfo) var strategy OptimizationStrategy if cfg.StrategyConstructor == nil { @@ -87,8 +87,8 @@ func (m *RepositoryManager) OptimizeRepository( return m.optimizeFunc(ctx, m, logger, repo, strategy) } -func (m *RepositoryManager) reportRepositoryInfo(ctx context.Context, info stats.RepositoryInfo) { - info.Log(ctx) +func (m *RepositoryManager) reportRepositoryInfo(ctx context.Context, logger log.Logger, info stats.RepositoryInfo) { + info.Log(ctx, logger) m.reportDataStructureExistence("commit_graph", info.CommitGraph.Exists) m.reportDataStructureExistence("commit_graph_bloom_filters", info.CommitGraph.HasBloomFilters) diff --git a/internal/git/stats/repository_info.go b/internal/git/stats/repository_info.go index c507514c0..1f20987f9 100644 --- a/internal/git/stats/repository_info.go +++ b/internal/git/stats/repository_info.go @@ -117,12 +117,12 @@ func LooseObjects(repo *localrepo.Repo) (uint64, error) { // LogRepositoryInfo derives RepositoryInfo and calls its `Log()` function, if successful. Otherwise // it logs an error. -func LogRepositoryInfo(ctx context.Context, repo *localrepo.Repo) { +func LogRepositoryInfo(ctx context.Context, logger log.Logger, repo *localrepo.Repo) { repoInfo, err := RepositoryInfoForRepository(repo) if err != nil { - log.FromContext(ctx).WithError(err).Warn("failed reading repository info") + logger.WithError(err).WarnContext(ctx, "failed reading repository info") } else { - repoInfo.Log(ctx) + repoInfo.Log(ctx, logger) } } @@ -183,8 +183,8 @@ func RepositoryInfoForRepository(repo *localrepo.Repo) (RepositoryInfo, error) { } // Log logs the repository information as a structured entry under the `repository_info` field. -func (i RepositoryInfo) Log(ctx context.Context) { - log.FromContext(ctx).WithField("repository_info", i).Info("repository info") +func (i RepositoryInfo) Log(ctx context.Context, logger log.Logger) { + logger.WithField("repository_info", i).InfoContext(ctx, "repository info") } // ReferencesInfo contains information about references. diff --git a/internal/git/stats/repository_info_test.go b/internal/git/stats/repository_info_test.go index 2827b91bf..7a8bbbd35 100644 --- a/internal/git/stats/repository_info_test.go +++ b/internal/git/stats/repository_info_test.go @@ -102,7 +102,6 @@ func TestLogObjectInfo(t *testing.T) { logger := testhelper.NewLogger(t) hook := testhelper.AddLoggerHook(logger) - ctx := logger.ToContext(ctx) _, repoPath1 := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, @@ -122,7 +121,7 @@ func TestLogObjectInfo(t *testing.T) { alternatesStat, err := os.Stat(filepath.Join(targetRepoPath, "objects", "info", "alternates")) require.NoError(t, err) - LogRepositoryInfo(ctx, localrepo.NewTestRepo(t, cfg, &gitalypb.Repository{ + LogRepositoryInfo(ctx, logger, localrepo.NewTestRepo(t, cfg, &gitalypb.Repository{ StorageName: cfg.Storages[0].Name, RelativePath: targetRepoName, })) @@ -152,14 +151,13 @@ func TestLogObjectInfo(t *testing.T) { logger := testhelper.NewLogger(t) hook := testhelper.AddLoggerHook(logger) - ctx := logger.ToContext(ctx) repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ SkipCreationViaService: true, }) gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) - LogRepositoryInfo(ctx, localrepo.NewTestRepo(t, cfg, repo)) + LogRepositoryInfo(ctx, logger, localrepo.NewTestRepo(t, cfg, repo)) objectsInfo := requireRepositoryInfo(hook.AllEntries()) require.Equal(t, RepositoryInfo{ diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 61f289b73..2a1deb8ed 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -207,6 +207,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { require.NoError(t, err) setup.RegisterAll(srv, &service.Dependencies{ + Logger: logger, Cfg: cfg, GitalyHookManager: hookManager, TransactionManager: txManager, diff --git a/internal/gitaly/service/blob/server.go b/internal/gitaly/service/blob/server.go index c76bbd917..05b5b5ced 100644 --- a/internal/gitaly/service/blob/server.go +++ b/internal/gitaly/service/blob/server.go @@ -6,11 +6,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedBlobServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -19,6 +21,7 @@ type server struct { // NewServer creates a new instance of a grpc BlobServer func NewServer(deps *service.Dependencies) gitalypb.BlobServiceServer { return &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), catfileCache: deps.GetCatfileCache(), diff --git a/internal/gitaly/service/cleanup/server.go b/internal/gitaly/service/cleanup/server.go index b1b488696..bccf6700b 100644 --- a/internal/gitaly/service/cleanup/server.go +++ b/internal/gitaly/service/cleanup/server.go @@ -6,11 +6,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedCleanupServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -19,6 +21,7 @@ type server struct { // NewServer creates a new instance of a grpc CleanupServer func NewServer(deps *service.Dependencies) gitalypb.CleanupServiceServer { return &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), catfileCache: deps.GetCatfileCache(), diff --git a/internal/gitaly/service/commit/commits_helper.go b/internal/gitaly/service/commit/commits_helper.go index 50587337c..784d32907 100644 --- a/internal/gitaly/service/commit/commits_helper.go +++ b/internal/gitaly/service/commit/commits_helper.go @@ -6,7 +6,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" gitlog "gitlab.com/gitlab-org/gitaly/v16/internal/git/log" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -53,7 +52,7 @@ func (s *server) sendCommits( if err := cmd.Wait(); err != nil { // We expect this error to be caused by non-existing references. In that // case, we just log the error and send no commits to the `sender`. - log.FromContext(ctx).WithError(err).Info("ignoring git-log error") + s.logger.WithError(err).InfoContext(ctx, "ignoring git-log error") } return nil diff --git a/internal/gitaly/service/commit/count_commits.go b/internal/gitaly/service/commit/count_commits.go index 629a811ff..060699b97 100644 --- a/internal/gitaly/service/commit/count_commits.go +++ b/internal/gitaly/service/commit/count_commits.go @@ -10,7 +10,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -53,11 +52,11 @@ func (s *server) CountCommits(ctx context.Context, in *gitalypb.CountCommitsRequ var count int64 countStr, readAllErr := io.ReadAll(cmd) if readAllErr != nil { - log.FromContext(ctx).WithError(err).Info("ignoring git rev-list error") + s.logger.WithError(err).InfoContext(ctx, "ignoring git rev-list error") } if err := cmd.Wait(); err != nil { - log.FromContext(ctx).WithError(err).Info("ignoring git rev-list error") + s.logger.WithError(err).InfoContext(ctx, "ignoring git rev-list error") count = 0 } else if readAllErr == nil { var err error diff --git a/internal/gitaly/service/commit/get_tree_entries.go b/internal/gitaly/service/commit/get_tree_entries.go index 4abb2b4b9..df1748f40 100644 --- a/internal/gitaly/service/commit/get_tree_entries.go +++ b/internal/gitaly/service/commit/get_tree_entries.go @@ -328,10 +328,12 @@ func (c *treeEntriesSender) SetPaginationCursor(cursor string) { } func (s *server) GetTreeEntries(in *gitalypb.GetTreeEntriesRequest, stream gitalypb.CommitService_GetTreeEntriesServer) error { - log.FromContext(stream.Context()).WithFields(log.Fields{ + ctx := stream.Context() + + s.logger.WithFields(log.Fields{ "Revision": in.Revision, "Path": in.Path, - }).Debug("GetTreeEntries") + }).DebugContext(ctx, "GetTreeEntries") if err := validateGetTreeEntriesRequest(s.locator, in); err != nil { return err diff --git a/internal/gitaly/service/commit/isancestor.go b/internal/gitaly/service/commit/isancestor.go index 6dc489cc8..0efc4dd07 100644 --- a/internal/gitaly/service/commit/isancestor.go +++ b/internal/gitaly/service/commit/isancestor.go @@ -35,10 +35,10 @@ func (s *server) CommitIsAncestor(ctx context.Context, in *gitalypb.CommitIsAnce // Assumes that `path`, `ancestorID` and `childID` are populated :trollface: func (s *server) commitIsAncestorName(ctx context.Context, repo *gitalypb.Repository, ancestorID, childID string) (bool, error) { - log.FromContext(ctx).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "ancestorSha": ancestorID, "childSha": childID, - }).Debug("commitIsAncestor") + }).DebugContext(ctx, "commitIsAncestor") cmd, err := s.gitCmdFactory.New(ctx, repo, git.Command{ Name: "merge-base", diff --git a/internal/gitaly/service/commit/list_files.go b/internal/gitaly/service/commit/list_files.go index 1e802e346..8379462f1 100644 --- a/internal/gitaly/service/commit/list_files.go +++ b/internal/gitaly/service/commit/list_files.go @@ -15,9 +15,11 @@ import ( ) func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.CommitService_ListFilesServer) error { - log.FromContext(stream.Context()).WithFields(log.Fields{ + ctx := stream.Context() + + s.logger.WithFields(log.Fields{ "Revision": in.GetRevision(), - }).Debug("ListFiles") + }).DebugContext(ctx, "ListFiles") if err := validateListFilesRequest(s.locator, in); err != nil { return structerr.NewInvalidArgument("%w", err) @@ -30,7 +32,7 @@ func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.Commit revision := string(in.GetRevision()) if len(revision) == 0 { - defaultBranch, err := repo.GetDefaultBranch(stream.Context()) + defaultBranch, err := repo.GetDefaultBranch(ctx) if err != nil { return structerr.NewNotFound("revision not found %q", revision) } @@ -42,7 +44,7 @@ func (s *server) ListFiles(in *gitalypb.ListFilesRequest, stream gitalypb.Commit revision = defaultBranch.String() } - contained, err := s.localrepo(repo).HasRevision(stream.Context(), git.Revision(revision)) + contained, err := s.localrepo(repo).HasRevision(ctx, git.Revision(revision)) if err != nil { return structerr.NewInternal("%w", err) } diff --git a/internal/gitaly/service/commit/server.go b/internal/gitaly/service/commit/server.go index a2465f537..d6d69e400 100644 --- a/internal/gitaly/service/commit/server.go +++ b/internal/gitaly/service/commit/server.go @@ -7,11 +7,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedCommitServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -21,6 +23,7 @@ type server struct { // NewServer creates a new instance of a grpc CommitServiceServer func NewServer(deps *service.Dependencies) gitalypb.CommitServiceServer { return &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), catfileCache: deps.GetCatfileCache(), diff --git a/internal/gitaly/service/conflicts/resolve_conflicts.go b/internal/gitaly/service/conflicts/resolve_conflicts.go index 48185e2ce..43a60ba7b 100644 --- a/internal/gitaly/service/conflicts/resolve_conflicts.go +++ b/internal/gitaly/service/conflicts/resolve_conflicts.go @@ -16,7 +16,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -37,10 +36,10 @@ func (s *server) ResolveConflicts(stream gitalypb.ConflictsService_ResolveConfli } err = s.resolveConflicts(header, stream) - return handleResolveConflictsErr(err, stream) + return s.handleResolveConflictsErr(err, stream) } -func handleResolveConflictsErr(err error, stream gitalypb.ConflictsService_ResolveConflictsServer) error { +func (s *server) handleResolveConflictsErr(err error, stream gitalypb.ConflictsService_ResolveConflictsServer) error { var errStr string // normalized error message if err != nil { errStr = strings.TrimPrefix(err.Error(), "resolve: ") // remove subcommand artifact @@ -56,9 +55,9 @@ func handleResolveConflictsErr(err error, stream gitalypb.ConflictsService_Resol // log the error since the interceptor won't catch this // error due to the unique way the RPC is defined to // handle resolution errors - log.FromContext(stream.Context()). + s.logger. WithError(err). - Error("ResolveConflicts: unable to resolve conflict") + ErrorContext(stream.Context(), "ResolveConflicts: unable to resolve conflict") return stream.SendAndClose(&gitalypb.ResolveConflictsResponse{ ResolutionError: errStr, }) diff --git a/internal/gitaly/service/conflicts/server.go b/internal/gitaly/service/conflicts/server.go index 1dfbdbf3f..17cfa52f8 100644 --- a/internal/gitaly/service/conflicts/server.go +++ b/internal/gitaly/service/conflicts/server.go @@ -12,12 +12,14 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedConflictsServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -29,6 +31,7 @@ type server struct { // NewServer creates a new instance of a grpc ConflictsServer func NewServer(deps *service.Dependencies) gitalypb.ConflictsServiceServer { return &server{ + logger: deps.GetLogger(), hookManager: deps.GetHookManager(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 923dcddbd..6e73950e8 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -18,11 +18,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" ) // Dependencies assembles set of components required by different kinds of services. type Dependencies struct { + Logger log.Logger Cfg config.Cfg GitalyHookManager gitalyhook.Manager TransactionManager transaction.Manager @@ -45,6 +47,11 @@ type Dependencies struct { BackupLocator backup.Locator } +// GetLogger returns the logger. +func (dc *Dependencies) GetLogger() log.Logger { + return dc.Logger +} + // GetCfg returns service configuration. func (dc *Dependencies) GetCfg() config.Cfg { return dc.Cfg diff --git a/internal/gitaly/service/diff/commit_delta.go b/internal/gitaly/service/diff/commit_delta.go index 18a4d25fc..da0a37103 100644 --- a/internal/gitaly/service/diff/commit_delta.go +++ b/internal/gitaly/service/diff/commit_delta.go @@ -13,11 +13,11 @@ import ( func (s *server) CommitDelta(in *gitalypb.CommitDeltaRequest, stream gitalypb.DiffService_CommitDeltaServer) error { ctx := stream.Context() - log.FromContext(ctx).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "LeftCommitId": in.LeftCommitId, "RightCommitId": in.RightCommitId, "Paths": logPaths(in.Paths), - }).Debug("CommitDelta") + }).DebugContext(ctx, "CommitDelta") if err := validateRequest(s.locator, in); err != nil { return structerr.NewInvalidArgument("%w", err) diff --git a/internal/gitaly/service/diff/commit_diff.go b/internal/gitaly/service/diff/commit_diff.go index bd378906a..c9d97e7eb 100644 --- a/internal/gitaly/service/diff/commit_diff.go +++ b/internal/gitaly/service/diff/commit_diff.go @@ -13,11 +13,11 @@ import ( func (s *server) CommitDiff(in *gitalypb.CommitDiffRequest, stream gitalypb.DiffService_CommitDiffServer) error { ctx := stream.Context() - log.FromContext(ctx).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "LeftCommitId": in.LeftCommitId, "RightCommitId": in.RightCommitId, "Paths": logPaths(in.Paths), - }).Debug("CommitDiff") + }).DebugContext(ctx, "CommitDiff") if err := validateRequest(s.locator, in); err != nil { return structerr.NewInvalidArgument("%w", err) diff --git a/internal/gitaly/service/diff/server.go b/internal/gitaly/service/diff/server.go index 156da0f56..39ad05449 100644 --- a/internal/gitaly/service/diff/server.go +++ b/internal/gitaly/service/diff/server.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -14,6 +15,7 @@ const msgSizeThreshold = 5 * 1024 type server struct { gitalypb.UnimplementedDiffServiceServer MsgSizeThreshold int + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -23,6 +25,7 @@ type server struct { func NewServer(deps *service.Dependencies) gitalypb.DiffServiceServer { return &server{ MsgSizeThreshold: msgSizeThreshold, + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), catfileCache: deps.GetCatfileCache(), diff --git a/internal/gitaly/service/hook/server.go b/internal/gitaly/service/hook/server.go index 3d2977616..c35be9501 100644 --- a/internal/gitaly/service/hook/server.go +++ b/internal/gitaly/service/hook/server.go @@ -9,12 +9,14 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/streamcache" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedHookServiceServer + logger log.Logger manager gitalyhook.Manager locator storage.Locator gitCmdFactory git.CommandFactory @@ -34,6 +36,7 @@ type server struct { // NewServer creates a new instance of a gRPC namespace server func NewServer(deps *service.Dependencies) gitalypb.HookServiceServer { srv := &server{ + logger: deps.GetLogger(), manager: deps.GetHookManager(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/internalgitaly/server.go b/internal/gitaly/service/internalgitaly/server.go index c0fa429b2..a275ae252 100644 --- a/internal/gitaly/service/internalgitaly/server.go +++ b/internal/gitaly/service/internalgitaly/server.go @@ -4,11 +4,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedInternalGitalyServer + logger log.Logger storages []config.Storage locator storage.Locator } @@ -16,6 +18,7 @@ type server struct { // NewServer return an instance of the Gitaly service. func NewServer(deps *service.Dependencies) gitalypb.InternalGitalyServer { return &server{ + logger: deps.GetLogger(), storages: deps.GetCfg().Storages, locator: deps.GetLocator(), } diff --git a/internal/gitaly/service/internalgitaly/walkrepos_test.go b/internal/gitaly/service/internalgitaly/walkrepos_test.go index 3335a7d48..448d29784 100644 --- a/internal/gitaly/service/internalgitaly/walkrepos_test.go +++ b/internal/gitaly/service/internalgitaly/walkrepos_test.go @@ -77,6 +77,7 @@ func TestWalkRepos(t *testing.T) { // the first repo 'a' is being streamed to the client. deleteOnce := sync.Once{} srv := NewServer(&service.Dependencies{ + Logger: testhelper.SharedLogger(t), Cfg: cfg, StorageLocator: config.NewLocator(cfg), }) diff --git a/internal/gitaly/service/namespace/server.go b/internal/gitaly/service/namespace/server.go index 1c2a2df68..3646891db 100644 --- a/internal/gitaly/service/namespace/server.go +++ b/internal/gitaly/service/namespace/server.go @@ -3,15 +3,20 @@ package namespace import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedNamespaceServiceServer + logger log.Logger locator storage.Locator } // NewServer creates a new instance of a gRPC namespace server func NewServer(deps *service.Dependencies) gitalypb.NamespaceServiceServer { - return &server{locator: deps.GetLocator()} + return &server{ + logger: deps.GetLogger(), + locator: deps.GetLocator(), + } } diff --git a/internal/gitaly/service/objectpool/fetch_into_object_pool.go b/internal/gitaly/service/objectpool/fetch_into_object_pool.go index 7cf69dbe4..1b279a90f 100644 --- a/internal/gitaly/service/objectpool/fetch_into_object_pool.go +++ b/internal/gitaly/service/objectpool/fetch_into_object_pool.go @@ -26,7 +26,7 @@ func (s *server) FetchIntoObjectPool(ctx context.Context, req *gitalypb.FetchInt return nil, structerr.NewInternal("%w", err) } - stats.LogRepositoryInfo(ctx, objectPool.Repo) + stats.LogRepositoryInfo(ctx, s.logger, objectPool.Repo) return &gitalypb.FetchIntoObjectPoolResponse{}, nil } diff --git a/internal/gitaly/service/objectpool/get.go b/internal/gitaly/service/objectpool/get.go index 158e09368..73bf2ae5d 100644 --- a/internal/gitaly/service/objectpool/get.go +++ b/internal/gitaly/service/objectpool/get.go @@ -4,7 +4,6 @@ import ( "context" "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -19,11 +18,11 @@ func (s *server) GetObjectPool(ctx context.Context, in *gitalypb.GetObjectPoolRe objectPool, err := objectpool.FromRepo(s.locator, s.gitCmdFactory, s.catfileCache, s.txManager, s.housekeepingManager, repo) if err != nil { - log.FromContext(ctx). + s.logger. WithError(err). WithField("storage", repository.GetStorageName()). WithField("relative_path", repository.GetRelativePath()). - Warn("alternates file does not point to valid git repository") + WarnContext(ctx, "alternates file does not point to valid git repository") } if objectPool == nil { diff --git a/internal/gitaly/service/objectpool/server.go b/internal/gitaly/service/objectpool/server.go index 3edb6916a..54a1a9cb2 100644 --- a/internal/gitaly/service/objectpool/server.go +++ b/internal/gitaly/service/objectpool/server.go @@ -9,11 +9,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedObjectPoolServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -25,6 +27,7 @@ type server struct { // NewServer creates a new instance of a gRPC repo server func NewServer(deps *service.Dependencies) gitalypb.ObjectPoolServiceServer { return &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), catfileCache: deps.GetCatfileCache(), diff --git a/internal/gitaly/service/operations/apply_patch.go b/internal/gitaly/service/operations/apply_patch.go index 22c20a7ef..366109b83 100644 --- a/internal/gitaly/service/operations/apply_patch.go +++ b/internal/gitaly/service/operations/apply_patch.go @@ -14,7 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/text" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" @@ -113,7 +112,7 @@ func (s *Server) userApplyPatch(ctx context.Context, header *gitalypb.UserApplyP worktreeName := filepath.Base(worktreePath) if err := s.removeWorktree(ctx, header.Repository, worktreeName); err != nil { - log.FromContext(ctx).WithField("worktree_name", worktreeName).WithError(err).Error("failed to remove worktree") + s.logger.WithField("worktree_name", worktreeName).WithError(err).ErrorContext(ctx, "failed to remove worktree") } }() diff --git a/internal/gitaly/service/operations/merge_to_ref.go b/internal/gitaly/service/operations/merge_to_ref.go index b3d623650..925f97a40 100644 --- a/internal/gitaly/service/operations/merge_to_ref.go +++ b/internal/gitaly/service/operations/merge_to_ref.go @@ -102,13 +102,13 @@ func (s *Server) UserMergeToRef(ctx context.Context, request *gitalypb.UserMerge false, ) if err != nil { - log.FromContext(ctx).WithError(err).WithFields( + s.logger.WithError(err).WithFields( log.Fields{ "source_sha": sourceOID, "target_sha": oid, "target_ref": string(request.TargetRef), }, - ).Error("unable to create merge commit") + ).ErrorContext(ctx, "unable to create merge commit") return nil, structerr.NewFailedPrecondition("Failed to create merge commit for source_sha %s and target_sha %s at %s", sourceOID, oid, string(request.TargetRef)) diff --git a/internal/gitaly/service/operations/server.go b/internal/gitaly/service/operations/server.go index e926b9b45..21c97cd94 100644 --- a/internal/gitaly/service/operations/server.go +++ b/internal/gitaly/service/operations/server.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -20,6 +21,7 @@ import ( //nolint:revive // This is unintentionally missing documentation. type Server struct { gitalypb.UnimplementedOperationServiceServer + logger log.Logger hookManager hook.Manager txManager transaction.Manager locator storage.Locator @@ -33,6 +35,7 @@ type Server struct { // NewServer creates a new instance of a grpc OperationServiceServer func NewServer(deps *service.Dependencies) *Server { return &Server{ + logger: deps.GetLogger(), hookManager: deps.GetHookManager(), txManager: deps.GetTxManager(), locator: deps.GetLocator(), diff --git a/internal/gitaly/service/operations/submodules.go b/internal/gitaly/service/operations/submodules.go index 7aff450e2..144ef5cac 100644 --- a/internal/gitaly/service/operations/submodules.go +++ b/internal/gitaly/service/operations/submodules.go @@ -10,7 +10,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -88,9 +87,9 @@ func (s *Server) UserUpdateSubmodule(ctx context.Context, req *gitalypb.UserUpda resp = &gitalypb.UserUpdateSubmoduleResponse{ CommitError: legacyErrPrefixInvalidSubmodulePath, } - log.FromContext(ctx). + s.logger. WithError(err). - Error("UserUpdateSubmodule: git2go subcommand failure") + ErrorContext(ctx, "UserUpdateSubmodule: git2go subcommand failure") } if strings.Contains(errStr, "is already at") { resp = &gitalypb.UserUpdateSubmoduleResponse{ diff --git a/internal/gitaly/service/ref/server.go b/internal/gitaly/service/ref/server.go index e20d2cd62..ca8816d23 100644 --- a/internal/gitaly/service/ref/server.go +++ b/internal/gitaly/service/ref/server.go @@ -7,11 +7,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedRefServiceServer + logger log.Logger txManager transaction.Manager locator storage.Locator gitCmdFactory git.CommandFactory @@ -21,6 +23,7 @@ type server struct { // NewServer creates a new instance of a grpc RefServer func NewServer(deps *service.Dependencies) gitalypb.RefServiceServer { return &server{ + logger: deps.GetLogger(), txManager: deps.GetTxManager(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/remote/server.go b/internal/gitaly/service/remote/server.go index ba2552537..c2ee956e6 100644 --- a/internal/gitaly/service/remote/server.go +++ b/internal/gitaly/service/remote/server.go @@ -8,11 +8,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedRemoteServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory catfileCache catfile.Cache @@ -24,6 +26,7 @@ type server struct { // NewServer creates a new instance of a grpc RemoteServiceServer func NewServer(deps *service.Dependencies) gitalypb.RemoteServiceServer { return &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), catfileCache: deps.GetCatfileCache(), diff --git a/internal/gitaly/service/repository/apply_gitattributes.go b/internal/gitaly/service/repository/apply_gitattributes.go index e0095678e..f9dffccbf 100644 --- a/internal/gitaly/service/repository/apply_gitattributes.go +++ b/internal/gitaly/service/repository/apply_gitattributes.go @@ -13,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/txinfo" @@ -55,7 +54,7 @@ func (s *server) applyGitattributes(ctx context.Context, repo *localrepo.Repo, o } defer func() { if err := locker.Close(); err != nil { - log.FromContext(ctx).WithError(err).Error("unlocking gitattributes") + s.logger.WithError(err).ErrorContext(ctx, "unlocking gitattributes") } }() diff --git a/internal/gitaly/service/repository/archive.go b/internal/gitaly/service/repository/archive.go index 8992d0b42..ed77ee4ab 100644 --- a/internal/gitaly/service/repository/archive.go +++ b/internal/gitaly/service/repository/archive.go @@ -82,7 +82,7 @@ func (s *server) GetArchive(in *gitalypb.GetArchiveRequest, stream gitalypb.Repo return stream.Send(&gitalypb.GetArchiveResponse{Data: p}) }) - log.FromContext(ctx).WithField("request_hash", requestHash(in)).Info("request details") + s.logger.WithField("request_hash", requestHash(in)).InfoContext(ctx, "request details") return s.handleArchive(ctx, archiveParams{ writer: writer, diff --git a/internal/gitaly/service/repository/fetch.go b/internal/gitaly/service/repository/fetch.go index 143c67006..5e1efe40f 100644 --- a/internal/gitaly/service/repository/fetch.go +++ b/internal/gitaly/service/repository/fetch.go @@ -8,7 +8,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -91,9 +90,9 @@ func (s *server) FetchSourceBranch(ctx context.Context, req *gitalypb.FetchSourc ); err != nil { // Design quirk: if the fetch fails, this RPC returns Result: false, but no error. if errors.As(err, &localrepo.FetchFailedError{}) { - log.FromContext(ctx). + s.logger. WithField("oid", sourceOid.String()). - WithError(err).Warn("git fetch failed") + WithError(err).WarnContext(ctx, "git fetch failed") return &gitalypb.FetchSourceBranchResponse{Result: false}, nil } diff --git a/internal/gitaly/service/repository/prune_unreachable_objects.go b/internal/gitaly/service/repository/prune_unreachable_objects.go index 7f9d49768..9b730db2f 100644 --- a/internal/gitaly/service/repository/prune_unreachable_objects.go +++ b/internal/gitaly/service/repository/prune_unreachable_objects.go @@ -70,7 +70,7 @@ func (s *server) PruneUnreachableObjects( return nil, structerr.NewInternal("rewriting commit-graph: %w", err) } - stats.LogRepositoryInfo(ctx, repo) + stats.LogRepositoryInfo(ctx, s.logger, repo) return &gitalypb.PruneUnreachableObjectsResponse{}, nil } diff --git a/internal/gitaly/service/repository/rename.go b/internal/gitaly/service/repository/rename.go index 29c35085e..08cc1c8f1 100644 --- a/internal/gitaly/service/repository/rename.go +++ b/internal/gitaly/service/repository/rename.go @@ -9,7 +9,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -62,7 +61,7 @@ func (s *server) renameRepository(ctx context.Context, sourceRepo, targetRepo *g } defer func() { if err := sourceLocker.Close(); err != nil { - log.FromContext(ctx).WithError(err).Error("closing source repo locker failed") + s.logger.WithError(err).ErrorContext(ctx, "closing source repo locker failed") } }() @@ -72,7 +71,7 @@ func (s *server) renameRepository(ctx context.Context, sourceRepo, targetRepo *g } defer func() { if err := targetLocker.Close(); err != nil { - log.FromContext(ctx).WithError(err).Error("closing target repo locker failed") + s.logger.WithError(err).ErrorContext(ctx, "closing target repo locker failed") } }() diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index d7aaac44b..ad1fc573e 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -22,7 +22,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tempdir" @@ -151,7 +150,7 @@ func (s *server) create(ctx context.Context, in *gitalypb.ReplicateRepositoryReq return fmt.Errorf("error deleting invalid repo: %w", err) } - log.FromContext(ctx).WithField("repo_path", repoPath).Warn("removed invalid repository") + s.logger.WithField("repo_path", repoPath).WarnContext(ctx, "removed invalid repository") } if err := s.createFromSnapshot(ctx, in.GetSource(), in.GetRepository()); err != nil { diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go index d87fc6b63..9b7ea0fea 100644 --- a/internal/gitaly/service/repository/server.go +++ b/internal/gitaly/service/repository/server.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/unarycache" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -22,6 +23,7 @@ import ( type server struct { gitalypb.UnimplementedRepositoryServiceServer + logger log.Logger conns *client.Pool locator storage.Locator txManager transaction.Manager @@ -40,6 +42,7 @@ type server struct { // NewServer creates a new instance of a gRPC repo server func NewServer(deps *service.Dependencies) gitalypb.RepositoryServiceServer { return &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), txManager: deps.GetTxManager(), gitCmdFactory: deps.GetGitCmdFactory(), diff --git a/internal/gitaly/service/repository/snapshot.go b/internal/gitaly/service/repository/snapshot.go index 1530013f4..e52c8eb90 100644 --- a/internal/gitaly/service/repository/snapshot.go +++ b/internal/gitaly/service/repository/snapshot.go @@ -9,7 +9,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/archive" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" @@ -98,7 +97,7 @@ func (s *server) addAlternateFiles(ctx context.Context, repository *gitalypb.Rep altObjDirs, err := git.AlternateObjectDirectories(ctx, storageRoot, repoPath) if err != nil { - log.FromContext(ctx).WithField("error", err).Warn("error getting alternate object directories") + s.logger.WithField("error", err).WarnContext(ctx, "error getting alternate object directories") return nil } diff --git a/internal/gitaly/service/server/disk_stats.go b/internal/gitaly/service/server/disk_stats.go index 2abcf251f..7efc0db37 100644 --- a/internal/gitaly/service/server/disk_stats.go +++ b/internal/gitaly/service/server/disk_stats.go @@ -3,7 +3,6 @@ package server import ( "context" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -12,7 +11,7 @@ func (s *server) DiskStatistics(ctx context.Context, _ *gitalypb.DiskStatisticsR for _, shard := range s.storages { shardInfo, err := getStorageStatus(shard) if err != nil { - log.FromContext(ctx).WithField("storage", shard).WithError(err).Error("to retrieve shard disk statistics") + s.logger.WithField("storage", shard).WithError(err).ErrorContext(ctx, "to retrieve shard disk statistics") results = append(results, &gitalypb.DiskStatisticsResponse_StorageStatus{StorageName: shard.Name}) continue } diff --git a/internal/gitaly/service/server/info.go b/internal/gitaly/service/server/info.go index f1580bc47..8be3a7725 100644 --- a/internal/gitaly/service/server/info.go +++ b/internal/gitaly/service/server/info.go @@ -8,7 +8,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/fstype" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/version" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -27,7 +26,7 @@ func (s *server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) gitalyMetadata, err := storage.ReadMetadataFile(shard.Path) if err != nil { - log.FromContext(ctx).WithField("storage", shard).WithError(err).Error("reading gitaly metadata file") + s.logger.WithField("storage", shard).WithError(err).ErrorContext(ctx, "reading gitaly metadata file") } storageStatuses = append(storageStatuses, &gitalypb.ServerInfoResponse_StorageStatus{ diff --git a/internal/gitaly/service/server/server.go b/internal/gitaly/service/server/server.go index 48b1a575c..8137612a6 100644 --- a/internal/gitaly/service/server/server.go +++ b/internal/gitaly/service/server/server.go @@ -4,11 +4,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedServerServiceServer + logger log.Logger gitCmdFactory git.CommandFactory storages []config.Storage } @@ -16,6 +18,7 @@ type server struct { // NewServer creates a new instance of a grpc ServerServiceServer func NewServer(deps *service.Dependencies) gitalypb.ServerServiceServer { return &server{ + logger: deps.GetLogger(), gitCmdFactory: deps.GetGitCmdFactory(), storages: deps.GetCfg().Storages, } diff --git a/internal/gitaly/service/smarthttp/inforefs.go b/internal/gitaly/service/smarthttp/inforefs.go index 0a9a50def..898d643a0 100644 --- a/internal/gitaly/service/smarthttp/inforefs.go +++ b/internal/gitaly/service/smarthttp/inforefs.go @@ -53,9 +53,9 @@ func (s *server) InfoRefsReceivePack(in *gitalypb.InfoRefsRequest, stream gitaly } func (s *server) handleInfoRefs(ctx context.Context, service, repoPath string, req *gitalypb.InfoRefsRequest, w io.Writer) error { - log.FromContext(ctx).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "service": service, - }).Debug("handleInfoRefs") + }).DebugContext(ctx, "handleInfoRefs") cmdOpts := []git.CmdOpt{git.WithGitProtocol(req), git.WithStdout(w)} if service == "receive-pack" { diff --git a/internal/gitaly/service/smarthttp/receive_pack.go b/internal/gitaly/service/smarthttp/receive_pack.go index 73399bd45..68d1e3f09 100644 --- a/internal/gitaly/service/smarthttp/receive_pack.go +++ b/internal/gitaly/service/smarthttp/receive_pack.go @@ -20,12 +20,12 @@ func (s *server) PostReceivePack(stream gitalypb.SmartHTTPService_PostReceivePac return err } - log.FromContext(ctx).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "GlID": req.GlId, "GlRepository": req.GlRepository, "GlUsername": req.GlUsername, "GitConfigOptions": req.GitConfigOptions, - }).Debug("PostReceivePack") + }).DebugContext(ctx, "PostReceivePack") if err := validateReceivePackRequest(s.locator, req); err != nil { return err diff --git a/internal/gitaly/service/smarthttp/server.go b/internal/gitaly/service/smarthttp/server.go index c58167abb..228303d16 100644 --- a/internal/gitaly/service/smarthttp/server.go +++ b/internal/gitaly/service/smarthttp/server.go @@ -6,11 +6,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type server struct { gitalypb.UnimplementedSmartHTTPServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory packfileNegotiationMetrics *prometheus.CounterVec @@ -21,6 +23,7 @@ type server struct { // NewServer creates a new instance of a grpc SmartHTTPServer func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.SmartHTTPServiceServer { s := &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), txManager: deps.GetTxManager(), diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index aaca1417f..f9548f2ad 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -10,7 +10,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -64,7 +63,7 @@ func (s *server) runStatsCollector(ctx context.Context, r io.Reader) (io.Reader, stats, err := stats.ParsePackfileNegotiation(pr) if err != nil { - log.FromContext(ctx).WithError(err).Debug("failed parsing packfile negotiation") + s.logger.WithError(err).DebugContext(ctx, "failed parsing packfile negotiation") return } stats.UpdateMetrics(s.packfileNegotiationMetrics) @@ -141,6 +140,6 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack return nil, structerr.NewFailedPrecondition("waiting for upload-pack: %w", err) } - log.FromContext(ctx).WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).Info("request details") + s.logger.WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).InfoContext(ctx, "request details") return nil, nil } diff --git a/internal/gitaly/service/ssh/receive_pack.go b/internal/gitaly/service/ssh/receive_pack.go index 189ba6ee8..4d30d9956 100644 --- a/internal/gitaly/service/ssh/receive_pack.go +++ b/internal/gitaly/service/ssh/receive_pack.go @@ -25,13 +25,13 @@ func (s *server) SSHReceivePack(stream gitalypb.SSHService_SSHReceivePackServer) return structerr.NewInternal("%w", err) } - log.FromContext(stream.Context()).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "GlID": req.GlId, "GlRepository": req.GlRepository, "GlUsername": req.GlUsername, "GitConfigOptions": req.GitConfigOptions, "GitProtocol": req.GitProtocol, - }).Debug("SSHReceivePack") + }).DebugContext(stream.Context(), "SSHReceivePack") if err = validateFirstReceivePackRequest(s.locator, req); err != nil { return structerr.NewInvalidArgument("%w", err) @@ -127,7 +127,7 @@ func (s *server) sshReceivePack(stream gitalypb.SSHService_SSHReceivePackServer, if errSend := stream.Send(&gitalypb.SSHReceivePackResponse{ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, }); errSend != nil { - log.FromContext(ctx).WithError(errSend).Error("send final status code") + s.logger.WithError(errSend).ErrorContext(ctx, "send final status code") } // Detect the case where the user has cancelled the push and log it with a proper diff --git a/internal/gitaly/service/ssh/server.go b/internal/gitaly/service/ssh/server.go index c0dd3513a..07ac0f975 100644 --- a/internal/gitaly/service/ssh/server.go +++ b/internal/gitaly/service/ssh/server.go @@ -9,6 +9,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -19,6 +20,7 @@ var ( type server struct { gitalypb.UnimplementedSSHServiceServer + logger log.Logger locator storage.Locator gitCmdFactory git.CommandFactory txManager transaction.Manager @@ -30,6 +32,7 @@ type server struct { // NewServer creates a new instance of a grpc SSHServer func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.SSHServiceServer { s := &server{ + logger: deps.GetLogger(), locator: deps.GetLocator(), gitCmdFactory: deps.GetGitCmdFactory(), txManager: deps.GetTxManager(), diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index d4d306533..5f8a51cab 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -28,11 +28,11 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e return structerr.NewInternal("%w", err) } - log.FromContext(ctx).WithFields(log.Fields{ + s.logger.WithFields(log.Fields{ "GlRepository": req.GetRepository().GetGlRepository(), "GitConfigOptions": req.GitConfigOptions, "GitProtocol": req.GitProtocol, - }).Debug("SSHUploadPack") + }).DebugContext(ctx, "SSHUploadPack") if err = validateFirstUploadPackRequest(s.locator, req); err != nil { return structerr.NewInvalidArgument("%w", err) @@ -57,7 +57,7 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e if errSend := stream.Send(&gitalypb.SSHUploadPackResponse{ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, }); errSend != nil { - log.FromContext(ctx).WithError(errSend).Error("send final status code") + s.logger.WithError(errSend).ErrorContext(ctx, "send final status code") } return structerr.NewInternal("%w", err) @@ -105,7 +105,7 @@ func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, st stats, errIgnore := stats.ParsePackfileNegotiation(pr) negotiation = &stats if errIgnore != nil { - log.FromContext(ctx).WithError(errIgnore).Debug("failed parsing packfile negotiation") + s.logger.WithError(errIgnore).DebugContext(ctx, "failed parsing packfile negotiation") return } stats.UpdateMetrics(s.packfileNegotiationMetrics) diff --git a/internal/grpc/middleware/customfieldshandler/customfields_handler_test.go b/internal/grpc/middleware/customfieldshandler/customfields_handler_test.go index 15895b7ba..4452e55d8 100644 --- a/internal/grpc/middleware/customfieldshandler/customfields_handler_test.go +++ b/internal/grpc/middleware/customfieldshandler/customfields_handler_test.go @@ -27,18 +27,16 @@ import ( func createNewServer(t *testing.T, cfg config.Cfg, logger log.Logger) *grpc.Server { t.Helper() - logrusEntry := logger.WithField("test", t.Name()) - opts := []grpc.ServerOption{ grpc.ChainStreamInterceptor( StreamInterceptor, - logrusEntry.StreamServerInterceptor( + logger.StreamServerInterceptor( grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat), grpcmwlogrus.WithMessageProducer(log.MessageProducer(grpcmwlogrus.DefaultMessageProducer, FieldsProducer))), ), grpc.ChainUnaryInterceptor( UnaryInterceptor, - logrusEntry.UnaryServerInterceptor( + logger.UnaryServerInterceptor( grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat), grpcmwlogrus.WithMessageProducer(log.MessageProducer(grpcmwlogrus.DefaultMessageProducer, FieldsProducer))), ), @@ -51,6 +49,7 @@ func createNewServer(t *testing.T, cfg config.Cfg, logger log.Logger) *grpc.Serv t.Cleanup(catfileCache.Stop) gitalypb.RegisterRefServiceServer(server, ref.NewServer(&service.Dependencies{ + Logger: logger, StorageLocator: config.NewLocator(cfg), GitCmdFactory: gitCommandFactory, TransactionManager: transaction.NewManager(cfg, backchannel.NewRegistry()), diff --git a/internal/log/logger.go b/internal/log/logger.go index 753ac3c30..e0e855f63 100644 --- a/internal/log/logger.go +++ b/internal/log/logger.go @@ -23,6 +23,11 @@ type Logger interface { Warn(msg string) Error(msg string) + DebugContext(ctx context.Context, msg string) + InfoContext(ctx context.Context, msg string) + WarnContext(ctx context.Context, msg string) + ErrorContext(ctx context.Context, msg string) + StreamServerInterceptor(...grpcmwlogrus.Option) grpc.StreamServerInterceptor UnaryServerInterceptor(...grpcmwlogrus.Option) grpc.UnaryServerInterceptor } @@ -96,6 +101,30 @@ func (l LogrusLogger) UnaryServerInterceptor(opts ...grpcmwlogrus.Option) grpc.U return grpcmwlogrus.UnaryServerInterceptor(l.entry, opts...) } +func (l LogrusLogger) log(ctx context.Context, level logrus.Level, msg string) { + l.entry.WithFields(ctxlogrus.Extract(ctx).Data).Log(level, msg) +} + +// DebugContext logs a new log message at Debug level. Fields added to the context via AddFields will be appended. +func (l LogrusLogger) DebugContext(ctx context.Context, msg string) { + l.log(ctx, logrus.DebugLevel, msg) +} + +// InfoContext logs a new log message at Info level. Fields added to the context via AddFields will be appended. +func (l LogrusLogger) InfoContext(ctx context.Context, msg string) { + l.log(ctx, logrus.InfoLevel, msg) +} + +// WarnContext logs a new log message at Warn level. Fields added to the context via AddFields will be appended. +func (l LogrusLogger) WarnContext(ctx context.Context, msg string) { + l.log(ctx, logrus.WarnLevel, msg) +} + +// ErrorContext level. Fields added to the context via AddFields will be appended. +func (l LogrusLogger) ErrorContext(ctx context.Context, msg string) { + l.log(ctx, logrus.ErrorLevel, msg) +} + // FromContext extracts the logger from the context. If no logger has been injected then this will return a discarding // logger. func FromContext(ctx context.Context) LogrusLogger { diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 073de8b4f..b01529082 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -157,7 +157,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, txMgr := transactions.NewManager(conf) - coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator(logger, queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered) srv := NewGRPCServer(&Dependencies{ Config: conf, diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index e2a8875e4..76d0b15f2 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -186,6 +186,7 @@ type grpcCall struct { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { + logger log.Logger router Router txMgr *transactions.Manager queue datastore.ReplicationEventQueue @@ -198,6 +199,7 @@ type Coordinator struct { // NewCoordinator returns a new Coordinator that utilizes the provided logger func NewCoordinator( + logger log.Logger, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, router Router, @@ -213,6 +215,7 @@ func NewCoordinator( } coordinator := &Coordinator{ + logger: logger, queue: queue, rs: rs, registry: r, @@ -490,15 +493,15 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall defer nodeErrors.Unlock() nodeErrors.errByNode[secondary.Storage] = err - log.FromContext(ctx).WithError(err). - Error("proxying to secondary failed") + c.logger.WithError(err). + ErrorContext(ctx, "proxying to secondary failed") // Cancels failed node's voter in its current subtransaction. // Also updates internal state of subtransaction to fail and // release blocked voters if quorum becomes impossible. if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil { - log.FromContext(ctx).WithError(err). - Error("canceling secondary voter failed") + c.logger.WithError(err). + ErrorContext(ctx, "canceling secondary voter failed") } // The error is ignored, so we do not abort transactions @@ -543,9 +546,9 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall continue } - log.FromContext(ctx). + c.logger. WithError(err). - Error("coordinator proxy stream finalizer failure") + ErrorContext(ctx, "coordinator proxy stream finalizer failure") } return firstErr } @@ -588,7 +591,7 @@ func (c *Coordinator) maintenanceStreamParameters(ctx context.Context, call grpc defer nodeErrors.Unlock() nodeErrors.errByNode[node.Storage] = err - log.FromContext(ctx).WithField("gitaly_storage", node.Storage).WithError(err).Error("proxying maintenance RPC to node failed") + c.logger.WithField("gitaly_storage", node.Storage).WithError(err).ErrorContext(ctx, "proxying maintenance RPC to node failed") // We ignore any errors returned by nodes such that they all have a // chance to finish their maintenance RPC in a best-effort strategy. @@ -665,7 +668,7 @@ func streamParametersContext(ctx context.Context) context.Context { func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. - log.FromContext(ctx).WithField("method", fullMethodName).Debug("Stream director received method") + c.logger.WithField("method", fullMethodName).DebugContext(ctx, "Stream director received method") mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { @@ -887,7 +890,7 @@ func (c *Coordinator) createTransactionFinalizer( ) func() error { return func() error { primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries( - ctx, route, transaction, nodeErrors, c.txReplicationCountMetric) + ctx, c.logger, route, transaction, nodeErrors, c.txReplicationCountMetric) if !primaryDirtied { // If the primary replica was not modified then we don't need to consider the secondaries // outdated. Praefect requires the primary to be always part of the quorum, so no changes @@ -921,6 +924,7 @@ func (c *Coordinator) createTransactionFinalizer( // replication jobs to repair state. func getUpdatedAndOutdatedSecondaries( ctx context.Context, + logger log.Logger, route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, @@ -950,10 +954,10 @@ func getUpdatedAndOutdatedSecondaries( nodesByState := make(map[string][]string) defer func() { - log.FromContext(ctx). + logger. WithField("transaction.primary", route.Primary.Storage). WithField("transaction.secondaries", nodesByState). - Info("transactional node states") + InfoContext(ctx, "transactional node states") for reason, nodes := range nodesByState { replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes))) @@ -1069,7 +1073,7 @@ func (c *Coordinator) newRequestFinalizer( ctx, cancel := context.WithTimeout(helper.SuppressCancellation(originalCtx), 30*time.Second) defer cancel() - logEntry := log.FromContext(ctx).WithFields(log.Fields{ + logEntry := c.logger.WithFields(log.Fields{ "replication.cause": cause, "replication.change": change, "replication.primary": primary, @@ -1080,7 +1084,7 @@ func (c *Coordinator) newRequestFinalizer( if len(outdatedSecondaries) > 0 { logEntry = logEntry.WithField("replication.outdated", outdatedSecondaries) } - logEntry.Info("queueing replication jobs") + logEntry.InfoContext(ctx, "queueing replication jobs") switch change { case datastore.UpdateRepo: @@ -1099,7 +1103,7 @@ func (c *Coordinator) newRequestFinalizer( return fmt.Errorf("rename repository: %w", err) } - log.FromContext(ctx).WithError(err).Info("renamed repository does not have a store entry") + c.logger.WithError(err).InfoContext(ctx, "renamed repository does not have a store entry") } case datastore.CreateRepo: repositorySpecificPrimariesEnabled := c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository @@ -1146,7 +1150,7 @@ func (c *Coordinator) newRequestFinalizer( g.Go(func() error { if _, err := c.queue.Enqueue(ctx, event); err != nil { if errors.As(err, &datastore.ReplicationEventExistsError{}) { - log.FromContext(ctx).WithError(err).Info("replication event queue already has similar entry") + c.logger.WithError(err).InfoContext(ctx, "replication event queue already has similar entry") return nil } diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 52d0d0ef5..9cf2baa23 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -175,6 +175,8 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { }, } + logger := testhelper.NewLogger(t) + var replicationWaitGroup sync.WaitGroup queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { @@ -220,17 +222,18 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { nil, nil, nil, - testhelper.SharedLogger(t), + logger, ) require.NoError(t, err) defer nodeSet.Close() coordinator := NewCoordinator( + logger, queueInterceptor, rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(logger, tx), StaticHealthChecker(conf.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 8d5254bdf..e0eafe0c5 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -94,6 +94,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { } coordinator := NewCoordinator( + testhelper.NewLogger(t), datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) { @@ -370,6 +371,7 @@ func TestStreamDirectorMutator(t *testing.T) { defer tx.Rollback(t) rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) + logger := testhelper.NewLogger(t) testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) @@ -379,11 +381,12 @@ func TestStreamDirectorMutator(t *testing.T) { }) coordinator := NewCoordinator( + logger, queueInterceptor, rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(logger, tx), StaticHealthChecker(conf.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, @@ -486,6 +489,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( + testhelper.NewLogger(t), datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), @@ -598,6 +602,7 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( + testhelper.NewLogger(t), datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), @@ -701,6 +706,7 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) { } coordinator := NewCoordinator( + testhelper.NewLogger(t), &datastore.MockReplicationEventQueue{}, rs, router, @@ -759,6 +765,7 @@ func TestStreamDirector_maintenance(t *testing.T) { } db := testdb.New(t) + logger := testhelper.NewLogger(t) repo := gitalypb.Repository{ StorageName: "praefect", @@ -767,7 +774,7 @@ func TestStreamDirector_maintenance(t *testing.T) { ctx := testhelper.Context(t) - nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil, testhelper.SharedLogger(t)) + nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil, logger) require.NoError(t, err) defer nodeSet.Close() @@ -783,11 +790,12 @@ func TestStreamDirector_maintenance(t *testing.T) { queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( + logger, queueInterceptor, rs, NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(logger, tx), StaticHealthChecker(cfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, @@ -1090,6 +1098,7 @@ func TestStreamDirectorAccessor(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { coordinator := NewCoordinator( + testhelper.NewLogger(t), queue, rs, tc.router, @@ -1180,6 +1189,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( + testhelper.NewLogger(t), queue, repoStore, NewNodeManagerRouter(nodeMgr, repoStore), @@ -1560,6 +1570,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( + testhelper.NewLogger(t), queueInterceptor, repositoryStore, router, @@ -1721,6 +1732,7 @@ func TestAbsentCorrelationID(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), queueInterceptor, rs, NewNodeManagerRouter(nodeMgr, rs), @@ -1850,6 +1862,7 @@ func TestStreamDirectorStorageScope(t *testing.T) { nodeMgr.Start(0, time.Second) defer nodeMgr.Stop() coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(nodeMgr, rs), @@ -1913,6 +1926,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -1942,6 +1956,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -1972,6 +1987,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -2003,6 +2019,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { } rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -2820,7 +2837,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { Name: "stub", Help: "help", }, []string{"reason"}) - primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, testhelper.NewLogger(t), route, transaction, nodeErrors, metric) require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) @@ -2889,6 +2906,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { require.EqualError(t, NewCoordinator( + testhelper.NewLogger(t), &datastore.MockReplicationEventQueue{ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { requireSuppressedCancellation(t, ctx) @@ -2958,6 +2976,7 @@ func TestNewRequestFinalizer_enqueueErrorPropagation(t *testing.T) { t.Parallel() err := NewCoordinator( + testhelper.NewLogger(t), &datastore.MockReplicationEventQueue{ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { return datastore.ReplicationEvent{}, tc.enqueueErr diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 0a01348b3..758524660 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -25,13 +25,14 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { db := testdb.New(t) rs := NewPostgresRepositoryStore(db, nil) + logger := testhelper.SharedLogger(t) t.Run("unknown virtual storage", func(t *testing.T) { ctx := testhelper.Context(t) require.NoError(t, rs.CreateRepository(ctx, 1, "unknown", "/repo/path", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false)) - cache, err := NewCachingConsistentStoragesGetter(log.FromContext(ctx), rs, []string{"vs"}) + cache, err := NewCachingConsistentStoragesGetter(logger, rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -55,7 +56,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false)) - cache, err := NewCachingConsistentStoragesGetter(log.FromContext(ctx), rs, []string{"vs"}) + cache, err := NewCachingConsistentStoragesGetter(logger, rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -180,7 +181,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "replica-path-1", "g1", []string{"g2", "g3"}, nil, true, false)) require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "replica-path-2", "g1", []string{"g2"}, nil, true, false)) - cache, err := NewCachingConsistentStoragesGetter(log.FromContext(ctx), rs, []string{"vs"}) + cache, err := NewCachingConsistentStoragesGetter(logger, rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -230,7 +231,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path", "replica-path", "g1", []string{"g2", "g3"}, nil, true, false)) - cache, err := NewCachingConsistentStoragesGetter(log.FromContext(ctx), rs, []string{"vs"}) + cache, err := NewCachingConsistentStoragesGetter(logger, rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -266,7 +267,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { require.NoError(t, rs.CreateRepository(ctx, 1, "vs", "/repo/path/1", "replica-path-1", "g1", nil, nil, true, false)) require.NoError(t, rs.CreateRepository(ctx, 2, "vs", "/repo/path/2", "replica-path-2", "g1", nil, nil, true, false)) - cache, err := NewCachingConsistentStoragesGetter(log.FromContext(ctx), rs, []string{"vs"}) + cache, err := NewCachingConsistentStoragesGetter(logger, rs, []string{"vs"}) require.NoError(t, err) cache.Connected() @@ -331,7 +332,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { }, } - cache, err := NewCachingConsistentStoragesGetter(log.FromContext(ctx), mockRepositoryStore, []string{"storage-1", "storage-2"}) + cache, err := NewCachingConsistentStoragesGetter(logger, mockRepositoryStore, []string{"storage-1", "storage-2"}) require.NoError(t, err) cache.Connected() diff --git a/internal/praefect/get_object_pool_test.go b/internal/praefect/get_object_pool_test.go index 274401f74..83585d7ba 100644 --- a/internal/praefect/get_object_pool_test.go +++ b/internal/praefect/get_object_pool_test.go @@ -58,7 +58,9 @@ func TestGetObjectPoolHandler(t *testing.T) { }, } - nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, nil, nil, nil, nil, testhelper.SharedLogger(t)) + logger := testhelper.NewLogger(t) + + nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, nil, nil, nil, nil, logger) require.NoError(t, err) t.Cleanup(nodeSet.Close) @@ -72,14 +74,14 @@ func TestGetObjectPoolHandler(t *testing.T) { srv := NewGRPCServer(&Dependencies{ Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, - Logger: testhelper.SharedLogger(t), + Logger: logger, Director: func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { return nil, errServedByGitaly }, RepositoryStore: repoStore, Router: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(db), + nodes.NewPerRepositoryElector(logger, db), StaticHealthChecker(cfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), repoStore, diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index f2d37af59..dc20c69e4 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -85,7 +85,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ "praefect-0": {virtualStorage: storages[0:1]}, }) - elector := nodes.NewPerRepositoryElector(tx) + elector := nodes.NewPerRepositoryElector(logger, tx) conns := nodeSet.Connections() rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) diff --git a/internal/praefect/nodes/per_repository.go b/internal/praefect/nodes/per_repository.go index 4a6bd7339..0b1682f73 100644 --- a/internal/praefect/nodes/per_repository.go +++ b/internal/praefect/nodes/per_repository.go @@ -17,11 +17,17 @@ var ErrNoPrimary = errors.New("no primary") // PerRepositoryElector implements an elector that selects a primary for each repository. // It elects a healthy node with most recent generation as the primary. If all nodes are // on the same generation, it picks one randomly to balance repositories in simple fashion. -type PerRepositoryElector struct{ db glsql.Querier } +type PerRepositoryElector struct { + logger log.Logger + db glsql.Querier +} // NewPerRepositoryElector returns a new per repository primary elector. -func NewPerRepositoryElector(db glsql.Querier) *PerRepositoryElector { - return &PerRepositoryElector{db: db} +func NewPerRepositoryElector(logger log.Logger, db glsql.Querier) *PerRepositoryElector { + return &PerRepositoryElector{ + logger: logger, + db: db, + } } // GetPrimary returns the primary storage of a repository. If the current primary is invalid, a new primary @@ -109,11 +115,11 @@ WHERE snapshot.repository_id = $1 } if current != previous { - log.FromContext(ctx).WithFields(log.Fields{ + pr.logger.WithFields(log.Fields{ "repository_id": repositoryID, "current_primary": current.String, "previous_primary": previous.String, - }).Info("primary node changed") + }).InfoContext(ctx, "primary node changed") } if !current.Valid { diff --git a/internal/praefect/nodes/per_repository_test.go b/internal/praefect/nodes/per_repository_test.go index d5ffa34bb..de21adfe0 100644 --- a/internal/praefect/nodes/per_repository_test.go +++ b/internal/praefect/nodes/per_repository_test.go @@ -531,9 +531,9 @@ func TestPerRepositoryElector(t *testing.T) { runElection := func(tx *testdb.TxWrapper) (string, *logrus.Entry) { logger := testhelper.NewLogger(t) hook := testhelper.AddLoggerHook(logger) - elector := NewPerRepositoryElector(tx) + elector := NewPerRepositoryElector(logger, tx) - primary, err := elector.GetPrimary(logger.ToContext(ctx), "", repositoryID) + primary, err := elector.GetPrimary(ctx, "", repositoryID) assert.Equal(t, step.error, err) assert.Less(t, len(hook.AllEntries()), 2) diff --git a/internal/praefect/router_per_repository_test.go b/internal/praefect/router_per_repository_test.go index 39c3272a4..aeace6d02 100644 --- a/internal/praefect/router_per_repository_test.go +++ b/internal/praefect/router_per_repository_test.go @@ -224,7 +224,7 @@ func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { router := NewPerRepositoryRouter( conns, - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(testhelper.NewLogger(t), tx), tc.healthyNodes, mockRandom{ intnFunc: func(n int) int { @@ -406,7 +406,7 @@ func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { router := NewPerRepositoryRouter( conns, - nodes.NewPerRepositoryElector(tx), + nodes.NewPerRepositoryElector(testhelper.NewLogger(t), tx), tc.healthyNodes, nil, rs, diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 823ed02ec..6738715df 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -189,7 +189,7 @@ func NewGRPCServer( warnDupeAddrs(deps.Logger, deps.Config) srv := grpc.NewServer(grpcOpts...) - registerServices(srv, deps.TxMgr, deps.Config, deps.RepositoryStore, deps.AssignmentStore, service.Connections(deps.Conns), deps.PrimaryGetter, deps.Checks) + registerServices(srv, deps.Logger, deps.TxMgr, deps.Config, deps.RepositoryStore, deps.AssignmentStore, service.Connections(deps.Conns), deps.PrimaryGetter, deps.Checks) if deps.Config.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ @@ -218,6 +218,7 @@ func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { // registerServices registers services praefect needs to handle RPCs on its own. func registerServices( srv *grpc.Server, + logger log.Logger, tm *transactions.Manager, conf config.Config, rs datastore.RepositoryStore, @@ -227,8 +228,8 @@ func registerServices( checks []service.CheckFunc, ) { // ServerServiceServer is necessary for the ServerInfo RPC - gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, conns, checks)) - gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(conf, rs, assignmentStore, conns, primaryGetter)) + gitalypb.RegisterServerServiceServer(srv, server.NewServer(conf, logger, conns, checks)) + gitalypb.RegisterPraefectInfoServiceServer(srv, info.NewServer(conf, logger, rs, assignmentStore, conns, primaryGetter)) gitalypb.RegisterRefTransactionServer(srv, transaction.NewServer(tm)) healthpb.RegisterHealthServer(srv, health.NewServer()) diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 51c298668..6a1bb7b75 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -89,6 +89,7 @@ func TestServerFactory(t *testing.T) { router := NewNodeManagerRouter(nodeMgr, rs) coordinator := NewCoordinator( + logger, queue, rs, router, diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 8b6221208..4d2bbd8d7 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -617,7 +617,7 @@ func TestRenameRepository(t *testing.T) { ) ctx := testhelper.Context(t) - nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil, testhelper.SharedLogger(t)) + nodeSet, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, clientHandshaker, nil, logger) require.NoError(t, err) defer nodeSet.Close() @@ -626,7 +626,7 @@ func TestRenameRepository(t *testing.T) { WithRepoStore: rs, WithRouter: NewPerRepositoryRouter( nodeSet.Connections(), - nodes.NewPerRepositoryElector(db), + nodes.NewPerRepositoryElector(logger, db), StaticHealthChecker(praefectCfg.StorageNames()), NewLockedRandom(rand.New(rand.NewSource(0))), rs, @@ -813,9 +813,9 @@ func TestProxyWrites(t *testing.T) { } queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() @@ -834,6 +834,7 @@ func TestProxyWrites(t *testing.T) { } coordinator := NewCoordinator( + logger, queue, rs, NewNodeManagerRouter(nodeMgr, rs), @@ -953,7 +954,7 @@ func TestErrorThreshold(t *testing.T) { ctx := testhelper.Context(t) queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) testCases := []struct { desc string @@ -984,11 +985,12 @@ func TestErrorThreshold(t *testing.T) { require.NoError(t, err) rs := datastore.MockRepositoryStore{} - nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, errorTracker, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, errorTracker, nil, nil) require.NoError(t, err) defer nodeMgr.Stop() coordinator := NewCoordinator( + logger, queue, rs, NewNodeManagerRouter(nodeMgr, rs), diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go index 0106a7fcd..d8b99bef1 100644 --- a/internal/praefect/service/info/server.go +++ b/internal/praefect/service/info/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/service" @@ -36,6 +37,7 @@ type PrimaryGetter interface { type Server struct { gitalypb.UnimplementedPraefectInfoServiceServer conf config.Config + logger log.Logger rs datastore.RepositoryStore assignmentStore AssignmentStore conns service.Connections @@ -45,6 +47,7 @@ type Server struct { // NewServer creates a new instance of a grpc InfoServiceServer func NewServer( conf config.Config, + logger log.Logger, rs datastore.RepositoryStore, assignmentStore AssignmentStore, conns service.Connections, @@ -52,6 +55,7 @@ func NewServer( ) gitalypb.PraefectInfoServiceServer { return &Server{ conf: conf, + logger: logger, rs: rs, assignmentStore: assignmentStore, conns: conns, diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go index 644007a37..5ce46d28b 100644 --- a/internal/praefect/service/server/info.go +++ b/internal/praefect/service/server/info.go @@ -6,7 +6,6 @@ import ( "github.com/google/uuid" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" - "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "google.golang.org/grpc" ) @@ -52,7 +51,7 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) client := gitalypb.NewServerServiceClient(conn) resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) if err != nil { - log.FromContext(ctx).WithField("storage", storage).WithError(err).Error("error getting server info") + s.logger.WithField("storage", storage).WithError(err).ErrorContext(ctx, "error getting server info") return } diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go index 6662457e0..a1bbca373 100644 --- a/internal/praefect/service/server/server.go +++ b/internal/praefect/service/server/server.go @@ -1,6 +1,7 @@ package server import ( + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/service" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -10,14 +11,16 @@ import ( type Server struct { gitalypb.UnimplementedServerServiceServer conf config.Config + logger log.Logger conns service.Connections checks []service.CheckFunc } // NewServer creates a new instance of a grpc ServerServiceServer -func NewServer(conf config.Config, conns service.Connections, checks []service.CheckFunc) gitalypb.ServerServiceServer { +func NewServer(conf config.Config, logger log.Logger, conns service.Connections, checks []service.CheckFunc) gitalypb.ServerServiceServer { s := &Server{ conf: conf, + logger: logger, conns: conns, checks: checks, } diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go index 2583a6520..4a8213d06 100644 --- a/internal/praefect/testserver.go +++ b/internal/praefect/testserver.go @@ -222,6 +222,7 @@ func RunPraefectServer( } coordinator := NewCoordinator( + opt.WithLogger, opt.WithQueue, opt.WithRepoStore, opt.WithRouter, diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index bea23c51f..2ed44a5ab 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -518,7 +518,7 @@ func TestVerifier(t *testing.T) { testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ "praefect-0": conf.StorageNames(), }) - elector := nodes.NewPerRepositoryElector(tx) + elector := nodes.NewPerRepositoryElector(logger, tx) conns := nodeSet.Connections() rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 770fdc74c..c29350c88 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -368,6 +368,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * } return &service.Dependencies{ + Logger: gsd.logger, Cfg: cfg, ClientPool: gsd.conns, StorageLocator: gsd.locator, |