diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-25 13:20:29 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-10-05 13:21:30 +0300 |
commit | 45dc75f83e29051a0ccb292b42d91c32c6653f90 (patch) | |
tree | ec9adc3dc0d898c827c9069144f473d167c4bcef | |
parent | 132727054bf5ed52b82c11189efc3a3e3ae9a74e (diff) |
praefect/transactions: Replace use of ctxlogrus with injected logger
The ctxlogrus package is going away with the replacement being log
fields extracted from the context via `log.DebugContext()` et al.
Refactor the code to stop using ctxlogrus by injecting a logger and
using the new context-based logging methods.
-rw-r--r-- | internal/cli/praefect/serve.go | 2 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_pg_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 50 | ||||
-rw-r--r-- | internal/praefect/info_service_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/server_factory_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 14 | ||||
-rw-r--r-- | internal/praefect/testserver.go | 12 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/transactions/manager.go | 32 | ||||
-rw-r--r-- | internal/praefect/transactions/manager_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/verifier_test.go | 2 |
12 files changed, 67 insertions, 61 deletions
diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go index 59f56b7c5..74f155e3e 100644 --- a/internal/cli/praefect/serve.go +++ b/internal/cli/praefect/serve.go @@ -251,7 +251,7 @@ func server( } } - transactionManager := transactions.NewManager(conf) + transactionManager := transactions.NewManager(conf, logger) sidechannelRegistry := sidechannel.NewRegistry() backchannelCfg := backchannel.DefaultConfiguration() diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index b01529082..ed9e2e741 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -155,7 +155,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string, require.NoError(t, err) defer nodeMgr.Stop() - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, logger) coordinator := NewCoordinator(logger, queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered) diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go index 9cf2baa23..ae583e74f 100644 --- a/internal/praefect/coordinator_pg_test.go +++ b/internal/praefect/coordinator_pg_test.go @@ -191,7 +191,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) { } ctx := testhelper.Context(t) - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, logger) tx := db.Begin(t) defer tx.Rollback(t) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index a8c873fd4..07a658090 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -92,8 +92,10 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }, } + logger := testhelper.NewLogger(t) + coordinator := NewCoordinator( - testhelper.NewLogger(t), + logger, datastore.NewPostgresReplicationEventQueue(db), rs, NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) { @@ -104,7 +106,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }}, }, nil }}, rs), - transactions.NewManager(conf), + transactions.NewManager(conf, logger), conf, protoregistry.GitalyProtoPreregistered, ) @@ -163,9 +165,10 @@ func TestStreamDirectorMutator(t *testing.T) { } db := testdb.New(t) - txMgr := transactions.NewManager(conf) + logger := testhelper.SharedLogger(t) + txMgr := transactions.NewManager(conf, logger) - nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil, testhelper.SharedLogger(t)) + nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil, logger) require.NoError(t, err) defer nodeSet.Close() @@ -485,10 +488,11 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { }, } - txMgr := transactions.NewManager(conf) + logger := testhelper.NewLogger(t) + txMgr := transactions.NewManager(conf, logger) coordinator := NewCoordinator( - testhelper.NewLogger(t), + logger, datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), @@ -598,10 +602,11 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) { }, } - txMgr := transactions.NewManager(conf) + logger := testhelper.NewLogger(t) + txMgr := transactions.NewManager(conf, logger) coordinator := NewCoordinator( - testhelper.NewLogger(t), + logger, datastore.NewPostgresReplicationEventQueue(testdb.New(t)), rs, NewNodeManagerRouter(nodeMgr, rs), @@ -704,12 +709,14 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) { }, } + logger := testhelper.NewLogger(t) + coordinator := NewCoordinator( - testhelper.NewLogger(t), + logger, &datastore.MockReplicationEventQueue{}, rs, router, - transactions.NewManager(conf), + transactions.NewManager(conf, logger), conf, protoregistry.GitalyProtoPreregistered, ) @@ -1066,15 +1073,15 @@ func TestStreamDirectorAccessor(t *testing.T) { } ctx := testhelper.Context(t) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) rs := datastore.MockRepositoryStore{} - nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) nodeMgr.Start(0, time.Minute) defer nodeMgr.Stop() - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, logger) for _, tc := range []struct { desc string @@ -1172,7 +1179,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { } ctx := testhelper.Context(t) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) repoStore := datastore.MockRepositoryStore{ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { @@ -1180,12 +1187,12 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { }, } - nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) nodeMgr.Start(0, time.Minute) defer nodeMgr.Stop() - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, logger) coordinator := NewCoordinator( testhelper.NewLogger(t), @@ -1565,11 +1572,12 @@ func TestStreamDirector_repo_creation(t *testing.T) { conf.DefaultReplicationFactors(), ) - txMgr := transactions.NewManager(conf) + logger := testhelper.NewLogger(t) + txMgr := transactions.NewManager(conf, logger) queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx)) coordinator := NewCoordinator( - testhelper.NewLogger(t), + logger, queueInterceptor, repositoryStore, router, @@ -1720,14 +1728,14 @@ func TestAbsentCorrelationID(t *testing.T) { } ctx := testhelper.Context(t) - entry := testhelper.SharedLogger(t) + logger := testhelper.SharedLogger(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) nodeMgr.Start(0, time.Hour) defer nodeMgr.Stop() - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, logger) rs := datastore.MockRepositoryStore{} coordinator := NewCoordinator( diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index dc20c69e4..f07be3850 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -56,7 +56,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { db := testdb.New(t) logger := testhelper.SharedLogger(t) // the only thing used from the config is the grpc_latency_buckets which is not relevant for the test - txManager := transactions.NewManager(config.Config{}) + txManager := transactions.NewManager(config.Config{}, logger) sidechannelRegistry := sidechannel.NewRegistry() nodeSet, err := DialNodes( ctx, diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 6a1bb7b75..262eadb8a 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -77,7 +77,7 @@ func TestServerFactory(t *testing.T) { queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) rs := datastore.MockRepositoryStore{} - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, logger) sidechannelRegistry := sidechannel.NewRegistry() clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry), backchannel.DefaultConfiguration()) nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, sidechannelRegistry) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 4d2bbd8d7..4110c31e4 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -53,9 +53,8 @@ import ( ) func TestNewBackchannelServerFactory(t *testing.T) { - mgr := transactions.NewManager(config.Config{}) - logger := testhelper.SharedLogger(t) + mgr := transactions.NewManager(config.Config{}, logger) registry := backchannel.NewRegistry() lm := listenmux.New(insecure.NewCredentials()) @@ -533,7 +532,7 @@ func TestRemoveRepository(t *testing.T) { logger := testhelper.SharedLogger(t) queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) repoStore := defaultRepoStore(praefectCfg) - txMgr := defaultTxMgr(praefectCfg) + txMgr := defaultTxMgr(praefectCfg, logger) nodeMgr, err := nodes.NewManager(testhelper.SharedLogger(t), praefectCfg, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, backchannel.NewClientHandshaker( @@ -604,8 +603,8 @@ func TestRenameRepository(t *testing.T) { rs := datastore.NewPostgresRepositoryStore(db, nil) - txManager := transactions.NewManager(praefectCfg) logger := testhelper.SharedLogger(t) + txManager := transactions.NewManager(praefectCfg, logger) clientHandshaker := backchannel.NewClientHandshaker( logger, NewBackchannelServerFactory( @@ -777,7 +776,9 @@ func newSmartHTTPGrpcServer(t *testing.T, cfg gconfig.Cfg, smartHTTPService gita func TestProxyWrites(t *testing.T) { t.Parallel() - txMgr := transactions.NewManager(config.Config{}) + + logger := testhelper.SharedLogger(t) + txMgr := transactions.NewManager(config.Config{}, logger) smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr} @@ -813,7 +814,6 @@ func TestProxyWrites(t *testing.T) { } queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) - logger := testhelper.SharedLogger(t) nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) require.NoError(t, err) @@ -994,7 +994,7 @@ func TestErrorThreshold(t *testing.T) { queue, rs, NewNodeManagerRouter(nodeMgr, rs), - transactions.NewManager(conf), + transactions.NewManager(conf, logger), conf, protoregistry.GitalyProtoPreregistered, ) diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go index 4a8213d06..835b5377d 100644 --- a/internal/praefect/testserver.go +++ b/internal/praefect/testserver.go @@ -87,8 +87,8 @@ func defaultQueue(tb testing.TB) datastore.ReplicationEventQueue { return datastore.NewPostgresReplicationEventQueue(testdb.New(tb)) } -func defaultTxMgr(conf config.Config) *transactions.Manager { - return transactions.NewManager(conf) +func defaultTxMgr(conf config.Config, logger log.Logger) *transactions.Manager { + return transactions.NewManager(conf, logger) } func defaultNodeMgr(tb testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager { @@ -190,6 +190,9 @@ func RunPraefectServer( ) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) { var cleanups []testhelper.Cleanup + if opt.WithLogger == nil { + opt.WithLogger = testhelper.SharedLogger(tb) + } if opt.WithQueue == nil { opt.WithQueue = defaultQueue(tb) } @@ -197,7 +200,7 @@ func RunPraefectServer( opt.WithRepoStore = defaultRepoStore(conf) } if opt.WithTxMgr == nil { - opt.WithTxMgr = defaultTxMgr(conf) + opt.WithTxMgr = defaultTxMgr(conf, opt.WithLogger) } if opt.WithBackends != nil { cleanups = append(cleanups, opt.WithBackends(conf.VirtualStorages)...) @@ -205,9 +208,6 @@ func RunPraefectServer( if opt.WithAnnotations == nil { opt.WithAnnotations = protoregistry.GitalyProtoPreregistered } - if opt.WithLogger == nil { - opt.WithLogger = testhelper.SharedLogger(tb) - } if opt.WithNodeMgr == nil { opt.WithNodeMgr = defaultNodeMgr(tb, conf, opt.WithRepoStore) } diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index a6216cb61..1e05d0522 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -29,7 +29,7 @@ type voter struct { func runPraefectServerAndTxMgr(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) - txMgr := transactions.NewManager(conf) + txMgr := transactions.NewManager(conf, testhelper.SharedLogger(tb)) cc, _, cleanup := RunPraefectServer(tb, ctx, conf, BuildOptions{ WithTxMgr: txMgr, WithNodeMgr: nullNodeMgr{}, // to suppress node address issues @@ -255,7 +255,7 @@ func TestTransactionWithContextCancellation(t *testing.T) { func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) { ctx := testhelper.Context(t) - txMgr := transactions.NewManager(config.Config{}) + txMgr := transactions.NewManager(config.Config{}, testhelper.NewLogger(t)) _, _, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{}, 1) require.Equal(t, transactions.ErrMissingNodes, err) @@ -303,7 +303,7 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) { ctx := testhelper.Context(t) - txMgr := transactions.NewManager(config.Config{}) + txMgr := transactions.NewManager(config.Config{}, testhelper.NewLogger(t)) for _, tc := range tc { t.Run(tc.desc, func(t *testing.T) { diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 2034fa687..f3fa0c94c 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -23,6 +23,7 @@ var ErrNotFound = errors.New("transaction not found") type Manager struct { idSequence uint64 lock sync.Mutex + logger log.Logger transactions map[uint64]*transaction counterMetric *prometheus.CounterVec delayMetric *prometheus.HistogramVec @@ -30,8 +31,9 @@ type Manager struct { } // NewManager creates a new transactions Manager. -func NewManager(cfg config.Config) *Manager { +func NewManager(cfg config.Config, logger log.Logger) *Manager { return &Manager{ + logger: logger.WithField("component", "transactions.Manager"), transactions: make(map[uint64]*transaction), counterMetric: prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -74,10 +76,6 @@ func (mgr *Manager) Collect(metrics chan<- prometheus.Metric) { mgr.subtransactionsMetric.Collect(metrics) } -func (mgr *Manager) log(ctx context.Context) log.Logger { - return log.FromContext(ctx).WithField("component", "transactions.Manager") -} - // CancelFunc is the transaction cancellation function returned by // `RegisterTransaction`. Calling it will cause the transaction to be removed // from the transaction manager. @@ -103,10 +101,10 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr } mgr.transactions[transactionID] = transaction - mgr.log(ctx).WithFields(log.Fields{ + mgr.logger.WithFields(log.Fields{ "transaction.id": transactionID, "transaction.voters": voters, - }).Debug("RegisterTransaction") + }).DebugContext(ctx, "RegisterTransaction") mgr.counterMetric.WithLabelValues("registered").Add(float64(len(voters))) @@ -136,11 +134,11 @@ func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *transact } } - mgr.log(ctx).WithFields(log.Fields{ + mgr.logger.WithFields(log.Fields{ "transaction.id": transaction.ID(), "transaction.committed": fmt.Sprintf("%d/%d", committed, len(state)), "transaction.subtransactions": transaction.CountSubtransactions(), - }).Info("transaction completed") + }).InfoContext(ctx, "transaction completed") return nil } @@ -170,14 +168,14 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n mgr.delayMetric.WithLabelValues("vote").Observe(delay.Seconds()) }() - logger := mgr.log(ctx).WithFields(log.Fields{ + logger := mgr.logger.WithFields(log.Fields{ "transaction.id": transactionID, "transaction.voter": node, "transaction.hash": vote.String(), }) mgr.counterMetric.WithLabelValues("started").Inc() - logger.Debug("VoteTransaction") + logger.DebugContext(ctx, "VoteTransaction") if err := mgr.voteTransaction(ctx, transactionID, node, vote); err != nil { var counterLabel string @@ -188,13 +186,13 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n // termination, so we should not log an error here. } else if errors.Is(err, ErrTransactionFailed) { counterLabel = "failed" - logger.WithError(err).Error("VoteTransaction: did not reach quorum") + logger.WithError(err).ErrorContext(ctx, "VoteTransaction: did not reach quorum") } else if errors.Is(err, ErrTransactionCanceled) { counterLabel = "canceled" - logger.WithError(err).Error("VoteTransaction: transaction was canceled") + logger.WithError(err).ErrorContext(ctx, "VoteTransaction: transaction was canceled") } else { counterLabel = "invalid" - logger.WithError(err).Error("VoteTransaction: failure") + logger.WithError(err).ErrorContext(ctx, "VoteTransaction: failure") } mgr.counterMetric.WithLabelValues(counterLabel).Inc() @@ -202,7 +200,7 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n return err } - logger.Info("VoteTransaction: transaction committed") + logger.InfoContext(ctx, "VoteTransaction: transaction committed") mgr.counterMetric.WithLabelValues("committed").Inc() return nil @@ -222,9 +220,9 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e return err } - mgr.log(ctx).WithFields(log.Fields{ + mgr.logger.WithFields(log.Fields{ "transaction.id": transactionID, - }).Debug("VoteTransaction: transaction stopped") + }).DebugContext(ctx, "VoteTransaction: transaction stopped") mgr.counterMetric.WithLabelValues("stopped").Inc() return nil diff --git a/internal/praefect/transactions/manager_test.go b/internal/praefect/transactions/manager_test.go index 07e45b37d..75a00b6a4 100644 --- a/internal/praefect/transactions/manager_test.go +++ b/internal/praefect/transactions/manager_test.go @@ -39,7 +39,7 @@ func TestManager_CancelTransactionNodeVoter(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - manager := NewManager(config.Config{}) + manager := NewManager(config.Config{}, testhelper.NewLogger(t)) var id uint64 if tc.register { diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index 2ed44a5ab..af486ba73 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -492,7 +492,7 @@ func TestVerifier(t *testing.T) { db := testdb.New(t) logger := testhelper.SharedLogger(t) sidechannelRegistry := sidechannel.NewRegistry() - txManager := transactions.NewManager(config.Config{}) + txManager := transactions.NewManager(config.Config{}, logger) nodeSet, err := DialNodes( ctx, conf.VirtualStorages, |