diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-02-26 16:08:32 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2020-02-26 16:08:32 +0300 |
commit | 8886b2b80355326af6806053a2b7e2c0dcc3b0de (patch) | |
tree | 6e51a5a067ee76b856a03ca19eeabb8bf1afebf0 | |
parent | 51e3e57cfcf48563cf3cddf31ca81968a75dad3a (diff) |
Add correlation ID to replication jobs
This change adds a correlation ID to persisted replication jobs.
Additionally, it ensures that all RPC's related to a replication job
provide the correlation ID in the context metadata.
-rw-r--r-- | changelogs/unreleased/po-praefect-correlation-replication.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 25 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 3 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 6 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 10 |
10 files changed, 69 insertions, 33 deletions
diff --git a/changelogs/unreleased/po-praefect-correlation-replication.yml b/changelogs/unreleased/po-praefect-correlation-replication.yml new file mode 100644 index 000000000..5fe31cf50 --- /dev/null +++ b/changelogs/unreleased/po-praefect-correlation-replication.yml @@ -0,0 +1,5 @@ +--- +title: Add correlation ID to Praefect replication jobs +merge_request: 1833 +author: +type: other diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 45b0dd841..8df81bdc5 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -16,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -97,7 +98,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot return nil, err } - if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil { + if requestFinalizer, err = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change); err != nil { return nil, err } } @@ -188,12 +189,13 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi return m, nil } -func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) { +func (c *Coordinator) createReplicaJobs(ctx context.Context, targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) { var secondaryStorages []string for _, secondary := range secondaries { secondaryStorages = append(secondaryStorages, secondary.GetStorage()) } - jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change) + + jobIDs, err := c.datastore.CreateReplicaReplJobs(correlation.ExtractFromContext(ctx), targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change) if err != nil { return nil, err } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index dccab79f0..2018e4112 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -14,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" ) var testLogger = logrus.New() @@ -74,7 +75,7 @@ func TestStreamDirector(t *testing.T) { fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool" peeker := &mockPeeker{frame} - streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker) + streamParams, err := coordinator.streamDirector(correlation.ContextWithCorrelation(ctx, "1"), fullMethod, peeker) require.NoError(t, err) require.Equal(t, address, streamParams.Conn().Target()) @@ -103,12 +104,13 @@ func TestStreamDirector(t *testing.T) { require.NoError(t, err) expectedJob := datastore.ReplJob{ - Change: datastore.UpdateRepo, - ID: 1, - TargetNode: targetNode, - SourceNode: sourceNode, - State: datastore.JobStatePending, - RelativePath: targetRepo.RelativePath, + Change: datastore.UpdateRepo, + ID: 1, + TargetNode: targetNode, + SourceNode: sourceNode, + State: datastore.JobStatePending, + RelativePath: targetRepo.RelativePath, + CorrelationID: "1", } require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct") diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 5896e7797..92cda155e 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -63,6 +63,7 @@ type ReplJob struct { RelativePath string // source for replication State JobState Attempts int + CorrelationID string // from original request } // replJobs provides sort manipulation behavior @@ -108,7 +109,7 @@ type ReplJobsDatastore interface { // CreateReplicaReplJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) + CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) // UpdateReplJobState updates the state of an existing replication job UpdateReplJobState(jobID uint64, newState JobState) error @@ -122,6 +123,7 @@ type jobRecord struct { targetNodeStorage, sourceNodeStorage string state JobState attempts int + correlationID string // from original request } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is @@ -286,13 +288,14 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re } return ReplJob{ - Change: record.change, - ID: jobID, - RelativePath: record.relativePath, - SourceNode: sourceNode, - State: record.state, - TargetNode: targetNode, - Attempts: record.attempts, + Change: record.change, + ID: jobID, + RelativePath: record.relativePath, + SourceNode: sourceNode, + State: record.state, + TargetNode: targetNode, + Attempts: record.attempts, + CorrelationID: record.correlationID, }, nil } @@ -302,13 +305,16 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi // CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() if relativePath == "" { return nil, errors.New("invalid source repository") } + if correlationID == "" { + return nil, errors.New("invalid correlation ID") + } var jobIDs []uint64 @@ -321,6 +327,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primarySto state: JobStatePending, relativePath: relativePath, sourceNodeStorage: primaryStorage, + correlationID: correlationID, } jobIDs = append(jobIDs, nextID) diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 0ee189123..1094f4186 100644 --- a/internal/praefect/datastore/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -20,7 +20,8 @@ var ( Address: "tcp://address-2", Storage: "praefect-storage-2", } - proj1 = "abcd1234" // imagine this is a legit project hash + proj1 = "abcd1234" // imagine this is a legit project hash + correlationID = "1" ) var ( @@ -44,7 +45,7 @@ var operations = []struct { { desc: "insert replication job", opFn: func(t *testing.T, ds Datastore) { - _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo) + _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo) require.NoError(t, err) }, }, @@ -56,12 +57,13 @@ var operations = []struct { require.Len(t, jobs, 1) expectedJob := ReplJob{ - Change: UpdateRepo, - ID: 1, - RelativePath: repo1Repository.RelativePath, - SourceNode: stor1, - TargetNode: stor2, - State: JobStatePending, + Change: UpdateRepo, + ID: 1, + RelativePath: repo1Repository.RelativePath, + SourceNode: stor1, + TargetNode: stor2, + State: JobStatePending, + CorrelationID: correlationID, } require.Equal(t, expectedJob, jobs[0]) }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 16094ac6c..97b47cfb9 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -27,6 +27,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -288,6 +289,8 @@ func listenAvailPort(tb testing.TB) (net.Listener, int) { func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { opts := []grpc.DialOption{ grpc.WithBlock(), + grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), + grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()), } if backend { opts = append( diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index d45affbfe..3cffa8abc 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) @@ -86,6 +87,8 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) conn, err := client.Dial(node.Address, append( []grpc.DialOption{ + grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), + grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()), grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 49af0be3d..fb8018138 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -41,6 +42,11 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) + if job.CorrelationID == "" { + return fmt.Errorf("replication job %d missing correlation ID", job.ID) + } + ctx = correlation.ContextWithCorrelation(ctx, job.CorrelationID) + if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ Source: sourceRepository, Repository: targetRepository, diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 24b10b051..aee29234f 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" @@ -103,6 +104,7 @@ func TestProcessReplicationJob(t *testing.T) { defer cancel() injectedCtx := metadata.NewOutgoingContext(ctx, testhelper.GitalyServersMetadata(t, srvSocketPath)) + injectedCtx = correlation.ContextWithCorrelation(injectedCtx, "1") repoClient, con := newRepositoryClient(t, srvSocketPath) defer con.Close() @@ -122,7 +124,7 @@ func TestProcessReplicationJob(t *testing.T) { for _, secondary := range secondaries { secondaryStorages = append(secondaryStorages, secondary.Storage) } - _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo) + _, err = ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo) require.NoError(t, err) jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1) @@ -268,7 +270,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { ) ds := datastore.NewInMemory(config) - ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) + ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) require.NoError(t, err) require.Len(t, ids, 1) @@ -367,7 +369,7 @@ func TestProcessBacklog_Success(t *testing.T) { ) ds := datastore.NewInMemory(config) - ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) + ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) require.NoError(t, err) require.Len(t, ids, 1) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 23fb06f9c..0f58d6c05 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -24,6 +24,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "gitlab.com/gitlab-org/labkit/correlation" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) @@ -415,9 +416,12 @@ func TestRepoRemoval(t *testing.T) { rClient := gitalypb.NewRepositoryServiceClient(cc) - _, err := rClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ - Repository: &virtualRepo, - }) + _, err := rClient.RemoveRepository( + correlation.ContextWithCorrelation(ctx, "1"), + &gitalypb.RemoveRepositoryRequest{ + Repository: &virtualRepo, + }, + ) require.NoError(t, err) resp, err := rClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ |