diff options
author | John Cai <jcai@gitlab.com> | 2019-09-13 18:09:28 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-09-13 18:09:28 +0300 |
commit | 5cc3f781ce064521469ec227910b2b74ee853ee2 (patch) | |
tree | 9c436985faf08ea0778855f65581cb5519f50178 | |
parent | 2f889883152c873edb430f090d44fba2ef3f5a4c (diff) | |
parent | c5fec42ddf3972061674e82bd83b667f3c602687 (diff) |
Merge branch 'jc-add-replication-latency' into 'master'
Measure replication latency
Closes #1882
See merge request gitlab-org/gitaly!1481
-rw-r--r-- | changelogs/unreleased/jc-add-replication-latency.yml | 5 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 101 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 75 | ||||
-rw-r--r-- | internal/testhelper/metrics.go | 36 |
4 files changed, 191 insertions, 26 deletions
diff --git a/changelogs/unreleased/jc-add-replication-latency.yml b/changelogs/unreleased/jc-add-replication-latency.yml new file mode 100644 index 000000000..37cce1bcb --- /dev/null +++ b/changelogs/unreleased/jc-add-replication-latency.yml @@ -0,0 +1,5 @@ +--- +title: Measure replication latency +merge_request: 1481 +author: +type: other diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 32a884426..c8de2716b 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -14,6 +15,42 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) +var ( + replicationLatency = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "replication_latency", + Buckets: prometheus.LinearBuckets(0, 100, 100), + }, + ) + + replicationJobsInFlight = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "replication_jobs", + }, + ) + + recordReplicationLatency = func(d float64) { + go replicationLatency.Observe(d) + } + + incReplicationJobsInFlight = func() { + go replicationJobsInFlight.Inc() + } + + decReplicationJobsInFlight = func() { + go replicationJobsInFlight.Dec() + } +) + +func init() { + prometheus.MustRegister(replicationLatency) + prometheus.MustRegister(replicationJobsInFlight) +} + // Replicator performs the actual replication logic between two nodes type Replicator interface { Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error @@ -225,35 +262,53 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { "to_storage": job.TargetNode.Storage, "relative_path": job.Repository.RelativePath, }).Info("processing replication job") - - if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + if err := r.processReplJob(ctx, job); err != nil { return err } + } + } + } +} - ctx, err = helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "") - if err != nil { - return err - } +func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error { + if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } - targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage) - if err != nil { - return err - } + targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage) + if err != nil { + return err + } - sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage) - if err != nil { - return err - } + sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage) + if err != nil { + return err + } - if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil { - r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") - return err - } + if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") + return err + } + injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "") + if err != nil { + return err + } - if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil { - return err - } - } - } + replStart := time.Now() + incReplicationJobsInFlight() + defer decReplicationJobsInFlight() + + if err := r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") + return err } + + replDuration := time.Since(replStart) + recordReplicationLatency(float64(replDuration / time.Millisecond)) + + if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil { + return err + } + return nil + } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index d7a9aa3db..60dc69cda 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -1,12 +1,15 @@ package praefect import ( + "context" "io/ioutil" "log" "net" "os" "path/filepath" + "sync" "testing" + "time" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -14,7 +17,7 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" - gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" + gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" @@ -70,7 +73,7 @@ func TestReplicate(t *testing.T) { require.NoError(t, err) var replicator defaultReplicator - replicator.log = gitalylog.Default() + replicator.log = gitaly_log.Default() require.NoError(t, replicator.Replicate( ctx, @@ -114,7 +117,7 @@ func TestConfirmReplication(t *testing.T) { require.NoError(t, err) var replicator defaultReplicator - replicator.log = gitalylog.Default() + replicator.log = gitaly_log.Default() equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) require.NoError(t, err) @@ -129,6 +132,72 @@ func TestConfirmReplication(t *testing.T) { require.False(t, equal) } +type noopReplicator struct { + replicationLatency time.Duration +} + +func (n *noopReplicator) Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error { + time.Sleep(n.replicationLatency) + return nil +} + +func TestReplicationMetrics(t *testing.T) { + mockReplicationLatency := &testhelper.MockHistogram{} + mockReplicationGauge := &testhelper.MockGauge{} + + recordReplicationLatency = func(d float64) { + mockReplicationLatency.Observe(d) + } + + incReplicationJobsInFlight = func() { + mockReplicationGauge.Inc() + } + + decReplicationJobsInFlight = func() { + mockReplicationGauge.Dec() + } + + job := jobRecord{state: JobStateReady} + + m := &MemoryDatastore{ + jobs: &struct { + sync.RWMutex + records map[uint64]jobRecord // all jobs indexed by ID + }{ + records: map[uint64]jobRecord{1: job}, + }, + } + + replJob := ReplJob{ID: 1, + TargetNode: models.Node{Storage: "target"}, + SourceNode: models.Node{Storage: "source"}, + Repository: models.Repository{Primary: models.Node{Storage: "target"}}, + State: JobStateReady, + } + + coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)} + coordinator.RegisterNode("source", "tcp://127.0.0.1") + coordinator.RegisterNode("target", "tcp://127.0.0.1") + + replicationLatencyMs := 10 + + replMgr := &ReplMgr{ + log: gitaly_log.Default(), + datastore: m, + coordinator: coordinator, + replicator: &noopReplicator{replicationLatency: time.Duration(replicationLatencyMs) * time.Millisecond}, + } + + ctx, cancel := testhelper.Context() + defer cancel() + + require.NoError(t, replMgr.processReplJob(ctx, replJob)) + require.Equal(t, 1, mockReplicationGauge.IncrementCalled) + require.Equal(t, 1, mockReplicationGauge.DecrementCalled) + require.Len(t, mockReplicationLatency.Values, 1) + require.Equal(t, replicationLatencyMs, int(mockReplicationLatency.Values[0])) +} + func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { server := serverPkg.NewInsecure(RubyServer) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/testhelper/metrics.go b/internal/testhelper/metrics.go new file mode 100644 index 000000000..a7359332e --- /dev/null +++ b/internal/testhelper/metrics.go @@ -0,0 +1,36 @@ +package testhelper + +import "sync" + +// MockGauge is a simplified prometheus gauge with Inc and Dec that can be inspected +type MockGauge struct { + sync.Mutex + IncrementCalled, DecrementCalled int +} + +// Inc increments the IncrementCalled counter +func (mg *MockGauge) Inc() { + mg.Lock() + defer mg.Unlock() + mg.IncrementCalled++ +} + +// Dec increments the DecrementCalled counter +func (mg *MockGauge) Dec() { + mg.Lock() + defer mg.Unlock() + mg.DecrementCalled++ +} + +// MockHistogram is a simplified prometheus histogram with Observe +type MockHistogram struct { + sync.Mutex + Values []float64 +} + +// Observe adds a value to the Values slice +func (mh *MockHistogram) Observe(d float64) { + mh.Lock() + defer mh.Unlock() + mh.Values = append(mh.Values, d) +} |