Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-02-27 02:21:15 +0300
committerJohn Cai <jcai@gitlab.com>2020-02-27 02:21:15 +0300
commit45b3c26b84470c2c682351e972f7c172c3a2eb47 (patch)
tree246b5b538a94d5bbf0832fd54cd697b0688839ac
parent3880a6a5174cc669f91a2e579b50cf0fbc173059 (diff)
parent1b2ae2676a2f70b4cd8aa6df61a4c876f5abec00 (diff)
Merge branch 'po-revert-mr-1833' into 'master'
Revert "Merge branch 'po-praefect-correlation-replication' into 'master'" See merge request gitlab-org/gitaly!1868
-rw-r--r--changelogs/unreleased/po-praefect-correlation-replication.yml5
-rw-r--r--internal/praefect/coordinator.go8
-rw-r--r--internal/praefect/coordinator_test.go16
-rw-r--r--internal/praefect/datastore/datastore.go25
-rw-r--r--internal/praefect/datastore/datastore_test.go18
-rw-r--r--internal/praefect/helper_test.go3
-rw-r--r--internal/praefect/nodes/manager.go3
-rw-r--r--internal/praefect/replicator.go6
-rw-r--r--internal/praefect/replicator_test.go8
-rw-r--r--internal/praefect/server_test.go10
10 files changed, 33 insertions, 69 deletions
diff --git a/changelogs/unreleased/po-praefect-correlation-replication.yml b/changelogs/unreleased/po-praefect-correlation-replication.yml
deleted file mode 100644
index 5fe31cf50..000000000
--- a/changelogs/unreleased/po-praefect-correlation-replication.yml
+++ /dev/null
@@ -1,5 +0,0 @@
----
-title: Add correlation ID to Praefect replication jobs
-merge_request: 1833
-author:
-type: other
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 8df81bdc5..45b0dd841 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -16,7 +16,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -98,7 +97,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot
return nil, err
}
- if requestFinalizer, err = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change); err != nil {
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
return nil, err
}
}
@@ -189,13 +188,12 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(ctx context.Context, targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
var secondaryStorages []string
for _, secondary := range secondaries {
secondaryStorages = append(secondaryStorages, secondary.GetStorage())
}
-
- jobIDs, err := c.datastore.CreateReplicaReplJobs(correlation.ExtractFromContext(ctx), targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 2018e4112..dccab79f0 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -14,7 +14,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
)
var testLogger = logrus.New()
@@ -75,7 +74,7 @@ func TestStreamDirector(t *testing.T) {
fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool"
peeker := &mockPeeker{frame}
- streamParams, err := coordinator.streamDirector(correlation.ContextWithCorrelation(ctx, "1"), fullMethod, peeker)
+ streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker)
require.NoError(t, err)
require.Equal(t, address, streamParams.Conn().Target())
@@ -104,13 +103,12 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
expectedJob := datastore.ReplJob{
- Change: datastore.UpdateRepo,
- ID: 1,
- TargetNode: targetNode,
- SourceNode: sourceNode,
- State: datastore.JobStatePending,
- RelativePath: targetRepo.RelativePath,
- CorrelationID: "1",
+ Change: datastore.UpdateRepo,
+ ID: 1,
+ TargetNode: targetNode,
+ SourceNode: sourceNode,
+ State: datastore.JobStatePending,
+ RelativePath: targetRepo.RelativePath,
}
require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 92cda155e..5896e7797 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -63,7 +63,6 @@ type ReplJob struct {
RelativePath string // source for replication
State JobState
Attempts int
- CorrelationID string // from original request
}
// replJobs provides sort manipulation behavior
@@ -109,7 +108,7 @@ type ReplJobsDatastore interface {
// CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
// UpdateReplJobState updates the state of an existing replication job
UpdateReplJobState(jobID uint64, newState JobState) error
@@ -123,7 +122,6 @@ type jobRecord struct {
targetNodeStorage, sourceNodeStorage string
state JobState
attempts int
- correlationID string // from original request
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
@@ -288,14 +286,13 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
}
return ReplJob{
- Change: record.change,
- ID: jobID,
- RelativePath: record.relativePath,
- SourceNode: sourceNode,
- State: record.state,
- TargetNode: targetNode,
- Attempts: record.attempts,
- CorrelationID: record.correlationID,
+ Change: record.change,
+ ID: jobID,
+ RelativePath: record.relativePath,
+ SourceNode: sourceNode,
+ State: record.state,
+ TargetNode: targetNode,
+ Attempts: record.attempts,
}, nil
}
@@ -305,16 +302,13 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(correlationID string, relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
if relativePath == "" {
return nil, errors.New("invalid source repository")
}
- if correlationID == "" {
- return nil, errors.New("invalid correlation ID")
- }
var jobIDs []uint64
@@ -327,7 +321,6 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(correlationID string, relativeP
state: JobStatePending,
relativePath: relativePath,
sourceNodeStorage: primaryStorage,
- correlationID: correlationID,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 1094f4186..0ee189123 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -20,8 +20,7 @@ var (
Address: "tcp://address-2",
Storage: "praefect-storage-2",
}
- proj1 = "abcd1234" // imagine this is a legit project hash
- correlationID = "1"
+ proj1 = "abcd1234" // imagine this is a legit project hash
)
var (
@@ -45,7 +44,7 @@ var operations = []struct {
{
desc: "insert replication job",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
require.NoError(t, err)
},
},
@@ -57,13 +56,12 @@ var operations = []struct {
require.Len(t, jobs, 1)
expectedJob := ReplJob{
- Change: UpdateRepo,
- ID: 1,
- RelativePath: repo1Repository.RelativePath,
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
- CorrelationID: correlationID,
+ Change: UpdateRepo,
+ ID: 1,
+ RelativePath: repo1Repository.RelativePath,
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 97b47cfb9..16094ac6c 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -27,7 +27,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
@@ -289,8 +288,6 @@ func listenAvailPort(tb testing.TB) (net.Listener, int) {
func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
opts := []grpc.DialOption{
grpc.WithBlock(),
- grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
- grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()),
}
if backend {
opts = append(
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 3cffa8abc..d45affbfe 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -13,7 +13,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
- correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -87,8 +86,6 @@ func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption)
conn, err := client.Dial(node.Address,
append(
[]grpc.DialOption{
- grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
- grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()),
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index fb8018138..49af0be3d 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -12,7 +12,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
@@ -42,11 +41,6 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
- if job.CorrelationID == "" {
- return fmt.Errorf("replication job %d missing correlation ID", job.ID)
- }
- ctx = correlation.ContextWithCorrelation(ctx, job.CorrelationID)
-
if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
Source: sourceRepository,
Repository: targetRepository,
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index aee29234f..24b10b051 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -25,7 +25,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
@@ -104,7 +103,6 @@ func TestProcessReplicationJob(t *testing.T) {
defer cancel()
injectedCtx := metadata.NewOutgoingContext(ctx, testhelper.GitalyServersMetadata(t, srvSocketPath))
- injectedCtx = correlation.ContextWithCorrelation(injectedCtx, "1")
repoClient, con := newRepositoryClient(t, srvSocketPath)
defer con.Close()
@@ -124,7 +122,7 @@ func TestProcessReplicationJob(t *testing.T) {
for _, secondary := range secondaries {
secondaryStorages = append(secondaryStorages, secondary.Storage)
}
- _, err = ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
+ _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
require.NoError(t, err)
jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
@@ -270,7 +268,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
)
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
+ ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
require.NoError(t, err)
require.Len(t, ids, 1)
@@ -369,7 +367,7 @@ func TestProcessBacklog_Success(t *testing.T) {
)
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs("1", testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
+ ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
require.NoError(t, err)
require.Len(t, ids, 1)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 0f58d6c05..23fb06f9c 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -24,7 +24,6 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "gitlab.com/gitlab-org/labkit/correlation"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -416,12 +415,9 @@ func TestRepoRemoval(t *testing.T) {
rClient := gitalypb.NewRepositoryServiceClient(cc)
- _, err := rClient.RemoveRepository(
- correlation.ContextWithCorrelation(ctx, "1"),
- &gitalypb.RemoveRepositoryRequest{
- Repository: &virtualRepo,
- },
- )
+ _, err := rClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: &virtualRepo,
+ })
require.NoError(t, err)
resp, err := rClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{