diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-09-15 10:28:15 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-09-15 10:28:15 +0300 |
commit | 13cb309f2ac87a82647fcecac3d08d3808cd9ff3 (patch) | |
tree | 32ee7796c38f0335e8fa0fdacf40ed202db29bec /internal/praefect | |
parent | 4805333598f9ed608652cfd28650cabf110fc646 (diff) | |
parent | 79c221c0c537b2bced0911b71130d027dd209764 (diff) |
Merge branch 'ps-replication-logging' into 'master'
Fix logging of replication events processing.
See merge request gitlab-org/gitaly!2547
Diffstat (limited to 'internal/praefect')
-rw-r--r-- | internal/praefect/replicator.go | 113 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 71 |
2 files changed, 115 insertions, 69 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 440ee0ec9..ab56c5d07 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -16,7 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" prommetrics "gitlab.com/gitlab-org/gitaly/internal/prometheus/metrics" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" + "gitlab.com/gitlab-org/labkit/correlation" "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -38,8 +38,8 @@ type Replicator interface { } type defaultReplicator struct { - log *logrus.Entry rs datastore.RepositoryStore + log logrus.FieldLogger } func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error { @@ -53,6 +53,12 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli RelativePath: event.Job.RelativePath, } + logger := dr.log.WithFields(logrus.Fields{ + logWithVirtualStorage: event.Job.VirtualStorage, + logWithReplTarget: event.Job.TargetNodeStorage, + logWithCorrID: correlation.ExtractFromContext(ctx), + }) + generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.VirtualStorage, event.Job.RelativePath, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage) if err != nil { // Later generation might have already been replicated by an earlier replication job. If that's the case, @@ -64,7 +70,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli message = "target repository already on the same generation, skipping replication job" } - dr.log.WithError(downgradeErr).Info(message) + logger.WithError(downgradeErr).Info(message) return nil } @@ -77,7 +83,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli Source: sourceRepository, Repository: targetRepository, }); err != nil { - return fmt.Errorf("failed to create repository: %v", err) + return fmt.Errorf("failed to create repository: %w", err) } // check if the repository has an object pool @@ -104,7 +110,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli } } - checksumsMatch, err := dr.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(sourceCC), targetRepositoryClient, sourceRepository, targetRepository) + checksumsMatch, err := dr.confirmChecksums(ctx, logger, gitalypb.NewRepositoryServiceClient(sourceCC), targetRepositoryClient, sourceRepository, targetRepository) if err != nil { return err } @@ -115,10 +121,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.Repli targetRepository.GetStorageName(), sourceRepository.GetStorageName(), ).Inc() - dr.log.WithFields(logrus.Fields{ - "primary": sourceRepository, - "replica": targetRepository, - }).Error("checksums do not match") + + logger.Error("checksums do not match") } if generation != datastore.GenerationUnknown { @@ -154,7 +158,9 @@ func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.Replica return err } - dr.log.WithError(err).Info("replicated repository delete does not have a store entry") + dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). + WithError(err). + Info("replicated repository delete does not have a store entry") } return nil @@ -194,7 +200,9 @@ func (dr defaultReplicator) Rename(ctx context.Context, event datastore.Replicat return err } - dr.log.WithError(err).Info("replicated repository rename does not have a store entry") + dr.log.WithField(logWithCorrID, correlation.ExtractFromContext(ctx)). + WithError(err). + Info("replicated repository rename does not have a store entry") } return nil @@ -278,7 +286,7 @@ func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClien } } -func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, replicaClient gitalypb.RepositoryServiceClient, primary, replica *gitalypb.Repository) (bool, error) { +func (dr defaultReplicator) confirmChecksums(ctx context.Context, logger logrus.FieldLogger, primaryClient, replicaClient gitalypb.RepositoryServiceClient, primary, replica *gitalypb.Repository) (bool, error) { g, gCtx := errgroup.WithContext(ctx) var primaryChecksum, replicaChecksum string @@ -290,12 +298,10 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, return false, err } - dr.log.WithFields(logrus.Fields{ - "primary": primary, - "replica": replica, + logger.WithFields(logrus.Fields{ "primary_checksum": primaryChecksum, "replica_checksum": replicaChecksum, - }).Info("replication finished") + }).Info("checksum comparison completed") return primaryChecksum == replicaChecksum, nil } @@ -347,7 +353,7 @@ func NewReplMgr(log *logrus.Entry, virtualStorages []string, queue datastore.Rep log: log.WithField("component", "replication_manager"), queue: queue, allowlist: map[string]struct{}{}, - replicator: defaultReplicator{log, rs}, + replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")}, virtualStorages: virtualStorages, nodeManager: nodeMgr, replInFlightMetric: prometheus.NewGaugeVec( @@ -393,9 +399,9 @@ func WithReplicator(r Replicator) ReplMgrOpt { } const ( - logWithReplJobID = "replication_job_id" - logWithReplTarget = "replication_job_target" - logWithCorrID = "correlation_id" + logWithReplTarget = "replication_job_target" + logWithCorrID = "correlation_id" + logWithVirtualStorage = "virtual_storage" ) type backoff func() time.Duration @@ -466,7 +472,7 @@ func (r ReplMgr) ProcessStale(ctx context.Context, checkPeriod, staleAfter time. } func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStorage string) { - logger := r.log.WithField("virtual_storage", virtualStorage) + logger := r.log.WithField(logWithVirtualStorage, virtualStorage) backoff, reset := b() logger.Info("processing started") @@ -489,7 +495,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora if !target.IsHealthy() { continue } - totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target) + totalEvents += r.handleNode(ctx, shard, virtualStorage, target) } } @@ -507,10 +513,12 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora } } -func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int { +func (r ReplMgr) handleNode(ctx context.Context, shard nodes.Shard, virtualStorage string, target nodes.Node) int { + logger := r.log.WithFields(logrus.Fields{logWithVirtualStorage: virtualStorage, logWithReplTarget: target.GetStorage()}) + events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), int(r.dequeueBatchSize)) if err != nil { - logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events") + logger.WithError(err).Error("failed to dequeue replication events") return 0 } @@ -530,13 +538,17 @@ func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shar for state, eventIDs := range eventIDsByState { ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs) if err != nil { - logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") + logger.WithFields(logrus.Fields{"state": state, "event_ids": eventIDs}). + WithError(err). + Error("failed to acknowledge replication events") continue } notAckIDs := subtractUint64(ackIDs, eventIDs) if len(notAckIDs) > 0 { - logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") + logger.WithFields(logrus.Fields{"state": state, "event_ids": notAckIDs}). + WithError(err). + Error("replication events were not acknowledged") } } @@ -563,26 +575,27 @@ func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogge } func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState { - ctxLogger := logger.WithFields(logrus.Fields{ - logWithReplJobID: event.ID, - logWithCorrID: getCorrelationID(event.Meta), - }) - ctxLogger.Info("processing replication job") + cid := getCorrelationID(event.Meta) + ctx = correlation.ContextWithCorrelation(ctx, cid) - if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil { - ctxLogger.WithError(err).Error("replication job failed") + // we want it to be queryable by common `json.correlation_id` filter + logger = logger.WithField(logWithCorrID, cid) + // we log all details about the event only once before start of the processing + logger.WithField("event", event).Info("replication job processing started") + if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil { + newState := datastore.JobStateFailed if event.Attempt <= 0 { - logger.WithField("event", event).Info("handled event would be deleted") - return datastore.JobStateDead + newState = datastore.JobStateDead } - return datastore.JobStateFailed + logger.WithError(err).WithField("new_state", newState).Error("replication job processing finished") + return newState } - logger.WithField("event", event).Info("handled event would be deleted") - - return datastore.JobStateCompleted + newState := datastore.JobStateCompleted + logger.WithField("new_state", newState).Info("replication job processing finished") + return newState } func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error { @@ -591,23 +604,19 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re return fmt.Errorf("get source node: %w", err) } - cid := getCorrelationID(event.Meta) - - var replCtx context.Context var cancel func() if r.replJobTimeout > 0 { - replCtx, cancel = context.WithTimeout(ctx, r.replJobTimeout) + ctx, cancel = context.WithTimeout(ctx, r.replJobTimeout) } else { - replCtx, cancel = context.WithCancel(ctx) + ctx, cancel = context.WithCancel(ctx) } defer cancel() - injectedCtx, err := helper.InjectGitalyServers(replCtx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken()) + ctx, err = helper.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.GetAddress(), source.GetToken()) if err != nil { return fmt.Errorf("inject Gitaly servers into context: %w", err) } - injectedCtx = grpccorrelation.InjectToOutgoingContext(injectedCtx, cid) replStart := time.Now() @@ -619,17 +628,17 @@ func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.Re switch event.Job.Change { case datastore.UpdateRepo: - err = r.replicator.Replicate(injectedCtx, event, source.GetConnection(), targetCC) + err = r.replicator.Replicate(ctx, event, source.GetConnection(), targetCC) case datastore.DeleteRepo: - err = r.replicator.Destroy(injectedCtx, event, targetCC) + err = r.replicator.Destroy(ctx, event, targetCC) case datastore.RenameRepo: - err = r.replicator.Rename(injectedCtx, event, targetCC) + err = r.replicator.Rename(ctx, event, targetCC) case datastore.GarbageCollect: - err = r.replicator.GarbageCollect(injectedCtx, event, targetCC) + err = r.replicator.GarbageCollect(ctx, event, targetCC) case datastore.RepackFull: - err = r.replicator.RepackFull(injectedCtx, event, targetCC) + err = r.replicator.RepackFull(ctx, event, targetCC) case datastore.RepackIncremental: - err = r.replicator.RepackIncremental(injectedCtx, event, targetCC) + err = r.replicator.RepackIncremental(ctx, event, targetCC) default: err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change) } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index cb98654f1..3a943d5b8 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -39,6 +39,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -46,7 +47,7 @@ import ( "google.golang.org/grpc/reflection" ) -func TestProcessReplicationJob(t *testing.T) { +func TestReplMgr_ProcessBacklog(t *testing.T) { backupStorageName := "backup" backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName) @@ -92,8 +93,6 @@ func TestProcessReplicationJob(t *testing.T) { }, } - queue := datastore.NewMemoryReplicationEventQueue(conf) - // create object pool on the source objectPoolPath := testhelper.NewTestObjectPoolName(t) pool, err := objectpool.NewObjectPool(gitaly_config.NewLocator(gitaly_config.Config), testRepo.GetStorageName(), objectPoolPath) @@ -154,27 +153,63 @@ func TestProcessReplicationJob(t *testing.T) { Message: "a commit", }) - replicator := defaultReplicator{ - log: entry, - rs: datastore.NewMemoryRepositoryStore(conf.StorageNames()), - } - var mockReplicationLatencyHistogramVec promtest.MockHistogramVec var mockReplicationDelayHistogramVec promtest.MockHistogramVec + logger := testhelper.DiscardTestLogger(t) + hook := test.NewLocal(logger) + + queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) + queue.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { + cancel() // when it is called we know that replication is finished + return queue.Acknowledge(ctx, state, ids) + }) + + loggerEntry := logger.WithField("test", t.Name()) + _, err = queue.Enqueue(ctx, events[0]) + require.NoError(t, err) + + store := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + replMgr := NewReplMgr( - testhelper.DiscardTestEntry(t), + loggerEntry, conf.VirtualStorageNames(), queue, - datastore.NewMemoryRepositoryStore(conf.StorageNames()), + store, nodeMgr, WithLatencyMetric(&mockReplicationLatencyHistogramVec), WithDelayMetric(&mockReplicationDelayHistogramVec), ) - replMgr.replicator = replicator + replMgr.ProcessBacklog(ctx, ExpBackoffFunc(time.Hour, 0)) + + <-ctx.Done() + + logEntries := hook.AllEntries() + require.True(t, len(logEntries) > 3, "expected at least 4 log entries to be present") + require.Equal(t, + []interface{}{"processing started", "default"}, + []interface{}{logEntries[0].Message, logEntries[0].Data["virtual_storage"]}, + ) + + require.Equal(t, + []interface{}{"replication job processing started", "default", "correlation-id"}, + []interface{}{logEntries[1].Message, logEntries[1].Data["virtual_storage"], logEntries[1].Data[logWithCorrID]}, + ) + + dequeuedEvent := logEntries[1].Data["event"].(datastore.ReplicationEvent) + require.Equal(t, datastore.JobStateInProgress, dequeuedEvent.State) + require.Equal(t, []string{"backup", "default"}, []string{dequeuedEvent.Job.TargetNodeStorage, dequeuedEvent.Job.SourceNodeStorage}) + + require.Equal(t, + []interface{}{"checksum comparison completed", "default", "correlation-id"}, + []interface{}{logEntries[2].Message, logEntries[2].Data["virtual_storage"], logEntries[2].Data[logWithCorrID]}, + ) - require.NoError(t, replMgr.processReplicationEvent(ctx, events[0], shard, shard.Secondaries[0].GetConnection())) + require.Equal(t, + []interface{}{"replication job processing finished", "default", datastore.JobStateCompleted, "correlation-id"}, + []interface{}{logEntries[3].Message, logEntries[3].Data["virtual_storage"], logEntries[3].Data["new_state"], logEntries[3].Data[logWithCorrID]}, + ) relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath) require.NoError(t, err) @@ -197,13 +232,15 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() + ctx = correlation.ContextWithCorrelation(ctx, "correlation-id") + rs := datastore.NewMemoryRepositoryStore(nil) require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-1", 0)) require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-2", 0)) logger := testhelper.DiscardTestLogger(t) hook := test.NewLocal(logger) - r := &defaultReplicator{log: logrus.NewEntry(logger), rs: rs} + r := &defaultReplicator{rs: rs, log: logger} require.NoError(t, r.Replicate(ctx, datastore.ReplicationEvent{ Job: datastore.ReplicationJob{ @@ -222,6 +259,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { CurrentGeneration: 0, AttemptedGeneration: 0, }, hook.LastEntry().Data["error"]) + require.Equal(t, "correlation-id", hook.LastEntry().Data[logWithCorrID]) require.Equal(t, "target repository already on the same generation, skipping replication job", hook.LastEntry().Message) require.NoError(t, rs.SetGeneration(ctx, "virtual-storage-1", "relative-path-1", "gitaly-2", 1)) @@ -242,6 +280,7 @@ func TestReplicatorDowngradeAttempt(t *testing.T) { CurrentGeneration: 1, AttemptedGeneration: 0, }, hook.LastEntry().Data["error"]) + require.Equal(t, "correlation-id", hook.LastEntry().Data[logWithCorrID]) require.Equal(t, "repository downgrade prevented", hook.LastEntry().Message) } @@ -441,10 +480,8 @@ func TestConfirmReplication(t *testing.T) { require.NoError(t, err) var replicator defaultReplicator - entry := testhelper.DiscardTestEntry(t) - replicator.log = entry - equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) + equal, err := replicator.confirmChecksums(ctx, testhelper.DiscardTestLogger(t), gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) require.NoError(t, err) require.True(t, equal) @@ -452,7 +489,7 @@ func TestConfirmReplication(t *testing.T) { Message: "a commit", }) - equal, err = replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) + equal, err = replicator.confirmChecksums(ctx, testhelper.DiscardTestLogger(t), gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) require.NoError(t, err) require.False(t, equal) } |