diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-09-28 10:26:32 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2023-10-04 11:31:25 +0300 |
commit | 1fbdff1e93a3b7234e0f04b3101c7a5358c9c804 (patch) | |
tree | 3a6f4ae75bb885dbdf897df1032c3cf3b293dabe | |
parent | 5763976d9d8ec1559fc17b1b236c82089c369c4c (diff) |
praefect/replicator: Disentangle use of the correlation ID key
In order to retain correlation IDs of replication jobs between the time
they are added to the database and the time when they are processed, we
store the correlation ID as part of the database entry. We store this
field generically in the `params` section, which is backend by a JSON
encoded column, where we use key-value pairs.
The way we use the key here is a bit of a mess though, as we reuse the
same key that our metadatahandler uses to inject the correlation ID for
logging purposes. It goes without saying though that it is not a good
idea to conflate a string for representation (logging) and for storing
data consistently (the replication parameters). While the key is in fact
the same right now, it could totally be that it might change for our
logging infrastructure at some point or even go away.
Separate the concerns by introducing a new constant in the `datastore`
package that holds the key we want to use in the database. When logging
data, we now consistently use the key from `correlation.FieldName`,
which is shared across GitLab. And for the database, we consistently use
the newly introduced constant.
-rw-r--r-- | internal/cli/praefect/subcmd_track_repository.go | 3 | ||||
-rw-r--r-- | internal/grpc/middleware/metadatahandler/metadatahandler.go | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 3 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 9 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 4 | ||||
-rw-r--r-- | internal/praefect/reconciler/reconciler_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 12 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 9 |
8 files changed, 21 insertions, 27 deletions
diff --git a/internal/cli/praefect/subcmd_track_repository.go b/internal/cli/praefect/subcmd_track_repository.go index 207d89542..4b5dd1026 100644 --- a/internal/cli/praefect/subcmd_track_repository.go +++ b/internal/cli/praefect/subcmd_track_repository.go @@ -11,7 +11,6 @@ import ( "github.com/urfave/cli/v2" glcli "gitlab.com/gitlab-org/gitaly/v16/internal/cli" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect" @@ -244,7 +243,7 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context, SourceNodeStorage: primary, TargetNodeStorage: secondary, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + Meta: datastore.Params{datastore.CorrelationIDKey: correlationID}, } if replicateImmediately { conn, ok := connections[secondary] diff --git a/internal/grpc/middleware/metadatahandler/metadatahandler.go b/internal/grpc/middleware/metadatahandler/metadatahandler.go index cbaadc56b..930276999 100644 --- a/internal/grpc/middleware/metadatahandler/metadatahandler.go +++ b/internal/grpc/middleware/metadatahandler/metadatahandler.go @@ -74,9 +74,6 @@ const UserIDKey = "user_id" // UsernameKey is the key used in ctx_tags to store the username const UsernameKey = "username" -// CorrelationIDKey is the key used in ctx_tags to store the correlation ID -const CorrelationIDKey = "correlation_id" - // Unknown client and feature. Matches the prometheus grpc unknown value const unknownValue = "unknown" @@ -193,7 +190,7 @@ func addMetadataTags(ctx context.Context, fullMethod, grpcMethodType string) met // This is a stop-gap approach to logging correlation_ids correlationID := correlation.ExtractFromContext(ctx) if correlationID != "" { - tags.Set(CorrelationIDKey, correlationID) + tags.Set(correlation.FieldName, correlationID) } return metaTags diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 76d0b15f2..ac1d554f3 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -11,7 +11,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" @@ -1144,7 +1143,7 @@ func (c *Coordinator) newRequestFinalizer( TargetNodeStorage: secondary, Params: params, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, + Meta: datastore.Params{datastore.CorrelationIDKey: correlationID}, } g.Go(func() error { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e0eafe0c5..a8c873fd4 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -25,7 +25,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" gitaly_metadata "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" @@ -224,7 +223,7 @@ func TestStreamDirectorMutator(t *testing.T) { TargetNodeStorage: secondaryNode.Storage, SourceNodeStorage: primaryNode.Storage, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"}, }, } }, @@ -276,7 +275,7 @@ func TestStreamDirectorMutator(t *testing.T) { TargetNodeStorage: secondaryNode.Storage, SourceNodeStorage: primaryNode.Storage, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"}, }, } }, @@ -1646,7 +1645,7 @@ func TestStreamDirector_repo_creation(t *testing.T) { TargetNodeStorage: target, SourceNodeStorage: primaryNode.Storage, }, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"}, }) } @@ -1763,7 +1762,7 @@ func TestAbsentCorrelationID(t *testing.T) { require.NoError(t, err) require.Len(t, jobs, 1) - require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey], + require.NotZero(t, jobs[0].Meta[datastore.CorrelationIDKey], "the coordinator should have generated a random ID") } diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 3dccd2da8..9865ea744 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -12,6 +12,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql" ) +// CorrelationIDKey is the key that is used to store the correlation ID for a specific replication job as part of the +// parameters. +const CorrelationIDKey = "correlation_id" + // ReplicationEventExistsError is returned when trying to add an already existing // replication event into the queue. type ReplicationEventExistsError struct { diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go index a7debac53..0a30858cb 100644 --- a/internal/praefect/reconciler/reconciler_test.go +++ b/internal/praefect/reconciler/reconciler_test.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" @@ -1140,7 +1139,7 @@ func TestReconciler(t *testing.T) { var job datastore.ReplicationJob var meta datastore.Params require.NoError(t, rows.Scan(&job, &meta)) - require.NotEmpty(t, meta[metadatahandler.CorrelationIDKey]) + require.NotEmpty(t, meta[datastore.CorrelationIDKey]) actualJobs = append(actualJobs, job) } diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 2d283749e..42b3eebee 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -10,7 +10,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" @@ -52,7 +51,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli logWithVirtualStorage: event.Job.VirtualStorage, logWithReplTarget: event.Job.TargetNodeStorage, "replication_job_source": event.Job.SourceNodeStorage, - logWithCorrID: correlation.ExtractFromContext(ctx), + correlation.FieldName: correlation.ExtractFromContext(ctx), }) generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage) @@ -148,7 +147,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica return err } - dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). + dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)). WithError(err). Info("deleted repository did not have a store entry") } @@ -190,7 +189,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat return err } - dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). + dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)). WithError(err). Info("replicated repository rename does not have a store entry") } @@ -299,7 +298,6 @@ func (r ReplMgr) Collect(ch chan<- prometheus.Metric) { const ( logWithReplTarget = "replication_job_target" - logWithCorrID = "correlation_id" logWithVirtualStorage = "virtual_storage" ) @@ -335,7 +333,7 @@ type ( func getCorrelationID(params datastore.Params) string { correlationID := "" - if val, found := params[metadatahandler.CorrelationIDKey]; found { + if val, found := params[datastore.CorrelationIDKey]; found { correlationID, _ = val.(string) } return correlationID @@ -538,7 +536,7 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger log.Logger, targetC ctx = correlation.ContextWithCorrelation(ctx, cid) // we want it to be queryable by common `json.correlation_id` filter - logger = logger.WithField(logWithCorrID, cid) + logger = logger.WithField(correlation.FieldName, cid) // we log all details about the event only once before start of the processing logger.WithField("event", event).Info("replication job processing started") diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 02b2458c1..9b1e0651e 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -18,7 +18,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" - "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -119,7 +118,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { }, State: datastore.JobStateReady, Attempt: 3, - Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"}, + Meta: datastore.Params{datastore.CorrelationIDKey: "correlation-id"}, }) } require.Len(t, events, 1) @@ -174,7 +173,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { require.Equal(t, []interface{}{"replication job processing started", "virtual", "correlation-id"}, - []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]}, + []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[correlation.FieldName]}, ) dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent) @@ -183,7 +182,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { require.Equal(t, []interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"}, - []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]}, + []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[correlation.FieldName]}, ) replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepoProto.GetRelativePath()) @@ -254,7 +253,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { lastEntry := entries[0] require.Equal(t, logrus.InfoLevel, lastEntry.Level) require.Equal(t, returnedErr, lastEntry.Data["error"]) - require.Equal(t, "correlation-id", lastEntry.Data[logWithCorrID]) + require.Equal(t, "correlation-id", lastEntry.Data[correlation.FieldName]) require.Equal(t, tc.expectedMessage, lastEntry.Message) }) } |