Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-02-26 16:08:32 +0300
committerJacob Vosmaer <jacob@gitlab.com>2020-02-26 16:08:32 +0300
commit8886b2b80355326af6806053a2b7e2c0dcc3b0de (patch)
tree6e51a5a067ee76b856a03ca19eeabb8bf1afebf0
parent51e3e57cfcf48563cf3cddf31ca81968a75dad3a (diff)
Add correlation ID to replication jobs
This change adds a correlation ID to persisted replication jobs. Additionally, it ensures that all RPC's related to a replication job provide the correlation ID in the context metadata.
-rw-r--r--changelogs/unreleased/po-praefect-correlation-replication.yml5
-rw-r--r--internal/praefect/coordinator.go8
-rw-r--r--internal/praefect/coordinator_test.go16
-rw-r--r--internal/praefect/datastore/datastore.go25
-rw-r--r--internal/praefect/datastore/datastore_test.go18
-rw-r--r--internal/praefect/helper_test.go3
-rw-r--r--internal/praefect/nodes/manager.go3
-rw-r--r--internal/praefect/replicator.go6
-rw-r--r--internal/praefect/replicator_test.go8
-rw-r--r--internal/praefect/server_test.go10
10 files changed, 69 insertions, 33 deletions
diff --git a/changelogs/unreleased/po-praefect-correlation-replication.yml b/changelogs/unreleased/po-praefect-correlation-replication.yml
new file mode 100644
index 000000000..5fe31cf50
--- /dev/null
+++ b/changelogs/unreleased/po-praefect-correlation-replication.yml
@@ -0,0 +1,5 @@
+---
+title: Add correlation ID to Praefect replication jobs
+merge_request: 1833
+author:
+type: other
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 45b0dd841..8df81bdc5 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -16,6 +16,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -97,7 +98,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot
return nil, err
}
- if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
+ if requestFinalizer, err = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change); err != nil {
return nil, err
}
}
@@ -188,12 +189,13 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
+func (c *Coordinator) createReplicaJobs(ctx context.Context, targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
var secondaryStorages []string
for _, secondary := range secondaries {
secondaryStorages = append(secondaryStorages, secondary.GetStorage())
}
- jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
+
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(correlation.ExtractFromContext(ctx), targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index dccab79f0..2018e4112 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -14,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
)
var testLogger = logrus.New()
@@ -74,7 +75,7 @@ func TestStreamDirector(t *testing.T) {
fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool"
peeker := &mockPeeker{frame}
- streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker)
+ streamParams, err := coordinator.streamDirector(correlation.ContextWithCorrelation(ctx, "1"), fullMethod, peeker)
require.NoError(t, err)
require.Equal(t, address, streamParams.Conn().Target())
@@ -103,12 +104,13 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
expectedJob := datastore.ReplJob{
- Change: datastore.UpdateRepo,
- ID: 1,
- TargetNode: targetNode,
- SourceNode: sourceNode,
- State: datastore.JobStatePending,
- RelativePath: targetRepo.RelativePath,
+ Change: datastore.UpdateRepo,
+ ID: 1,
+ TargetNode: targetNode,
+ SourceNode: sourceNode,
+ State: datastore.JobStatePending,
+ RelativePath: targetRepo.RelativePath,
+ CorrelationID: "1",
}
require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 5896e7797..92cda155e 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -63,6 +63,7 @@ type ReplJob struct {
RelativePath string // source for replication
State JobState
Attempts int
+ CorrelationID string // from original request
}
// replJobs provides sort manipulation behavior
@@ -108,7 +109,7 @@ type ReplJobsDatastore interface {
// CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
+ CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
// UpdateReplJobState updates the state of an existing replication job
UpdateReplJobState(jobID uint64, newState JobState) error
@@ -122,6 +123,7 @@ type jobRecord struct {
targetNodeStorage, sourceNodeStorage string
state JobState
attempts int
+ correlationID string // from original request
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
@@ -286,13 +288,14 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
}
return ReplJob{
- Change: record.change,
- ID: jobID,
- RelativePath: record.relativePath,
- SourceNode: sourceNode,
- State: record.state,
- TargetNode: targetNode,
- Attempts: record.attempts,
+ Change: record.change,
+ ID: jobID,
+ RelativePath: record.relativePath,
+ SourceNode: sourceNode,
+ State: record.state,
+ TargetNode: targetNode,
+ Attempts: record.attempts,
+ CorrelationID: record.correlationID,
}, nil
}
@@ -302,13 +305,16 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
if relativePath == "" {
return nil, errors.New("invalid source repository")
}
+ if correlationID == "" {
+ return nil, errors.New("invalid correlation ID")
+ }
var jobIDs []uint64
@@ -321,6 +327,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primarySto
state: JobStatePending,
relativePath: relativePath,
sourceNodeStorage: primaryStorage,
+ correlationID: correlationID,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 0ee189123..1094f4186 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -20,7 +20,8 @@ var (
Address: "tcp://address-2",
Storage: "praefect-storage-2",
}
- proj1 = "abcd1234" // imagine this is a legit project hash
+ proj1 = "abcd1234" // imagine this is a legit project hash
+ correlationID = "1"
)
var (
@@ -44,7 +45,7 @@ var operations = []struct {
{
desc: "insert replication job",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
+ _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
require.NoError(t, err)
},
},
@@ -56,12 +57,13 @@ var operations = []struct {
require.Len(t, jobs, 1)
expectedJob := ReplJob{
- Change: UpdateRepo,
- ID: 1,
- RelativePath: repo1Repository.RelativePath,
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
+ Change: UpdateRepo,
+ ID: 1,
+ RelativePath: repo1Repository.RelativePath,
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
+ CorrelationID: correlationID,
}
require.Equal(t, expectedJob, jobs[0])
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 16094ac6c..97b47cfb9 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -27,6 +27,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
@@ -288,6 +289,8 @@ func listenAvailPort(tb testing.TB) (net.Listener, int) {
func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
opts := []grpc.DialOption{
grpc.WithBlock(),
+ grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
+ grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()),
}
if backend {
opts = append(
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index d45affbfe..3cffa8abc 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -86,6 +87,8 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption)
conn, err := client.Dial(node.Address,
append(
[]grpc.DialOption{
+ grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
+ grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()),
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 49af0be3d..fb8018138 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
@@ -41,6 +42,11 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
+ if job.CorrelationID == "" {
+ return fmt.Errorf("replication job %d missing correlation ID", job.ID)
+ }
+ ctx = correlation.ContextWithCorrelation(ctx, job.CorrelationID)
+
if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
Source: sourceRepository,
Repository: targetRepository,
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 24b10b051..aee29234f 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -25,6 +25,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
@@ -103,6 +104,7 @@ func TestProcessReplicationJob(t *testing.T) {
defer cancel()
injectedCtx := metadata.NewOutgoingContext(ctx, testhelper.GitalyServersMetadata(t, srvSocketPath))
+ injectedCtx = correlation.ContextWithCorrelation(injectedCtx, "1")
repoClient, con := newRepositoryClient(t, srvSocketPath)
defer con.Close()
@@ -122,7 +124,7 @@ func TestProcessReplicationJob(t *testing.T) {
for _, secondary := range secondaries {
secondaryStorages = append(secondaryStorages, secondary.Storage)
}
- _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
+ _, err = ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
require.NoError(t, err)
jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
@@ -268,7 +270,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
)
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
+ ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
require.NoError(t, err)
require.Len(t, ids, 1)
@@ -367,7 +369,7 @@ func TestProcessBacklog_Success(t *testing.T) {
)
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
+ ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
require.NoError(t, err)
require.Len(t, ids, 1)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 23fb06f9c..0f58d6c05 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -24,6 +24,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "gitlab.com/gitlab-org/labkit/correlation"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -415,9 +416,12 @@ func TestRepoRemoval(t *testing.T) {
rClient := gitalypb.NewRepositoryServiceClient(cc)
- _, err := rClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
- Repository: &virtualRepo,
- })
+ _, err := rClient.RemoveRepository(
+ correlation.ContextWithCorrelation(ctx, "1"),
+ &gitalypb.RemoveRepositoryRequest{
+ Repository: &virtualRepo,
+ },
+ )
require.NoError(t, err)
resp, err := rClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{