Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-29 08:52:38 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-29 09:03:00 +0300
commit371189686eee1d25081c2d907fe9138d1c279f66 (patch)
tree313dd984642ef3ceed93ad15e445f5cec3933872 /internal/praefect
parent9a5893ddba90a1e8e2ebb743042cbacdce74b468 (diff)
praefect/coordinator: Convert to use context-aware logging
Inject a logger into the Praefect coordinator and convert it to use context-aware logging.
Diffstat (limited to 'internal/praefect')
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/coordinator.go34
-rw-r--r--internal/praefect/coordinator_pg_test.go5
-rw-r--r--internal/praefect/coordinator_test.go19
-rw-r--r--internal/praefect/server_factory_test.go1
-rw-r--r--internal/praefect/server_test.go10
-rw-r--r--internal/praefect/testserver.go1
7 files changed, 50 insertions, 22 deletions
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 073de8b4f..b01529082 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -157,7 +157,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
txMgr := transactions.NewManager(conf)
- coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(logger, queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered)
srv := NewGRPCServer(&Dependencies{
Config: conf,
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index e2a8875e4..76d0b15f2 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -186,6 +186,7 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
+ logger log.Logger
router Router
txMgr *transactions.Manager
queue datastore.ReplicationEventQueue
@@ -198,6 +199,7 @@ type Coordinator struct {
// NewCoordinator returns a new Coordinator that utilizes the provided logger
func NewCoordinator(
+ logger log.Logger,
queue datastore.ReplicationEventQueue,
rs datastore.RepositoryStore,
router Router,
@@ -213,6 +215,7 @@ func NewCoordinator(
}
coordinator := &Coordinator{
+ logger: logger,
queue: queue,
rs: rs,
registry: r,
@@ -490,15 +493,15 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
defer nodeErrors.Unlock()
nodeErrors.errByNode[secondary.Storage] = err
- log.FromContext(ctx).WithError(err).
- Error("proxying to secondary failed")
+ c.logger.WithError(err).
+ ErrorContext(ctx, "proxying to secondary failed")
// Cancels failed node's voter in its current subtransaction.
// Also updates internal state of subtransaction to fail and
// release blocked voters if quorum becomes impossible.
if err := c.txMgr.CancelTransactionNodeVoter(transaction.ID(), secondary.Storage); err != nil {
- log.FromContext(ctx).WithError(err).
- Error("canceling secondary voter failed")
+ c.logger.WithError(err).
+ ErrorContext(ctx, "canceling secondary voter failed")
}
// The error is ignored, so we do not abort transactions
@@ -543,9 +546,9 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
continue
}
- log.FromContext(ctx).
+ c.logger.
WithError(err).
- Error("coordinator proxy stream finalizer failure")
+ ErrorContext(ctx, "coordinator proxy stream finalizer failure")
}
return firstErr
}
@@ -588,7 +591,7 @@ func (c *Coordinator) maintenanceStreamParameters(ctx context.Context, call grpc
defer nodeErrors.Unlock()
nodeErrors.errByNode[node.Storage] = err
- log.FromContext(ctx).WithField("gitaly_storage", node.Storage).WithError(err).Error("proxying maintenance RPC to node failed")
+ c.logger.WithField("gitaly_storage", node.Storage).WithError(err).ErrorContext(ctx, "proxying maintenance RPC to node failed")
// We ignore any errors returned by nodes such that they all have a
// chance to finish their maintenance RPC in a best-effort strategy.
@@ -665,7 +668,7 @@ func streamParametersContext(ctx context.Context) context.Context {
func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
- log.FromContext(ctx).WithField("method", fullMethodName).Debug("Stream director received method")
+ c.logger.WithField("method", fullMethodName).DebugContext(ctx, "Stream director received method")
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
@@ -887,7 +890,7 @@ func (c *Coordinator) createTransactionFinalizer(
) func() error {
return func() error {
primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(
- ctx, route, transaction, nodeErrors, c.txReplicationCountMetric)
+ ctx, c.logger, route, transaction, nodeErrors, c.txReplicationCountMetric)
if !primaryDirtied {
// If the primary replica was not modified then we don't need to consider the secondaries
// outdated. Praefect requires the primary to be always part of the quorum, so no changes
@@ -921,6 +924,7 @@ func (c *Coordinator) createTransactionFinalizer(
// replication jobs to repair state.
func getUpdatedAndOutdatedSecondaries(
ctx context.Context,
+ logger log.Logger,
route RepositoryMutatorRoute,
transaction transactions.Transaction,
nodeErrors *nodeErrors,
@@ -950,10 +954,10 @@ func getUpdatedAndOutdatedSecondaries(
nodesByState := make(map[string][]string)
defer func() {
- log.FromContext(ctx).
+ logger.
WithField("transaction.primary", route.Primary.Storage).
WithField("transaction.secondaries", nodesByState).
- Info("transactional node states")
+ InfoContext(ctx, "transactional node states")
for reason, nodes := range nodesByState {
replicationCountMetric.WithLabelValues(reason).Add(float64(len(nodes)))
@@ -1069,7 +1073,7 @@ func (c *Coordinator) newRequestFinalizer(
ctx, cancel := context.WithTimeout(helper.SuppressCancellation(originalCtx), 30*time.Second)
defer cancel()
- logEntry := log.FromContext(ctx).WithFields(log.Fields{
+ logEntry := c.logger.WithFields(log.Fields{
"replication.cause": cause,
"replication.change": change,
"replication.primary": primary,
@@ -1080,7 +1084,7 @@ func (c *Coordinator) newRequestFinalizer(
if len(outdatedSecondaries) > 0 {
logEntry = logEntry.WithField("replication.outdated", outdatedSecondaries)
}
- logEntry.Info("queueing replication jobs")
+ logEntry.InfoContext(ctx, "queueing replication jobs")
switch change {
case datastore.UpdateRepo:
@@ -1099,7 +1103,7 @@ func (c *Coordinator) newRequestFinalizer(
return fmt.Errorf("rename repository: %w", err)
}
- log.FromContext(ctx).WithError(err).Info("renamed repository does not have a store entry")
+ c.logger.WithError(err).InfoContext(ctx, "renamed repository does not have a store entry")
}
case datastore.CreateRepo:
repositorySpecificPrimariesEnabled := c.conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository
@@ -1146,7 +1150,7 @@ func (c *Coordinator) newRequestFinalizer(
g.Go(func() error {
if _, err := c.queue.Enqueue(ctx, event); err != nil {
if errors.As(err, &datastore.ReplicationEventExistsError{}) {
- log.FromContext(ctx).WithError(err).Info("replication event queue already has similar entry")
+ c.logger.WithError(err).InfoContext(ctx, "replication event queue already has similar entry")
return nil
}
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 52d0d0ef5..d13392fd5 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -175,6 +175,8 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
},
}
+ logger := testhelper.NewLogger(t)
+
var replicationWaitGroup sync.WaitGroup
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
@@ -220,12 +222,13 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
nil,
nil,
nil,
- testhelper.SharedLogger(t),
+ logger,
)
require.NoError(t, err)
defer nodeSet.Close()
coordinator := NewCoordinator(
+ logger,
queueInterceptor,
rs,
NewPerRepositoryRouter(
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 8d5254bdf..4d1b7fabf 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -94,6 +94,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
datastore.NewPostgresReplicationEventQueue(db),
rs,
NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) {
@@ -379,6 +380,7 @@ func TestStreamDirectorMutator(t *testing.T) {
})
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
queueInterceptor,
rs,
NewPerRepositoryRouter(
@@ -486,6 +488,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -598,6 +601,7 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) {
txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -701,6 +705,7 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) {
}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
&datastore.MockReplicationEventQueue{},
rs,
router,
@@ -783,6 +788,7 @@ func TestStreamDirector_maintenance(t *testing.T) {
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
queueInterceptor,
rs,
NewPerRepositoryRouter(
@@ -1090,6 +1096,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
queue,
rs,
tc.router,
@@ -1180,6 +1187,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
txMgr := transactions.NewManager(conf)
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
queue,
repoStore,
NewNodeManagerRouter(nodeMgr, repoStore),
@@ -1560,6 +1568,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
queueInterceptor,
repositoryStore,
router,
@@ -1721,6 +1730,7 @@ func TestAbsentCorrelationID(t *testing.T) {
rs := datastore.MockRepositoryStore{}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
queueInterceptor,
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -1850,6 +1860,7 @@ func TestStreamDirectorStorageScope(t *testing.T) {
nodeMgr.Start(0, time.Second)
defer nodeMgr.Stop()
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
nil,
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -1913,6 +1924,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
rs := datastore.MockRepositoryStore{}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
nil,
rs,
NewNodeManagerRouter(mgr, rs),
@@ -1942,6 +1954,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
rs := datastore.MockRepositoryStore{}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
nil,
rs,
NewNodeManagerRouter(mgr, rs),
@@ -1972,6 +1985,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
rs := datastore.MockRepositoryStore{}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
nil,
rs,
NewNodeManagerRouter(mgr, rs),
@@ -2003,6 +2017,7 @@ func TestStreamDirectorStorageScopeError(t *testing.T) {
}
rs := datastore.MockRepositoryStore{}
coordinator := NewCoordinator(
+ testhelper.NewLogger(t),
nil,
rs,
NewNodeManagerRouter(mgr, rs),
@@ -2820,7 +2835,7 @@ func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
Name: "stub", Help: "help",
}, []string{"reason"})
- primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric)
+ primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, testhelper.NewLogger(t), route, transaction, nodeErrors, metric)
require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
require.ElementsMatch(t, tc.expectedUpdated, updated)
require.ElementsMatch(t, tc.expectedOutdated, outdated)
@@ -2889,6 +2904,7 @@ func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
require.EqualError(t,
NewCoordinator(
+ testhelper.NewLogger(t),
&datastore.MockReplicationEventQueue{
EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
requireSuppressedCancellation(t, ctx)
@@ -2958,6 +2974,7 @@ func TestNewRequestFinalizer_enqueueErrorPropagation(t *testing.T) {
t.Parallel()
err := NewCoordinator(
+ testhelper.NewLogger(t),
&datastore.MockReplicationEventQueue{
EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
return datastore.ReplicationEvent{}, tc.enqueueErr
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 51c298668..6a1bb7b75 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -89,6 +89,7 @@ func TestServerFactory(t *testing.T) {
router := NewNodeManagerRouter(nodeMgr, rs)
coordinator := NewCoordinator(
+ logger,
queue,
rs,
router,
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 8b6221208..a10197261 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -813,9 +813,9 @@ func TestProxyWrites(t *testing.T) {
}
queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
- entry := testhelper.SharedLogger(t)
+ logger := testhelper.SharedLogger(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
@@ -834,6 +834,7 @@ func TestProxyWrites(t *testing.T) {
}
coordinator := NewCoordinator(
+ logger,
queue,
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -953,7 +954,7 @@ func TestErrorThreshold(t *testing.T) {
ctx := testhelper.Context(t)
queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
- entry := testhelper.SharedLogger(t)
+ logger := testhelper.SharedLogger(t)
testCases := []struct {
desc string
@@ -984,11 +985,12 @@ func TestErrorThreshold(t *testing.T) {
require.NoError(t, err)
rs := datastore.MockRepositoryStore{}
- nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, errorTracker, nil, nil)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, errorTracker, nil, nil)
require.NoError(t, err)
defer nodeMgr.Stop()
coordinator := NewCoordinator(
+ logger,
queue,
rs,
NewNodeManagerRouter(nodeMgr, rs),
diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go
index 2583a6520..4a8213d06 100644
--- a/internal/praefect/testserver.go
+++ b/internal/praefect/testserver.go
@@ -222,6 +222,7 @@ func RunPraefectServer(
}
coordinator := NewCoordinator(
+ opt.WithLogger,
opt.WithQueue,
opt.WithRepoStore,
opt.WithRouter,