Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-25 13:20:29 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-10-05 13:21:30 +0300
commit45dc75f83e29051a0ccb292b42d91c32c6653f90 (patch)
treeec9adc3dc0d898c827c9069144f473d167c4bcef
parent132727054bf5ed52b82c11189efc3a3e3ae9a74e (diff)
praefect/transactions: Replace use of ctxlogrus with injected logger
The ctxlogrus package is going away with the replacement being log fields extracted from the context via `log.DebugContext()` et al. Refactor the code to stop using ctxlogrus by injecting a logger and using the new context-based logging methods.
-rw-r--r--internal/cli/praefect/serve.go2
-rw-r--r--internal/praefect/auth_test.go2
-rw-r--r--internal/praefect/coordinator_pg_test.go2
-rw-r--r--internal/praefect/coordinator_test.go50
-rw-r--r--internal/praefect/info_service_test.go2
-rw-r--r--internal/praefect/server_factory_test.go2
-rw-r--r--internal/praefect/server_test.go14
-rw-r--r--internal/praefect/testserver.go12
-rw-r--r--internal/praefect/transaction_test.go6
-rw-r--r--internal/praefect/transactions/manager.go32
-rw-r--r--internal/praefect/transactions/manager_test.go2
-rw-r--r--internal/praefect/verifier_test.go2
12 files changed, 67 insertions, 61 deletions
diff --git a/internal/cli/praefect/serve.go b/internal/cli/praefect/serve.go
index 59f56b7c5..74f155e3e 100644
--- a/internal/cli/praefect/serve.go
+++ b/internal/cli/praefect/serve.go
@@ -251,7 +251,7 @@ func server(
}
}
- transactionManager := transactions.NewManager(conf)
+ transactionManager := transactions.NewManager(conf, logger)
sidechannelRegistry := sidechannel.NewRegistry()
backchannelCfg := backchannel.DefaultConfiguration()
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index b01529082..ed9e2e741 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -155,7 +155,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
require.NoError(t, err)
defer nodeMgr.Stop()
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, logger)
coordinator := NewCoordinator(logger, queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, protoregistry.GitalyProtoPreregistered)
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 9cf2baa23..ae583e74f 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -191,7 +191,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
}
ctx := testhelper.Context(t)
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, logger)
tx := db.Begin(t)
defer tx.Rollback(t)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index a8c873fd4..07a658090 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -92,8 +92,10 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
},
}
+ logger := testhelper.NewLogger(t)
+
coordinator := NewCoordinator(
- testhelper.NewLogger(t),
+ logger,
datastore.NewPostgresReplicationEventQueue(db),
rs,
NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) {
@@ -104,7 +106,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}},
}, nil
}}, rs),
- transactions.NewManager(conf),
+ transactions.NewManager(conf, logger),
conf,
protoregistry.GitalyProtoPreregistered,
)
@@ -163,9 +165,10 @@ func TestStreamDirectorMutator(t *testing.T) {
}
db := testdb.New(t)
- txMgr := transactions.NewManager(conf)
+ logger := testhelper.SharedLogger(t)
+ txMgr := transactions.NewManager(conf, logger)
- nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil, testhelper.SharedLogger(t))
+ nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil, logger)
require.NoError(t, err)
defer nodeSet.Close()
@@ -485,10 +488,11 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
},
}
- txMgr := transactions.NewManager(conf)
+ logger := testhelper.NewLogger(t)
+ txMgr := transactions.NewManager(conf, logger)
coordinator := NewCoordinator(
- testhelper.NewLogger(t),
+ logger,
datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -598,10 +602,11 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) {
},
}
- txMgr := transactions.NewManager(conf)
+ logger := testhelper.NewLogger(t)
+ txMgr := transactions.NewManager(conf, logger)
coordinator := NewCoordinator(
- testhelper.NewLogger(t),
+ logger,
datastore.NewPostgresReplicationEventQueue(testdb.New(t)),
rs,
NewNodeManagerRouter(nodeMgr, rs),
@@ -704,12 +709,14 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) {
},
}
+ logger := testhelper.NewLogger(t)
+
coordinator := NewCoordinator(
- testhelper.NewLogger(t),
+ logger,
&datastore.MockReplicationEventQueue{},
rs,
router,
- transactions.NewManager(conf),
+ transactions.NewManager(conf, logger),
conf,
protoregistry.GitalyProtoPreregistered,
)
@@ -1066,15 +1073,15 @@ func TestStreamDirectorAccessor(t *testing.T) {
}
ctx := testhelper.Context(t)
- entry := testhelper.SharedLogger(t)
+ logger := testhelper.SharedLogger(t)
rs := datastore.MockRepositoryStore{}
- nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
defer nodeMgr.Stop()
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, logger)
for _, tc := range []struct {
desc string
@@ -1172,7 +1179,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
}
ctx := testhelper.Context(t)
- entry := testhelper.SharedLogger(t)
+ logger := testhelper.SharedLogger(t)
repoStore := datastore.MockRepositoryStore{
GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
@@ -1180,12 +1187,12 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
},
}
- nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
defer nodeMgr.Stop()
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, logger)
coordinator := NewCoordinator(
testhelper.NewLogger(t),
@@ -1565,11 +1572,12 @@ func TestStreamDirector_repo_creation(t *testing.T) {
conf.DefaultReplicationFactors(),
)
- txMgr := transactions.NewManager(conf)
+ logger := testhelper.NewLogger(t)
+ txMgr := transactions.NewManager(conf, logger)
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(tx))
coordinator := NewCoordinator(
- testhelper.NewLogger(t),
+ logger,
queueInterceptor,
repositoryStore,
router,
@@ -1720,14 +1728,14 @@ func TestAbsentCorrelationID(t *testing.T) {
}
ctx := testhelper.Context(t)
- entry := testhelper.SharedLogger(t)
+ logger := testhelper.SharedLogger(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, logger)
rs := datastore.MockRepositoryStore{}
coordinator := NewCoordinator(
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index dc20c69e4..f07be3850 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -56,7 +56,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
db := testdb.New(t)
logger := testhelper.SharedLogger(t)
// the only thing used from the config is the grpc_latency_buckets which is not relevant for the test
- txManager := transactions.NewManager(config.Config{})
+ txManager := transactions.NewManager(config.Config{}, logger)
sidechannelRegistry := sidechannel.NewRegistry()
nodeSet, err := DialNodes(
ctx,
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 6a1bb7b75..262eadb8a 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -77,7 +77,7 @@ func TestServerFactory(t *testing.T) {
queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
rs := datastore.MockRepositoryStore{}
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, logger)
sidechannelRegistry := sidechannel.NewRegistry()
clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry), backchannel.DefaultConfiguration())
nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, sidechannelRegistry)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 4d2bbd8d7..4110c31e4 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -53,9 +53,8 @@ import (
)
func TestNewBackchannelServerFactory(t *testing.T) {
- mgr := transactions.NewManager(config.Config{})
-
logger := testhelper.SharedLogger(t)
+ mgr := transactions.NewManager(config.Config{}, logger)
registry := backchannel.NewRegistry()
lm := listenmux.New(insecure.NewCredentials())
@@ -533,7 +532,7 @@ func TestRemoveRepository(t *testing.T) {
logger := testhelper.SharedLogger(t)
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t)))
repoStore := defaultRepoStore(praefectCfg)
- txMgr := defaultTxMgr(praefectCfg)
+ txMgr := defaultTxMgr(praefectCfg, logger)
nodeMgr, err := nodes.NewManager(testhelper.SharedLogger(t), praefectCfg, nil,
repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered,
nil, backchannel.NewClientHandshaker(
@@ -604,8 +603,8 @@ func TestRenameRepository(t *testing.T) {
rs := datastore.NewPostgresRepositoryStore(db, nil)
- txManager := transactions.NewManager(praefectCfg)
logger := testhelper.SharedLogger(t)
+ txManager := transactions.NewManager(praefectCfg, logger)
clientHandshaker := backchannel.NewClientHandshaker(
logger,
NewBackchannelServerFactory(
@@ -777,7 +776,9 @@ func newSmartHTTPGrpcServer(t *testing.T, cfg gconfig.Cfg, smartHTTPService gita
func TestProxyWrites(t *testing.T) {
t.Parallel()
- txMgr := transactions.NewManager(config.Config{})
+
+ logger := testhelper.SharedLogger(t)
+ txMgr := transactions.NewManager(config.Config{}, logger)
smartHTTP0, smartHTTP1, smartHTTP2 := &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}, &mockSmartHTTP{txMgr: txMgr}
@@ -813,7 +814,6 @@ func TestProxyWrites(t *testing.T) {
}
queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
- logger := testhelper.SharedLogger(t)
nodeMgr, err := nodes.NewManager(logger, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
@@ -994,7 +994,7 @@ func TestErrorThreshold(t *testing.T) {
queue,
rs,
NewNodeManagerRouter(nodeMgr, rs),
- transactions.NewManager(conf),
+ transactions.NewManager(conf, logger),
conf,
protoregistry.GitalyProtoPreregistered,
)
diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go
index 4a8213d06..835b5377d 100644
--- a/internal/praefect/testserver.go
+++ b/internal/praefect/testserver.go
@@ -87,8 +87,8 @@ func defaultQueue(tb testing.TB) datastore.ReplicationEventQueue {
return datastore.NewPostgresReplicationEventQueue(testdb.New(tb))
}
-func defaultTxMgr(conf config.Config) *transactions.Manager {
- return transactions.NewManager(conf)
+func defaultTxMgr(conf config.Config, logger log.Logger) *transactions.Manager {
+ return transactions.NewManager(conf, logger)
}
func defaultNodeMgr(tb testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
@@ -190,6 +190,9 @@ func RunPraefectServer(
) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
var cleanups []testhelper.Cleanup
+ if opt.WithLogger == nil {
+ opt.WithLogger = testhelper.SharedLogger(tb)
+ }
if opt.WithQueue == nil {
opt.WithQueue = defaultQueue(tb)
}
@@ -197,7 +200,7 @@ func RunPraefectServer(
opt.WithRepoStore = defaultRepoStore(conf)
}
if opt.WithTxMgr == nil {
- opt.WithTxMgr = defaultTxMgr(conf)
+ opt.WithTxMgr = defaultTxMgr(conf, opt.WithLogger)
}
if opt.WithBackends != nil {
cleanups = append(cleanups, opt.WithBackends(conf.VirtualStorages)...)
@@ -205,9 +208,6 @@ func RunPraefectServer(
if opt.WithAnnotations == nil {
opt.WithAnnotations = protoregistry.GitalyProtoPreregistered
}
- if opt.WithLogger == nil {
- opt.WithLogger = testhelper.SharedLogger(tb)
- }
if opt.WithNodeMgr == nil {
opt.WithNodeMgr = defaultNodeMgr(tb, conf, opt.WithRepoStore)
}
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index a6216cb61..1e05d0522 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -29,7 +29,7 @@ type voter struct {
func runPraefectServerAndTxMgr(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
conf := testConfig(1)
- txMgr := transactions.NewManager(conf)
+ txMgr := transactions.NewManager(conf, testhelper.SharedLogger(tb))
cc, _, cleanup := RunPraefectServer(tb, ctx, conf, BuildOptions{
WithTxMgr: txMgr,
WithNodeMgr: nullNodeMgr{}, // to suppress node address issues
@@ -255,7 +255,7 @@ func TestTransactionWithContextCancellation(t *testing.T) {
func TestTransactionRegistrationWithInvalidNodesFails(t *testing.T) {
ctx := testhelper.Context(t)
- txMgr := transactions.NewManager(config.Config{})
+ txMgr := transactions.NewManager(config.Config{}, testhelper.NewLogger(t))
_, _, err := txMgr.RegisterTransaction(ctx, []transactions.Voter{}, 1)
require.Equal(t, transactions.ErrMissingNodes, err)
@@ -303,7 +303,7 @@ func TestTransactionRegistrationWithInvalidThresholdFails(t *testing.T) {
ctx := testhelper.Context(t)
- txMgr := transactions.NewManager(config.Config{})
+ txMgr := transactions.NewManager(config.Config{}, testhelper.NewLogger(t))
for _, tc := range tc {
t.Run(tc.desc, func(t *testing.T) {
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 2034fa687..f3fa0c94c 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -23,6 +23,7 @@ var ErrNotFound = errors.New("transaction not found")
type Manager struct {
idSequence uint64
lock sync.Mutex
+ logger log.Logger
transactions map[uint64]*transaction
counterMetric *prometheus.CounterVec
delayMetric *prometheus.HistogramVec
@@ -30,8 +31,9 @@ type Manager struct {
}
// NewManager creates a new transactions Manager.
-func NewManager(cfg config.Config) *Manager {
+func NewManager(cfg config.Config, logger log.Logger) *Manager {
return &Manager{
+ logger: logger.WithField("component", "transactions.Manager"),
transactions: make(map[uint64]*transaction),
counterMetric: prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -74,10 +76,6 @@ func (mgr *Manager) Collect(metrics chan<- prometheus.Metric) {
mgr.subtransactionsMetric.Collect(metrics)
}
-func (mgr *Manager) log(ctx context.Context) log.Logger {
- return log.FromContext(ctx).WithField("component", "transactions.Manager")
-}
-
// CancelFunc is the transaction cancellation function returned by
// `RegisterTransaction`. Calling it will cause the transaction to be removed
// from the transaction manager.
@@ -103,10 +101,10 @@ func (mgr *Manager) RegisterTransaction(ctx context.Context, voters []Voter, thr
}
mgr.transactions[transactionID] = transaction
- mgr.log(ctx).WithFields(log.Fields{
+ mgr.logger.WithFields(log.Fields{
"transaction.id": transactionID,
"transaction.voters": voters,
- }).Debug("RegisterTransaction")
+ }).DebugContext(ctx, "RegisterTransaction")
mgr.counterMetric.WithLabelValues("registered").Add(float64(len(voters)))
@@ -136,11 +134,11 @@ func (mgr *Manager) cancelTransaction(ctx context.Context, transaction *transact
}
}
- mgr.log(ctx).WithFields(log.Fields{
+ mgr.logger.WithFields(log.Fields{
"transaction.id": transaction.ID(),
"transaction.committed": fmt.Sprintf("%d/%d", committed, len(state)),
"transaction.subtransactions": transaction.CountSubtransactions(),
- }).Info("transaction completed")
+ }).InfoContext(ctx, "transaction completed")
return nil
}
@@ -170,14 +168,14 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
mgr.delayMetric.WithLabelValues("vote").Observe(delay.Seconds())
}()
- logger := mgr.log(ctx).WithFields(log.Fields{
+ logger := mgr.logger.WithFields(log.Fields{
"transaction.id": transactionID,
"transaction.voter": node,
"transaction.hash": vote.String(),
})
mgr.counterMetric.WithLabelValues("started").Inc()
- logger.Debug("VoteTransaction")
+ logger.DebugContext(ctx, "VoteTransaction")
if err := mgr.voteTransaction(ctx, transactionID, node, vote); err != nil {
var counterLabel string
@@ -188,13 +186,13 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
// termination, so we should not log an error here.
} else if errors.Is(err, ErrTransactionFailed) {
counterLabel = "failed"
- logger.WithError(err).Error("VoteTransaction: did not reach quorum")
+ logger.WithError(err).ErrorContext(ctx, "VoteTransaction: did not reach quorum")
} else if errors.Is(err, ErrTransactionCanceled) {
counterLabel = "canceled"
- logger.WithError(err).Error("VoteTransaction: transaction was canceled")
+ logger.WithError(err).ErrorContext(ctx, "VoteTransaction: transaction was canceled")
} else {
counterLabel = "invalid"
- logger.WithError(err).Error("VoteTransaction: failure")
+ logger.WithError(err).ErrorContext(ctx, "VoteTransaction: failure")
}
mgr.counterMetric.WithLabelValues(counterLabel).Inc()
@@ -202,7 +200,7 @@ func (mgr *Manager) VoteTransaction(ctx context.Context, transactionID uint64, n
return err
}
- logger.Info("VoteTransaction: transaction committed")
+ logger.InfoContext(ctx, "VoteTransaction: transaction committed")
mgr.counterMetric.WithLabelValues("committed").Inc()
return nil
@@ -222,9 +220,9 @@ func (mgr *Manager) StopTransaction(ctx context.Context, transactionID uint64) e
return err
}
- mgr.log(ctx).WithFields(log.Fields{
+ mgr.logger.WithFields(log.Fields{
"transaction.id": transactionID,
- }).Debug("VoteTransaction: transaction stopped")
+ }).DebugContext(ctx, "VoteTransaction: transaction stopped")
mgr.counterMetric.WithLabelValues("stopped").Inc()
return nil
diff --git a/internal/praefect/transactions/manager_test.go b/internal/praefect/transactions/manager_test.go
index 07e45b37d..75a00b6a4 100644
--- a/internal/praefect/transactions/manager_test.go
+++ b/internal/praefect/transactions/manager_test.go
@@ -39,7 +39,7 @@ func TestManager_CancelTransactionNodeVoter(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- manager := NewManager(config.Config{})
+ manager := NewManager(config.Config{}, testhelper.NewLogger(t))
var id uint64
if tc.register {
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index 2ed44a5ab..af486ba73 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -492,7 +492,7 @@ func TestVerifier(t *testing.T) {
db := testdb.New(t)
logger := testhelper.SharedLogger(t)
sidechannelRegistry := sidechannel.NewRegistry()
- txManager := transactions.NewManager(config.Config{})
+ txManager := transactions.NewManager(config.Config{}, logger)
nodeSet, err := DialNodes(
ctx,
conf.VirtualStorages,