diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-29 08:52:38 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-29 09:03:00 +0300 |
commit | 371189686eee1d25081c2d907fe9138d1c279f66 (patch) | |
tree | 313dd984642ef3ceed93ad15e445f5cec3933872 /internal/praefect | |
parent | 9a5893ddba90a1e8e2ebb743042cbacdce74b468 (diff) |
praefect/coordinator: Convert to use context-aware logging
Inject a logger into the Praefect coordinator and convert it to use
context-aware logging.
Diffstat (limited to 'internal/praefect')
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 34 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 19 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/testserver.go | 1 |
7 files changed, 50 insertions, 22 deletions
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 073de8b4f..b01529082 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -157,7 +157,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, txMgr := transactions.NewManager(conf) - coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered) + coordinator := NewCoordinator(logger, queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered) srv := NewGRPCServer(&Dependencies{ Config: conf, diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index e2a8875e4..76d0b15f2 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -186,6 +186,7 @@ type grpcCall struct { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { + logger log.Logger router Router txMgr *transactions.Manager queue datastore.ReplicationEventQueue @@ -198,6 +199,7 @@ type Coordinator struct { // NewCoordinator returns a new Coordinator that utilizes the provided logger func NewCoordinator( + logger log.Logger, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, router Router, @@ -213,6 +215,7 @@ func NewCoordinator( } coordinator := &Coordinator{ + logger: logger, queue: queue, rs: rs, registry: r, @@ -490,15 +493,15 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall defer nodeErrors.Unlock() nodeErrors.errByNode[secondary.Storage] = err - log.FromContext(ctx).WithError(err). - Error("proxying to secondary failed") + c.logger.WithError(err). + ErrorContext(ctx, "proxying to secondary failed") // Cancels failed node's voter in its current subtransaction. // Also updates internal state of subtransaction to fail and // release blocked voters if quorum becomes impossible. if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil { - log.FromContext(ctx).WithError(err). - Error("canceling secondary voter failed") + c.logger.WithError(err). + ErrorContext(ctx, "canceling secondary voter failed") } // The error is ignored, so we do not abort transactions @@ -543,9 +546,9 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall continue } - log.FromContext(ctx). + c.logger. WithError(err). - Error("coordinator proxy stream finalizer failure") + ErrorContext(ctx, "coordinator proxy stream finalizer failure") } return firstErr } @@ -588,7 +591,7 @@ func (c *Coordinator) maintenanceStreamParameters(ctx context.Context, call grpc defer nodeErrors.Unlock() nodeErrors.errByNode[node.Storage] = err - log.FromContext(ctx).WithField("gitaly_storage", node.Storage).WithError(err).Error("proxying maintenance RPC to node failed") + c.logger.WithField("gitaly_storage", node.Storage).WithError(err).ErrorContext(ctx, "proxying maintenance RPC to node failed") // We ignore any errors returned by nodes such that they all have a // chance to finish their maintenance RPC in a best-effort strategy. @@ -665,7 +668,7 @@ func streamParametersContext(ctx context.Context) context.Context { func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. - log.FromContext(ctx).WithField("method", fullMethodName).Debug("Stream director received method") + c.logger.WithField("method", fullMethodName).DebugContext(ctx, "Stream director received method") mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { @@ -887,7 +890,7 @@ func (c *Coordinator) createTransactionFinalizer( ) func() error { return func() error { primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries( - ctx, route, transaction, nodeErrors, c.txReplicationCountMetric) + ctx, c.logger, route, transaction, nodeErrors, c.txReplicationCountMetric) if !primaryDirtied { // If the primary replica was not modified then we don't need to consider the secondaries // outdated. Praefect requires the primary to be always part of the quorum, so no changes @@ -921,6 +924,7 @@ func (c *Coordinator) createTransactionFinalizer( // replication jobs to repair state. func getUpdatedAndOutdatedSecondaries( ctx context.Context, + logger log.Logger, route RepositoryMutatorRoute, transaction transactions.Transaction, nodeErrors *nodeErrors, @@ -950,10 +954,10 @@ func getUpdatedAndOutdatedSecondaries( nodesByState := make(map[string][]string) defer func() { - log.FromContext(ctx). + logger. WithField("transaction.primary", route.Primary.Storage). WithField("transaction.secondaries", nodesByState). - Info("transactional node states") + InfoContext(ctx, "transactional node states") for reason, nodes := range nodesByState { replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes))) @@ -1069,7 +1073,7 @@ func (c *Coordinator) newRequestFinalizer( ctx, cancel := context.WithTimeout(helper.SuppressCancellation(originalCtx), 30*time.Second) defer cancel() - logEntry := log.FromContext(ctx).WithFields(log.Fields{ + logEntry := c.logger.WithFields(log.Fields{ "replication.cause": cause, "replication.change": change, "replication.primary": primary, @@ -1080,7 +1084,7 @@ func (c *Coordinator) newRequestFinalizer( if len(outdatedSecondaries) > 0 { logEntry = logEntry.WithField("replication.outdated", outdatedSecondaries) } - logEntry.Info("queueing replication jobs") + logEntry.InfoContext(ctx, "queueing replication jobs") switch change { case datastore.UpdateRepo: @@ -1099,7 +1103,7 @@ func (c *Coordinator) newRequestFinalizer( return fmt.Errorf("rename repository: %w", err) } - log.FromContext(ctx).WithError(err).Info("renamed repository does not have a store entry") + c.logger.WithError(err).InfoContext(ctx, "renamed repository does not have a store entry") } case datastore.CreateRepo: repositorySpecificPrimariesEnabled := c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository @@ -1146,7 +1150,7 @@ func (c *Coordinator) newRequestFinalizer( g.Go(func() error { if _, err := c.queue.Enqueue(ctx, event); err != nil { if errors.As(err, &datastore.ReplicationEventExistsError{}) { - log.FromContext(ctx).WithError(err).Info("replication event queue already has similar entry") + c.logger.WithError(err).InfoContext(ctx, "replication event queue already has similar entry") return nil } diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 52d0d0ef5..d13392fd5 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -175,6 +175,8 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { }, } + logger := testhelper.NewLogger(t) + var replicationWaitGroup sync.WaitGroup queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { @@ -220,12 +222,13 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { nil, nil, nil, - testhelper.SharedLogger(t), + logger, ) require.NoError(t, err) defer nodeSet.Close() coordinator := NewCoordinator( + logger, queueInterceptor, rs, NewPerRepositoryRouter( diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 8d5254bdf..4d1b7fabf 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -94,6 +94,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { } coordinator := NewCoordinator( + testhelper.NewLogger(t), datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) { @@ -379,6 +380,7 @@ func TestStreamDirectorMutator(t *testing.T) { }) coordinator := NewCoordinator( + testhelper.NewLogger(t), queueInterceptor, rs, NewPerRepositoryRouter( @@ -486,6 +488,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( + testhelper.NewLogger(t), datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), @@ -598,6 +601,7 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( + testhelper.NewLogger(t), datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), @@ -701,6 +705,7 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) { } coordinator := NewCoordinator( + testhelper.NewLogger(t), &datastore.MockReplicationEventQueue{}, rs, router, @@ -783,6 +788,7 @@ func TestStreamDirector_maintenance(t *testing.T) { queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( + testhelper.NewLogger(t), queueInterceptor, rs, NewPerRepositoryRouter( @@ -1090,6 +1096,7 @@ func TestStreamDirectorAccessor(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { coordinator := NewCoordinator( + testhelper.NewLogger(t), queue, rs, tc.router, @@ -1180,6 +1187,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { txMgr := transactions.NewManager(conf) coordinator := NewCoordinator( + testhelper.NewLogger(t), queue, repoStore, NewNodeManagerRouter(nodeMgr, repoStore), @@ -1560,6 +1568,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( + testhelper.NewLogger(t), queueInterceptor, repositoryStore, router, @@ -1721,6 +1730,7 @@ func TestAbsentCorrelationID(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), queueInterceptor, rs, NewNodeManagerRouter(nodeMgr, rs), @@ -1850,6 +1860,7 @@ func TestStreamDirectorStorageScope(t *testing.T) { nodeMgr.Start(0, time.Second) defer nodeMgr.Stop() coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(nodeMgr, rs), @@ -1913,6 +1924,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -1942,6 +1954,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -1972,6 +1985,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -2003,6 +2017,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) { } rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( + testhelper.NewLogger(t), nil, rs, NewNodeManagerRouter(mgr, rs), @@ -2820,7 +2835,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { Name: "stub", Help: "help", }, []string{"reason"}) - primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric) + primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, testhelper.NewLogger(t), route, transaction, nodeErrors, metric) require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) require.ElementsMatch(t, tc.expectedUpdated, updated) require.ElementsMatch(t, tc.expectedOutdated, outdated) @@ -2889,6 +2904,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { require.EqualError(t, NewCoordinator( + testhelper.NewLogger(t), &datastore.MockReplicationEventQueue{ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { requireSuppressedCancellation(t, ctx) @@ -2958,6 +2974,7 @@ func TestNewRequestFinalizer_enqueueErrorPropagation(t *testing.T) { t.Parallel() err := NewCoordinator( + testhelper.NewLogger(t), &datastore.MockReplicationEventQueue{ EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { return datastore.ReplicationEvent{}, tc.enqueueErr diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 51c298668..6a1bb7b75 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -89,6 +89,7 @@ func TestServerFactory(t *testing.T) { router := NewNodeManagerRouter(nodeMgr, rs) coordinator := NewCoordinator( + logger, queue, rs, router, diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 8b6221208..a10197261 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -813,9 +813,9 @@ func TestProxyWrites(t *testing.T) { } queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() @@ -834,6 +834,7 @@ func TestProxyWrites(t *testing.T) { } coordinator := NewCoordinator( + logger, queue, rs, NewNodeManagerRouter(nodeMgr, rs), @@ -953,7 +954,7 @@ func TestErrorThreshold(t *testing.T) { ctx := testhelper.Context(t) queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) testCases := []struct { desc string @@ -984,11 +985,12 @@ func TestErrorThreshold(t *testing.T) { require.NoError(t, err) rs := datastore.MockRepositoryStore{} - nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, errorTracker, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, errorTracker, nil, nil) require.NoError(t, err) defer nodeMgr.Stop() coordinator := NewCoordinator( + logger, queue, rs, NewNodeManagerRouter(nodeMgr, rs), diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go index 2583a6520..4a8213d06 100644 --- a/internal/praefect/testserver.go +++ b/internal/praefect/testserver.go @@ -222,6 +222,7 @@ func RunPraefectServer( } coordinator := NewCoordinator( + opt.WithLogger, opt.WithQueue, opt.WithRepoStore, opt.WithRouter, |