Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author John Cai <jcai@gitlab.com> 2019-09-13 18:09:28 +0300
committer John Cai <jcai@gitlab.com> 2019-09-13 18:09:28 +0300
commit 5cc3f781ce064521469ec227910b2b74ee853ee2 (patch)
tree 9c436985faf08ea0778855f65581cb5519f50178
parent 2f889883152c873edb430f090d44fba2ef3f5a4c (diff)
parent c5fec42ddf3972061674e82bd83b667f3c602687 (diff)
Merge branch 'jc-add-replication-latency' into 'master'
Measure replication latency. Closes #1882. See merge request gitlab-org/gitaly!1481.
-rw-r--r-- changelogs/unreleased/jc-add-replication-latency.yml 5
-rw-r--r-- internal/praefect/replicator.go 101
-rw-r--r-- internal/praefect/replicator_test.go 75
-rw-r--r-- internal/testhelper/metrics.go 36
4 files changed, 191 insertions, 26 deletions
diff --git a/changelogs/unreleased/jc-add-replication-latency.yml b/changelogs/unreleased/jc-add-replication-latency.yml
new file mode 100644
index 000000000..37cce1bcb
--- /dev/null
+++ b/changelogs/unreleased/jc-add-replication-latency.yml
@@ -0,0 +1,5 @@
+---
+title: Measure replication latency
+merge_request: 1481
+author:
+type: other
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 32a884426..c8de2716b 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -5,6 +5,7 @@ import (
"fmt"
"time"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -14,6 +15,42 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
+var (
+ // replicationLatency is a histogram of how long one replication job took.
+ // Callers observe the value in whole milliseconds; the 100 linear buckets
+ // are 100 wide and start at 0, so they cover 0 through 9900.
+ replicationLatency = prometheus.NewHistogram(
+ prometheus.HistogramOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "replication_latency",
+ Help: "Time taken by a replication job, in milliseconds.",
+ Buckets: prometheus.LinearBuckets(0, 100, 100),
+ },
+ )
+
+ // replicationJobsInFlight is a gauge of replication jobs that have been
+ // started and have not yet returned.
+ replicationJobsInFlight = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "replication_jobs",
+ Help: "Number of replication jobs currently in flight.",
+ },
+ )
+
+ // The three hooks below are package-level function variables rather than
+ // plain functions so that tests can replace them with mocks.
+ //
+ // They call the metric synchronously. Prometheus metrics are safe for
+ // concurrent use, so wrapping each call in its own goroutine buys nothing,
+ // and it leaves an increment and its matching decrement unordered: the
+ // decrement could run first and make the gauge briefly read -1.
+
+ // recordReplicationLatency observes d on the replication latency histogram.
+ recordReplicationLatency = func(d float64) {
+ replicationLatency.Observe(d)
+ }
+
+ // incReplicationJobsInFlight adds one to the in-flight jobs gauge.
+ incReplicationJobsInFlight = func() {
+ replicationJobsInFlight.Inc()
+ }
+
+ // decReplicationJobsInFlight subtracts one from the in-flight jobs gauge.
+ decReplicationJobsInFlight = func() {
+ replicationJobsInFlight.Dec()
+ }
+)
+
+// init registers the replication metrics with the default prometheus
+// registry. MustRegister panics if a collector cannot be registered.
+func init() {
+ prometheus.MustRegister(
+ replicationLatency,
+ replicationJobsInFlight,
+ )
+}
+
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error
@@ -225,35 +262,53 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
"to_storage": job.TargetNode.Storage,
"relative_path": job.Repository.RelativePath,
}).Info("processing replication job")
-
- if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ if err := r.processReplJob(ctx, job); err != nil {
return err
}
+ }
+ }
+ }
+}
- ctx, err = helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "")
- if err != nil {
- return err
- }
+// processReplJob runs a single replication job end to end.
+//
+// It marks the job as in progress, looks up the connections for the target
+// storage and for the repository's primary storage, injects the source node's
+// address into the context, and then calls the replicator exactly once. The
+// call is counted on the in-flight gauge and its duration is recorded in whole
+// milliseconds. On success the job is marked complete.
+//
+// Any error is returned as is. NOTE(review): on every error path the job is
+// left in JobStateInProgress; confirm whether the caller is expected to reset
+// or retry it.
+func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) error {
+ if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
- targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage)
- if err != nil {
- return err
- }
+ targetCC, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ if err != nil {
+ return err
+ }
- sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage)
- if err != nil {
- return err
- }
+ sourceCC, err := r.coordinator.GetConnection(job.Repository.Primary.Storage)
+ if err != nil {
+ return err
+ }
- if err := r.replicator.Replicate(ctx, job, sourceCC, targetCC); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
- }
+ // Replicate must only ever see the injected context: calling it with the
+ // bare ctx as well would run every job twice, and the extra run would be
+ // neither timed nor counted as in flight.
+ injectedCtx, err := helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "")
+ if err != nil {
+ return err
+ }
- if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
- return err
- }
- }
- }
+ replStart := time.Now()
+ incReplicationJobsInFlight()
+ defer decReplicationJobsInFlight()
+
+ if err := r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC); err != nil {
+ r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
+ return err
}
+
+ // Latency is only recorded for successful replications, truncated to whole
+ // milliseconds.
+ replDuration := time.Since(replStart)
+ recordReplicationLatency(float64(replDuration / time.Millisecond))
+
+ return r.datastore.UpdateReplJob(job.ID, JobStateComplete)
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index d7a9aa3db..60dc69cda 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -1,12 +1,15 @@
package praefect
import (
+ "context"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
+ "sync"
"testing"
+ "time"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@@ -14,7 +17,7 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
+ gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -70,7 +73,7 @@ func TestReplicate(t *testing.T) {
require.NoError(t, err)
var replicator defaultReplicator
- replicator.log = gitalylog.Default()
+ replicator.log = gitaly_log.Default()
require.NoError(t, replicator.Replicate(
ctx,
@@ -114,7 +117,7 @@ func TestConfirmReplication(t *testing.T) {
require.NoError(t, err)
var replicator defaultReplicator
- replicator.log = gitalylog.Default()
+ replicator.log = gitaly_log.Default()
equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
require.NoError(t, err)
@@ -129,6 +132,72 @@ func TestConfirmReplication(t *testing.T) {
require.False(t, equal)
}
+// noopReplicator is a test stand-in for a real replicator: it copies nothing
+// and only blocks for a configurable amount of time.
+type noopReplicator struct {
+ // replicationLatency is how long each Replicate call sleeps.
+ replicationLatency time.Duration
+}
+
+// Replicate ignores all of its arguments, sleeps for the configured
+// replicationLatency and always returns nil.
+func (n *noopReplicator) Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error {
+ pause := n.replicationLatency
+ time.Sleep(pause)
+
+ return nil
+}
+
+// TestReplicationMetrics checks that processing one replication job
+// increments and decrements the in-flight gauge exactly once and records
+// exactly one latency observation that is at least as long as the time the
+// replicator spent working.
+func TestReplicationMetrics(t *testing.T) {
+ mockReplicationLatency := &testhelper.MockHistogram{}
+ mockReplicationGauge := &testhelper.MockGauge{}
+
+ // The metric hooks are package-level variables. Put the originals back when
+ // the test finishes so the mocks do not leak into other tests in the package.
+ defer func(observe func(float64), inc, dec func()) {
+ recordReplicationLatency = observe
+ incReplicationJobsInFlight = inc
+ decReplicationJobsInFlight = dec
+ }(recordReplicationLatency, incReplicationJobsInFlight, decReplicationJobsInFlight)
+
+ recordReplicationLatency = func(d float64) {
+ mockReplicationLatency.Observe(d)
+ }
+
+ incReplicationJobsInFlight = func() {
+ mockReplicationGauge.Inc()
+ }
+
+ decReplicationJobsInFlight = func() {
+ mockReplicationGauge.Dec()
+ }
+
+ job := jobRecord{state: JobStateReady}
+
+ m := &MemoryDatastore{
+ jobs: &struct {
+ sync.RWMutex
+ records map[uint64]jobRecord // all jobs indexed by ID
+ }{
+ records: map[uint64]jobRecord{1: job},
+ },
+ }
+
+ replJob := ReplJob{ID: 1,
+ TargetNode: models.Node{Storage: "target"},
+ SourceNode: models.Node{Storage: "source"},
+ Repository: models.Repository{Primary: models.Node{Storage: "target"}},
+ State: JobStateReady,
+ }
+
+ coordinator := &Coordinator{nodes: make(map[string]*grpc.ClientConn)}
+ coordinator.RegisterNode("source", "tcp://127.0.0.1")
+ coordinator.RegisterNode("target", "tcp://127.0.0.1")
+
+ replicationLatencyMs := 10
+
+ replMgr := &ReplMgr{
+ log: gitaly_log.Default(),
+ datastore: m,
+ coordinator: coordinator,
+ replicator: &noopReplicator{replicationLatency: time.Duration(replicationLatencyMs) * time.Millisecond},
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, replMgr.processReplJob(ctx, replJob))
+ require.Equal(t, 1, mockReplicationGauge.IncrementCalled)
+ require.Equal(t, 1, mockReplicationGauge.DecrementCalled)
+ require.Len(t, mockReplicationLatency.Values, 1)
+
+ // time.Sleep only guarantees a minimum pause, so on a loaded machine the
+ // measured latency can exceed the configured one. Assert a lower bound
+ // instead of exact equality to keep the test from flaking.
+ observedMs := int(mockReplicationLatency.Values[0])
+ require.True(t, observedMs >= replicationLatencyMs,
+ "observed latency %dms is shorter than the replicator's %dms", observedMs, replicationLatencyMs)
+}
+
func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
server := serverPkg.NewInsecure(RubyServer)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/testhelper/metrics.go b/internal/testhelper/metrics.go
new file mode 100644
index 000000000..a7359332e
--- /dev/null
+++ b/internal/testhelper/metrics.go
@@ -0,0 +1,36 @@
+package testhelper
+
+import "sync"
+
+// MockGauge stands in for a prometheus gauge in tests. Instead of tracking a
+// value it counts how many times Inc and Dec were called.
+type MockGauge struct {
+ sync.Mutex
+ // IncrementCalled is the number of Inc calls made so far.
+ IncrementCalled int
+ // DecrementCalled is the number of Dec calls made so far.
+ DecrementCalled int
+}
+
+// Inc records one more Inc call while holding the mutex.
+func (mg *MockGauge) Inc() {
+ mg.Lock()
+ mg.IncrementCalled++
+ mg.Unlock()
+}
+
+// Dec records one more Dec call while holding the mutex. It does not touch
+// IncrementCalled.
+func (mg *MockGauge) Dec() {
+ mg.Lock()
+ mg.DecrementCalled++
+ mg.Unlock()
+}
+
+// MockHistogram stands in for a prometheus histogram in tests. It keeps every
+// observed value so that a test can inspect them afterwards.
+type MockHistogram struct {
+ sync.Mutex
+ // Values holds the observed values in the order Observe received them.
+ Values []float64
+}
+
+// Observe appends d to Values while holding the mutex.
+func (mh *MockHistogram) Observe(d float64) {
+ mh.Lock()
+ mh.Values = append(mh.Values, d)
+ mh.Unlock()
+}