Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2023-09-28 10:26:32 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2023-10-04 11:31:25 +0300
commit1fbdff1e93a3b7234e0f04b3101c7a5358c9c804 (patch)
tree3a6f4ae75bb885dbdf897df1032c3cf3b293dabe
parent5763976d9d8ec1559fc17b1b236c82089c369c4c (diff)
praefect/replicator: Disentangle use of the correlation ID key
In order to retain correlation IDs of replication jobs between the time they are added to the database and the time when they are processed, we store the correlation ID as part of the database entry. We store this field generically in the `params` section, which is backend by a JSON encoded column, where we use key-value pairs. The way we use the key here is a bit of a mess though, as we reuse the same key that our metadatahandler uses to inject the correlation ID for logging purposes. It goes without saying though that it is not a good idea to conflate a string for representation (logging) and for storing data consistently (the replication parameters). While the key is in fact the same right now, it could totally be that it might change for our logging infrastructure at some point or even go away. Separate the concerns by introducing a new constant in the `datastore` package that holds the key we want to use in the database. When logging data, we now consistently use the key from `correlation.FieldName`, which is shared across GitLab. And for the database, we consistently use the newly introduced constant.
-rw-r--r--internal/cli/praefect/subcmd_track_repository.go3
-rw-r--r--internal/grpc/middleware/metadatahandler/metadatahandler.go5
-rw-r--r--internal/praefect/coordinator.go3
-rw-r--r--internal/praefect/coordinator_test.go9
-rw-r--r--internal/praefect/datastore/queue.go4
-rw-r--r--internal/praefect/reconciler/reconciler_test.go3
-rw-r--r--internal/praefect/replicator.go12
-rw-r--r--internal/praefect/replicator_test.go9
8 files changed, 21 insertions, 27 deletions
diff --git a/internal/cli/praefect/subcmd_track_repository.go b/internal/cli/praefect/subcmd_track_repository.go
index 207d89542..4b5dd1026 100644
--- a/internal/cli/praefect/subcmd_track_repository.go
+++ b/internal/cli/praefect/subcmd_track_repository.go
@@ -11,7 +11,6 @@ import (
"github.com/urfave/cli/v2"
glcli "gitlab.com/gitlab-org/gitaly/v16/internal/cli"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
@@ -244,7 +243,7 @@ func (req *trackRepositoryRequest) execRequest(ctx context.Context,
SourceNodeStorage: primary,
TargetNodeStorage: secondary,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
}
if replicateImmediately {
conn, ok := connections[secondary]
diff --git a/internal/grpc/middleware/metadatahandler/metadatahandler.go b/internal/grpc/middleware/metadatahandler/metadatahandler.go
index cbaadc56b..930276999 100644
--- a/internal/grpc/middleware/metadatahandler/metadatahandler.go
+++ b/internal/grpc/middleware/metadatahandler/metadatahandler.go
@@ -74,9 +74,6 @@ const UserIDKey = "user_id"
// UsernameKey is the key used in ctx_tags to store the username
const UsernameKey = "username"
-// CorrelationIDKey is the key used in ctx_tags to store the correlation ID
-const CorrelationIDKey = "correlation_id"
-
// Unknown client and feature. Matches the prometheus grpc unknown value
const unknownValue = "unknown"
@@ -193,7 +190,7 @@ func addMetadataTags(ctx context.Context, fullMethod, grpcMethodType string) met
// This is a stop-gap approach to logging correlation_ids
correlationID := correlation.ExtractFromContext(ctx)
if correlationID != "" {
- tags.Set(CorrelationIDKey, correlationID)
+ tags.Set(correlation.FieldName, correlationID)
}
return metaTags
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 76d0b15f2..ac1d554f3 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -11,7 +11,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
@@ -1144,7 +1143,7 @@ func (c *Coordinator) newRequestFinalizer(
TargetNodeStorage: secondary,
Params: params,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
+ Meta: datastore.Params{datastore.CorrelationIDKey: correlationID},
}
g.Go(func() error {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index e0eafe0c5..a8c873fd4 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -25,7 +25,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
gitaly_metadata "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
@@ -224,7 +223,7 @@ func TestStreamDirectorMutator(t *testing.T) {
TargetNodeStorage: secondaryNode.Storage,
SourceNodeStorage: primaryNode.Storage,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"},
},
}
},
@@ -276,7 +275,7 @@ func TestStreamDirectorMutator(t *testing.T) {
TargetNodeStorage: secondaryNode.Storage,
SourceNodeStorage: primaryNode.Storage,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"},
},
}
},
@@ -1646,7 +1645,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
TargetNodeStorage: target,
SourceNodeStorage: primaryNode.Storage,
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "my-correlation-id"},
})
}
@@ -1763,7 +1762,7 @@ func TestAbsentCorrelationID(t *testing.T) {
require.NoError(t, err)
require.Len(t, jobs, 1)
- require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey],
+ require.NotZero(t, jobs[0].Meta[datastore.CorrelationIDKey],
"the coordinator should have generated a random ID")
}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 3dccd2da8..9865ea744 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -12,6 +12,10 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore/glsql"
)
+// CorrelationIDKey is the key that is used to store the correlation ID for a specific replication job as part of the
+// parameters.
+const CorrelationIDKey = "correlation_id"
+
// ReplicationEventExistsError is returned when trying to add an already existing
// replication event into the queue.
type ReplicationEventExistsError struct {
diff --git a/internal/praefect/reconciler/reconciler_test.go b/internal/praefect/reconciler/reconciler_test.go
index a7debac53..0a30858cb 100644
--- a/internal/praefect/reconciler/reconciler_test.go
+++ b/internal/praefect/reconciler/reconciler_test.go
@@ -10,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
@@ -1140,7 +1139,7 @@ func TestReconciler(t *testing.T) {
var job datastore.ReplicationJob
var meta datastore.Params
require.NoError(t, rows.Scan(&job, &meta))
- require.NotEmpty(t, meta[metadatahandler.CorrelationIDKey])
+ require.NotEmpty(t, meta[datastore.CorrelationIDKey])
actualJobs = append(actualJobs, job)
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 2d283749e..42b3eebee 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -10,7 +10,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
@@ -52,7 +51,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
logWithVirtualStorage: event.Job.VirtualStorage,
logWithReplTarget: event.Job.TargetNodeStorage,
"replication_job_source": event.Job.SourceNodeStorage,
- logWithCorrID: correlation.ExtractFromContext(ctx),
+ correlation.FieldName: correlation.ExtractFromContext(ctx),
})
generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
@@ -148,7 +147,7 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
return err
}
- dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
+ dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)).
WithError(err).
Info("deleted repository did not have a store entry")
}
@@ -190,7 +189,7 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat
return err
}
- dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
+ dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)).
WithError(err).
Info("replicated repository rename does not have a store entry")
}
@@ -299,7 +298,6 @@ func (r ReplMgr) Collect(ch chan<- prometheus.Metric) {
const (
logWithReplTarget = "replication_job_target"
- logWithCorrID = "correlation_id"
logWithVirtualStorage = "virtual_storage"
)
@@ -335,7 +333,7 @@ type (
func getCorrelationID(params datastore.Params) string {
correlationID := ""
- if val, found := params[metadatahandler.CorrelationIDKey]; found {
+ if val, found := params[datastore.CorrelationIDKey]; found {
correlationID, _ = val.(string)
}
return correlationID
@@ -538,7 +536,7 @@ func (r ReplMgr) handleNodeEvent(ctx context.Context, logger log.Logger, targetC
ctx = correlation.ContextWithCorrelation(ctx, cid)
// we want it to be queryable by common `json.correlation_id` filter
- logger = logger.WithField(logWithCorrID, cid)
+ logger = logger.WithField(correlation.FieldName, cid)
// we log all details about the event only once before start of the processing
logger.WithField("event", event).Info("replication job processing started")
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 02b2458c1..9b1e0651e 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -18,7 +18,6 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup"
- "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -119,7 +118,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
},
State: datastore.JobStateReady,
Attempt: 3,
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
+ Meta: datastore.Params{datastore.CorrelationIDKey: "correlation-id"},
})
}
require.Len(t, events, 1)
@@ -174,7 +173,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
require.Equal(t,
[]interface{}{"replication job processing started", "virtual", "correlation-id"},
- []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]},
+ []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[correlation.FieldName]},
)
dequeuedEvent := logEntries[2].Data["event"].(datastore.ReplicationEvent)
@@ -183,7 +182,7 @@ func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) {
require.Equal(t,
[]interface{}{"replication job processing finished", "virtual", datastore.JobStateCompleted, "correlation-id"},
- []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]},
+ []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[correlation.FieldName]},
)
replicatedPath := filepath.Join(backupCfg.Storages[0].Path, testRepoProto.GetRelativePath())
@@ -254,7 +253,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
lastEntry := entries[0]
require.Equal(t, logrus.InfoLevel, lastEntry.Level)
require.Equal(t, returnedErr, lastEntry.Data["error"])
- require.Equal(t, "correlation-id", lastEntry.Data[logWithCorrID])
+ require.Equal(t, "correlation-id", lastEntry.Data[correlation.FieldName])
require.Equal(t, tc.expectedMessage, lastEntry.Message)
})
}