diff options
author | John Cai <jcai@gitlab.com> | 2020-02-27 02:21:15 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-02-27 02:21:15 +0300 |
commit | 45b3c26b84470c2c682351e972f7c172c3a2eb47 (patch) | |
tree | 246b5b538a94d5bbf0832fd54cd697b0688839ac | |
parent | 3880a6a5174cc669f91a2e579b50cf0fbc173059 (diff) | |
parent | 1b2ae2676a2f70b4cd8aa6df61a4c876f5abec00 (diff) |
Merge branch 'po-revert-mr-1833' into 'master'
Revert "Merge branch 'po-praefect-correlation-replication' into 'master'"
See merge request gitlab-org/gitaly!1868
-rw-r--r-- | changelogs/unreleased/po-praefect-correlation-replication.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 8 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 25 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 3 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 3 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 6 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 10 |
10 files changed, 33 insertions, 69 deletions
diff --git a/changelogs/unreleased/po-praefect-correlation-replication.yml b/changelogs/unreleased/po-praefect-correlation-replication.yml deleted file mode 100644 index 5fe31cf50..000000000 --- a/changelogs/unreleased/po-praefect-correlation-replication.yml +++ /dev/null @@ -1,5 +0,0 @@ ---- -title: Add correlation ID to Praefect replication jobs -merge_request: 1833 -author: -type: other diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 8df81bdc5..45b0dd841 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -16,7 +16,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -98,7 +97,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot return nil, err } - if requestFinalizer, err = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change); err != nil { + if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil { return nil, err } } @@ -189,13 +188,12 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi return m, nil } -func (c *Coordinator) createReplicaJobs(ctx context.Context, targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) { +func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) { var secondaryStorages []string for _, secondary := range secondaries { secondaryStorages = append(secondaryStorages, secondary.GetStorage()) } - - jobIDs, err := c.datastore.CreateReplicaReplJobs(correlation.ExtractFromContext(ctx), targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change) + jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change) if err != nil { return nil, err } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 2018e4112..dccab79f0 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -14,7 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "gitlab.com/gitlab-org/labkit/correlation" ) var testLogger = logrus.New() @@ -75,7 +74,7 @@ func TestStreamDirector(t *testing.T) { fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool" peeker := &mockPeeker{frame} - streamParams, err := coordinator.streamDirector(correlation.ContextWithCorrelation(ctx, "1"), fullMethod, peeker) + streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker) require.NoError(t, err) require.Equal(t, address, streamParams.Conn().Target()) @@ -104,13 +103,12 @@ func TestStreamDirector(t *testing.T) { require.NoError(t, err) expectedJob := datastore.ReplJob{ - Change: datastore.UpdateRepo, - ID: 1, - TargetNode: targetNode, - SourceNode: sourceNode, - State: datastore.JobStatePending, - RelativePath: targetRepo.RelativePath, - CorrelationID: "1", + Change: datastore.UpdateRepo, + ID: 1, + TargetNode: targetNode, + SourceNode: sourceNode, + State: datastore.JobStatePending, + RelativePath: targetRepo.RelativePath, } require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct") diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 92cda155e..5896e7797 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -63,7 +63,6 @@ type ReplJob struct { RelativePath string // source for replication State JobState Attempts int - CorrelationID string // from original request } // replJobs provides sort manipulation behavior @@ -109,7 +108,7 @@ type ReplJobsDatastore interface { // CreateReplicaReplJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) + CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) // UpdateReplJobState updates the state of an existing replication job UpdateReplJobState(jobID uint64, newState JobState) error @@ -123,7 +122,6 @@ type jobRecord struct { targetNodeStorage, sourceNodeStorage string state JobState attempts int - correlationID string // from original request } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is @@ -288,14 +286,13 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re } return ReplJob{ - Change: record.change, - ID: jobID, - RelativePath: record.relativePath, - SourceNode: sourceNode, - State: record.state, - TargetNode: targetNode, - Attempts: record.attempts, - CorrelationID: record.correlationID, + Change: record.change, + ID: jobID, + RelativePath: record.relativePath, + SourceNode: sourceNode, + State: record.state, + TargetNode: targetNode, + Attempts: record.attempts, }, nil } @@ -305,16 +302,13 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi // CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() if relativePath == "" { return nil, errors.New("invalid source repository") } - if correlationID == "" { - return nil, errors.New("invalid correlation ID") - } var jobIDs []uint64 @@ -327,7 +321,6 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(correlationID string, relativeP state: JobStatePending, relativePath: relativePath, sourceNodeStorage: primaryStorage, - correlationID: correlationID, } jobIDs = append(jobIDs, nextID) diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 1094f4186..0ee189123 100644 --- a/internal/praefect/datastore/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -20,8 +20,7 @@ var ( Address: "tcp://address-2", Storage: "praefect-storage-2", } - proj1 = "abcd1234" // imagine this is a legit project hash - correlationID = "1" + proj1 = "abcd1234" // imagine this is a legit project hash ) var ( @@ -45,7 +44,7 @@ var operations = []struct { { desc: "insert replication job", opFn: func(t *testing.T, ds Datastore) { - _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo) + _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo) require.NoError(t, err) }, }, @@ -57,13 +56,12 @@ var operations = []struct { require.Len(t, jobs, 1) expectedJob := ReplJob{ - Change: UpdateRepo, - ID: 1, - RelativePath: repo1Repository.RelativePath, - SourceNode: stor1, - TargetNode: stor2, - State: JobStatePending, - CorrelationID: correlationID, + Change: UpdateRepo, + ID: 1, + RelativePath: repo1Repository.RelativePath, + SourceNode: stor1, + TargetNode: stor2, + State: JobStatePending, } require.Equal(t, expectedJob, jobs[0]) }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 97b47cfb9..16094ac6c 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -27,7 +27,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" @@ -289,8 +288,6 @@ func listenAvailPort(tb testing.TB) (net.Listener, int) { func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { opts := []grpc.DialOption{ grpc.WithBlock(), - grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), - grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()), } if backend { opts = append( diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index 3cffa8abc..d45affbfe 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -13,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" - correlation "gitlab.com/gitlab-org/labkit/correlation/grpc" "google.golang.org/grpc" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) @@ -87,8 +86,6 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) conn, err := client.Dial(node.Address, append( []grpc.DialOption{ - grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()), - grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()), grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)), grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index fb8018138..49af0be3d 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -12,7 +12,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "gitlab.com/gitlab-org/labkit/correlation" "golang.org/x/sync/errgroup" "google.golang.org/grpc" ) @@ -42,11 +41,6 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC) - if job.CorrelationID == "" { - return fmt.Errorf("replication job %d missing correlation ID", job.ID) - } - ctx = correlation.ContextWithCorrelation(ctx, job.CorrelationID) - if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{ Source: sourceRepository, Repository: targetRepository, diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index aee29234f..24b10b051 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -25,7 +25,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" @@ -104,7 +103,6 @@ func TestProcessReplicationJob(t *testing.T) { defer cancel() injectedCtx := metadata.NewOutgoingContext(ctx, testhelper.GitalyServersMetadata(t, srvSocketPath)) - injectedCtx = correlation.ContextWithCorrelation(injectedCtx, "1") repoClient, con := newRepositoryClient(t, srvSocketPath) defer con.Close() @@ -124,7 +122,7 @@ func TestProcessReplicationJob(t *testing.T) { for _, secondary := range secondaries { secondaryStorages = append(secondaryStorages, secondary.Storage) } - _, err = ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo) + _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo) require.NoError(t, err) jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1) @@ -270,7 +268,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { ) ds := datastore.NewInMemory(config) - ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) + ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) require.NoError(t, err) require.Len(t, ids, 1) @@ -369,7 +367,7 @@ func TestProcessBacklog_Success(t *testing.T) { ) ds := datastore.NewInMemory(config) - ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) + ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) require.NoError(t, err) require.Len(t, ids, 1) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 0f58d6c05..23fb06f9c 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -24,7 +24,6 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "gitlab.com/gitlab-org/labkit/correlation" healthpb "google.golang.org/grpc/health/grpc_health_v1" ) @@ -416,12 +415,9 @@ func TestRepoRemoval(t *testing.T) { rClient := gitalypb.NewRepositoryServiceClient(cc) - _, err := rClient.RemoveRepository( - correlation.ContextWithCorrelation(ctx, "1"), - &gitalypb.RemoveRepositoryRequest{ - Repository: &virtualRepo, - }, - ) + _, err := rClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ + Repository: &virtualRepo, + }) require.NoError(t, err) resp, err := rClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ |