Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Pavlo Strokov <pstrokov@gitlab.com> 2020-09-15 10:28:15 +0300
committer: Pavlo Strokov <pstrokov@gitlab.com> 2020-09-15 10:28:15 +0300
commit: 13cb309f2ac87a82647fcecac3d08d3808cd9ff3 (patch)
tree: 32ee7796c38f0335e8fa0fdacf40ed202db29bec /internal/praefect
parent: 4805333598f9ed608652cfd28650cabf110fc646 (diff)
parent: 79c221c0c537b2bced0911b71130d027dd209764 (diff)
Merge branch 'ps-replication-logging' into 'master'
Fix logging of replication events processing. See merge request gitlab-org/gitaly!2547
Diffstat (limited to 'internal/praefect')
-rw-r--r-- internal/praefect/replicator.go | 113
-rw-r--r-- internal/praefect/replicator_test.go | 71
2 files changed, 115 insertions, 69 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 440ee0ec9..ab56c5d07 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -16,7 +16,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
+ "gitlab.com/gitlab-org/labkit/correlation"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
@@ -38,8 +38,8 @@ type Replicator interface {
}
type defaultReplicator struct {
- log *logrus.Entry
rs datastore.RepositoryStore
+ log logrus.FieldLogger
}
func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
@@ -53,6 +53,12 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
RelativePath: event.Job.RelativePath,
}
+ logger := dr.log.WithFields(logrus.Fields{
+ logWithVirtualStorage: event.Job.VirtualStorage,
+ logWithReplTarget: event.Job.TargetNodeStorage,
+ logWithCorrID: correlation.ExtractFromContext(ctx),
+ })
+
generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
if err != nil {
// Later generation might have already been replicated by an earlier replication job. If that's the case,
@@ -64,7 +70,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
message = "target repository already on the same generation, skipping replication job"
}
- dr.log.WithError(downgradeErr).Info(message)
+ logger.WithError(downgradeErr).Info(message)
return nil
}
@@ -77,7 +83,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
Source: sourceRepository,
Repository: targetRepository,
}); err != nil {
- return fmt.Errorf("failed to create repository: %v", err)
+ return fmt.Errorf("failed to create repository: %w", err)
}
// check if the repository has an object pool
@@ -104,7 +110,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
}
}
- checksumsMatch, err := dr.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(sourceCC), targetRepositoryClient, sourceRepository, targetRepository)
+ checksumsMatch, err := dr.confirmChecksums(ctx, logger, gitalypb.NewRepositoryServiceClient(sourceCC), targetRepositoryClient, sourceRepository, targetRepository)
if err != nil {
return err
}
@@ -115,10 +121,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli
targetRepository.GetStorageName(),
sourceRepository.GetStorageName(),
).Inc()
- dr.log.WithFields(logrus.Fields{
- "primary": sourceRepository,
- "replica": targetRepository,
- }).Error("checksums do not match")
+
+ logger.Error("checksums do not match")
}
if generation != datastore.GenerationUnknown {
@@ -154,7 +158,9 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica
return err
}
- dr.log.WithError(err).Info("replicated repository delete does not have a store entry")
+ dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
+ WithError(err).
+ Info("replicated repository delete does not have a store entry")
}
return nil
@@ -194,7 +200,9 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat
return err
}
- dr.log.WithError(err).Info("replicated repository rename does not have a store entry")
+ dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)).
+ WithError(err).
+ Info("replicated repository rename does not have a store entry")
}
return nil
@@ -278,7 +286,7 @@ func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClien
}
}
-func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, replicaClient gitalypb.RepositoryServiceClient, primary, replica *gitalypb.Repository) (bool, error) {
+func (dr defaultReplicator) confirmChecksums(ctx context.Context, logger logrus.FieldLogger, primaryClient, replicaClient gitalypb.RepositoryServiceClient, primary, replica *gitalypb.Repository) (bool, error) {
g, gCtx := errgroup.WithContext(ctx)
var primaryChecksum, replicaChecksum string
@@ -290,12 +298,10 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
return false, err
}
- dr.log.WithFields(logrus.Fields{
- "primary": primary,
- "replica": replica,
+ logger.WithFields(logrus.Fields{
"primary_checksum": primaryChecksum,
"replica_checksum": replicaChecksum,
- }).Info("replication finished")
+ }).Info("checksum comparison completed")
return primaryChecksum == replicaChecksum, nil
}
@@ -347,7 +353,7 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep
log: log.WithField("component", "replication_manager"),
queue: queue,
allowlist: map[string]struct{}{},
- replicator: defaultReplicator{log, rs},
+ replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")},
virtualStorages: virtualStorages,
nodeManager: nodeMgr,
replInFlightMetric: prometheus.NewGaugeVec(
@@ -393,9 +399,9 @@ func WithReplicator(r Replicator) ReplMgrOpt {
}
const (
- logWithReplJobID = "replication_job_id"
- logWithReplTarget = "replication_job_target"
- logWithCorrID = "correlation_id"
+ logWithReplTarget = "replication_job_target"
+ logWithCorrID = "correlation_id"
+ logWithVirtualStorage = "virtual_storage"
)
type backoff func() time.Duration
@@ -466,7 +472,7 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time.
}
func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) {
- logger := r.log.WithField("virtual_storage", virtualStorage)
+ logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
backoff, reset := b()
logger.Info("processing started")
@@ -489,7 +495,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
if !target.IsHealthy() {
continue
}
- totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target)
+ totalEvents += r.handleNode(ctx, shard, virtualStorage, target)
}
}
@@ -507,10 +513,12 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
-func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
+func (r ReplMgr) handleNode(ctx context.Context, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
+ logger := r.log.WithFields(logrus.Fields{logWithVirtualStorage: virtualStorage, logWithReplTarget: target.GetStorage()})
+
events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), int(r.dequeueBatchSize))
if err != nil {
- logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ logger.WithError(err).Error("failed to dequeue replication events")
return 0
}
@@ -530,13 +538,17 @@ func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shar
for state, eventIDs := range eventIDsByState {
ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
if err != nil {
- logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
+ logger.WithFields(logrus.Fields{"state": state, "event_ids": eventIDs}).
+ WithError(err).
+ Error("failed to acknowledge replication events")
continue
}
notAckIDs := subtractUint64(ackIDs, eventIDs)
if len(notAckIDs) > 0 {
- logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
+ logger.WithFields(logrus.Fields{"state": state, "event_ids": notAckIDs}).
+ WithError(err).
+ Error("replication events were not acknowledged")
}
}
@@ -563,26 +575,27 @@ func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogge
}
func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState {
- ctxLogger := logger.WithFields(logrus.Fields{
- logWithReplJobID: event.ID,
- logWithCorrID: getCorrelationID(event.Meta),
- })
- ctxLogger.Info("processing replication job")
+ cid := getCorrelationID(event.Meta)
+ ctx = correlation.ContextWithCorrelation(ctx, cid)
- if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
- ctxLogger.WithError(err).Error("replication job failed")
+ // we want it to be queryable by common `json.correlation_id` filter
+ logger = logger.WithField(logWithCorrID, cid)
+ // we log all details about the event only once before start of the processing
+ logger.WithField("event", event).Info("replication job processing started")
+ if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
+ newState := datastore.JobStateFailed
if event.Attempt <= 0 {
- logger.WithField("event", event).Info("handled event would be deleted")
- return datastore.JobStateDead
+ newState = datastore.JobStateDead
}
- return datastore.JobStateFailed
+ logger.WithError(err).WithField("new_state", newState).Error("replication job processing finished")
+ return newState
}
- logger.WithField("event", event).Info("handled event would be deleted")
-
- return datastore.JobStateCompleted
+ newState := datastore.JobStateCompleted
+ logger.WithField("new_state", newState).Info("replication job processing finished")
+ return newState
}
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
@@ -591,23 +604,19 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
return fmt.Errorf("get source node: %w", err)
}
- cid := getCorrelationID(event.Meta)
-
- var replCtx context.Context
var cancel func()
if r.replJobTimeout > 0 {
- replCtx, cancel = context.WithTimeout(ctx, r.replJobTimeout)
+ ctx, cancel = context.WithTimeout(ctx, r.replJobTimeout)
} else {
- replCtx, cancel = context.WithCancel(ctx)
+ ctx, cancel = context.WithCancel(ctx)
}
defer cancel()
- injectedCtx, err := helper.InjectGitalyServers(replCtx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken())
+ ctx, err = helper.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken())
if err != nil {
return fmt.Errorf("inject Gitaly servers into context: %w", err)
}
- injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, cid)
replStart := time.Now()
@@ -619,17 +628,17 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re
switch event.Job.Change {
case datastore.UpdateRepo:
- err = r.replicator.Replicate(injectedCtx, event, source.GetConnection(), targetCC)
+ err = r.replicator.Replicate(ctx, event, source.GetConnection(), targetCC)
case datastore.DeleteRepo:
- err = r.replicator.Destroy(injectedCtx, event, targetCC)
+ err = r.replicator.Destroy(ctx, event, targetCC)
case datastore.RenameRepo:
- err = r.replicator.Rename(injectedCtx, event, targetCC)
+ err = r.replicator.Rename(ctx, event, targetCC)
case datastore.GarbageCollect:
- err = r.replicator.GarbageCollect(injectedCtx, event, targetCC)
+ err = r.replicator.GarbageCollect(ctx, event, targetCC)
case datastore.RepackFull:
- err = r.replicator.RepackFull(injectedCtx, event, targetCC)
+ err = r.replicator.RepackFull(ctx, event, targetCC)
case datastore.RepackIncremental:
- err = r.replicator.RepackIncremental(injectedCtx, event, targetCC)
+ err = r.replicator.RepackIncremental(ctx, event, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index cb98654f1..3a943d5b8 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -39,6 +39,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
@@ -46,7 +47,7 @@ import (
"google.golang.org/grpc/reflection"
)
-func TestProcessReplicationJob(t *testing.T) {
+func TestReplMgr_ProcessBacklog(t *testing.T) {
backupStorageName := "backup"
backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName)
@@ -92,8 +93,6 @@ func TestProcessReplicationJob(t *testing.T) {
},
}
- queue := datastore.NewMemoryReplicationEventQueue(conf)
-
// create object pool on the source
objectPoolPath := testhelper.NewTestObjectPoolName(t)
pool, err := objectpool.NewObjectPool(gitaly_config.NewLocator(gitaly_config.Config), testRepo.GetStorageName(), objectPoolPath)
@@ -154,27 +153,63 @@ func TestProcessReplicationJob(t *testing.T) {
Message: "a commit",
})
- replicator := defaultReplicator{
- log: entry,
- rs: datastore.NewMemoryRepositoryStore(conf.StorageNames()),
- }
-
var mockReplicationLatencyHistogramVec promtest.MockHistogramVec
var mockReplicationDelayHistogramVec promtest.MockHistogramVec
+ logger := testhelper.DiscardTestLogger(t)
+ hook := test.NewLocal(logger)
+
+ queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+ cancel() // when it is called we know that replication is finished
+ return queue.Acknowledge(ctx, state, ids)
+ })
+
+ loggerEntry := logger.WithField("test", t.Name())
+ _, err = queue.Enqueue(ctx, events[0])
+ require.NoError(t, err)
+
+ store := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+
replMgr := NewReplMgr(
- testhelper.DiscardTestEntry(t),
+ loggerEntry,
conf.VirtualStorageNames(),
queue,
- datastore.NewMemoryRepositoryStore(conf.StorageNames()),
+ store,
nodeMgr,
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
)
- replMgr.replicator = replicator
+ replMgr.ProcessBacklog(ctx, ExpBackoffFunc(time.Hour, 0))
+
+ <-ctx.Done()
+
+ logEntries := hook.AllEntries()
+ require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present")
+ require.Equal(t,
+ []interface{}{"processing started", "default"},
+ []interface{}{logEntries[0].Message, logEntries[0].Data["virtual_storage"]},
+ )
+
+ require.Equal(t,
+ []interface{}{"replication job processing started", "default", "correlation-id"},
+ []interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"], logEntries[1].Data[logWithCorrID]},
+ )
+
+ dequeuedEvent := logEntries[1].Data["event"].(datastore.ReplicationEvent)
+ require.Equal(t, datastore.JobStateInProgress, dequeuedEvent.State)
+ require.Equal(t, []string{"backup", "default"}, []string{dequeuedEvent.Job.TargetNodeStorage, dequeuedEvent.Job.SourceNodeStorage})
+
+ require.Equal(t,
+ []interface{}{"checksum comparison completed", "default", "correlation-id"},
+ []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]},
+ )
- require.NoError(t, replMgr.processReplicationEvent(ctx, events[0], shard, shard.Secondaries[0].GetConnection()))
+ require.Equal(t,
+ []interface{}{"replication job processing finished", "default", datastore.JobStateCompleted, "correlation-id"},
+ []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]},
+ )
relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath)
require.NoError(t, err)
@@ -197,13 +232,15 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
+ ctx = correlation.ContextWithCorrelation(ctx, "correlation-id")
+
rs := datastore.NewMemoryRepositoryStore(nil)
require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-1", 0))
require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-2", 0))
logger := testhelper.DiscardTestLogger(t)
hook := test.NewLocal(logger)
- r := &defaultReplicator{log: logrus.NewEntry(logger), rs: rs}
+ r := &defaultReplicator{rs: rs, log: logger}
require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
@@ -222,6 +259,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
CurrentGeneration: 0,
AttemptedGeneration: 0,
}, hook.LastEntry().Data["error"])
+ require.Equal(t, "correlation-id", hook.LastEntry().Data[logWithCorrID])
require.Equal(t, "target repository already on the same generation, skipping replication job", hook.LastEntry().Message)
require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-2", 1))
@@ -242,6 +280,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) {
CurrentGeneration: 1,
AttemptedGeneration: 0,
}, hook.LastEntry().Data["error"])
+ require.Equal(t, "correlation-id", hook.LastEntry().Data[logWithCorrID])
require.Equal(t, "repository downgrade prevented", hook.LastEntry().Message)
}
@@ -441,10 +480,8 @@ func TestConfirmReplication(t *testing.T) {
require.NoError(t, err)
var replicator defaultReplicator
- entry := testhelper.DiscardTestEntry(t)
- replicator.log = entry
- equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
+ equal, err := replicator.confirmChecksums(ctx, testhelper.DiscardTestLogger(t), gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
require.NoError(t, err)
require.True(t, equal)
@@ -452,7 +489,7 @@ func TestConfirmReplication(t *testing.T) {
Message: "a commit",
})
- equal, err = replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
+ equal, err = replicator.confirmChecksums(ctx, testhelper.DiscardTestLogger(t), gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
require.NoError(t, err)
require.False(t, equal)
}