diff options
author | John Cai <jcai@gitlab.com> | 2019-08-22 21:53:18 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-08-22 21:53:18 +0300 |
commit | c06f35897d1e4c38535706d7965db399eafcd736 (patch) | |
tree | 738770e21dd544f00e79621eaf5a5a404486a01f | |
parent | c04c53fb54a78d9f81590be7fcec0d7bade9997b (diff) | |
parent | c0be0c9b6d1c92b8150d363d22cea261ea762708 (diff) |
Merge branch 'jc-realtime-replication' into 'master'
Praefect Realtime replication
Closes #1832
See merge request gitlab-org/gitaly!1423
-rw-r--r-- | changelogs/unreleased/jc-realtime-replication.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | config.praefect.toml.example | 2 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 4 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 182 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 87 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 32 | ||||
-rw-r--r-- | internal/praefect/datastore_memory_test.go | 97 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/director.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/examples_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 8 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 112 |
17 files changed, 244 insertions, 317 deletions
diff --git a/changelogs/unreleased/jc-realtime-replication.yml b/changelogs/unreleased/jc-realtime-replication.yml new file mode 100644 index 000000000..555f74e71 --- /dev/null +++ b/changelogs/unreleased/jc-realtime-replication.yml @@ -0,0 +1,5 @@ +--- +title: Praefect Realtime replication +merge_request: 1423 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 7119b864f..cf7d1b8ab 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -97,7 +97,7 @@ func run(listeners []net.Listener, conf config.Config) error { // top level server dependencies datastore = praefect.NewMemoryDatastore(conf) coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...) - repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) + repl = praefect.NewReplMgr("default", logger, datastore, coordinator) srv = praefect.NewServer(coordinator, repl, nil, logger) // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} diff --git a/config.praefect.toml.example b/config.praefect.toml.example index 1393de890..dcd4194ac 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -5,8 +5,6 @@ listen_addr = "127.0.0.1:2305" # # Praefect can listen on a socket when placed on the same machine as all clients # socket_path = "/home/git/gitlab/tmp/sockets/private/praefect.socket" -# # Praefect will only replicate whitelisted repositories -# whitelist = ["@hashed/3f/db/3fdba35f04dc8c462986c992bcf875546257113072a909c162f7e470e581e278.git"] # # Optional: export metrics via Prometheus # prometheus_listen_addr = "127.0.01:10101" diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 121d2cdbe..5499d30f5 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -17,10 +17,6 @@ type Config struct { Nodes []*models.Node `toml:"node"` - // Whitelist is a list of relative project paths (paths comprised of project - // hashes) that are permitted to use high availability features - Whitelist []string `toml:"whitelist"` - Logging config.Logging `toml:"logging"` PrometheusListenAddr string `toml:"prometheus_listen_addr"` } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 19df6ce11..1ed2a39fc 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -77,7 +77,6 @@ func TestConfigParsing(t *testing.T) { Storage: "praefect-internal-3", }, }, - Whitelist: []string{"abcd1234", "edfg5678"}, }, }, } diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 03d0f66f3..d2cb28396 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -1,6 +1,5 @@ listen_addr = "" socket_path = "" -whitelist = ["abcd1234", "edfg5678"] prometheus_listen_addr = "" [logging] diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index bee103b7c..0c80bd398 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,19 +4,20 @@ import ( "context" "errors" "fmt" - "math/rand" "os" "os/signal" + "sort" "sync" "syscall" - "time" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/client" @@ -32,14 +33,14 @@ type Coordinator struct { failoverMutex sync.RWMutex connMutex sync.RWMutex - datastore ReplicasDatastore + datastore Datastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -57,7 +58,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) @@ -65,86 +66,161 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, c.failoverMutex.RLock() defer c.failoverMutex.RUnlock() - frame, err := peeker.Peek() + mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - mi, err := c.registry.LookupMethod(fullMethodName) + m, err := protoMessageFromPeeker(mi, peeker) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - var primary *models.Node + var requestFinalizer func() + var storage string if mi.Scope == protoregistry.ScopeRepository { - m, err := mi.UnmarshalRequestProto(frame) + storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - - targetRepo, err := mi.TargetRepo(m) + } else { + storage, requestFinalizer, err = c.getAnyStorageNode() if err != nil { - return nil, nil, err + return nil, nil, nil, err } + } + // We only need the primary node, as there's only one primary storage + // location per praefect at this time + cc, err := c.GetConnection(storage) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) + } + + return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil +} - primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) +var noopRequestFinalizer = func() {} - if err != nil { - if err != ErrPrimaryNotSet { - return nil, nil, err - } - // if there are no primaries for this repository, pick one - nodes, err := c.datastore.GetStorageNodes() - if err != nil { - return nil, nil, err - } +func (c *Coordinator) getAnyStorageNode() (string, func(), error) { + //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to + // proxy requests that are not repository scoped + node, err := c.datastore.GetStorageNodes() + if err != nil { + return "", nil, err + } + if len(node) == 0 { + return "", nil, errors.New("no node storages found") + } - if len(nodes) == 0 { - return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + return node[0].Storage, noopRequestFinalizer, nil +} - } - newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))] +func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) { + targetRepo, err := mi.TargetRepo(m) + if err != nil { + return "", nil, err + } - // set the primary - if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { - return nil, nil, err - } + primary, err := c.selectPrimary(mi, targetRepo) + if err != nil { + return "", nil, err + } + + targetRepo.StorageName = primary.Storage + + b, err := proxy.Codec().Marshal(m) + if err != nil { + return "", nil, err + } + + if err = peeker.Modify(b); err != nil { + return "", nil, err + } - primary = &newPrimary + requestFinalizer := noopRequestFinalizer + + if mi.Operation == protoregistry.OpMutator { + if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil { + return "", nil, err } + } + + return primary.Storage, requestFinalizer, nil +} - targetRepo.StorageName = primary.Storage +func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) { + var primary *models.Node + var err error + + primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) - b, err := proxy.Codec().Marshal(m) + if err != nil { + if err != ErrPrimaryNotSet { + return nil, err + } + // if there are no primaries for this repository, pick one + nodes, err := c.datastore.GetStorageNodes() if err != nil { - return nil, nil, err + return nil, err } - if err = peeker.Modify(b); err != nil { - return nil, nil, err + + if len(nodes) == 0 { + return nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + } - } else { - //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to - // proxy requests that are not repository scoped - node, err := c.datastore.GetStorageNodes() - if err != nil { - return nil, nil, err + sort.Slice(nodes, func(i, j int) bool { + return nodes[i].ID < nodes[j].ID + }) + + newPrimary := nodes[0] + replicas := nodes[1:] + + // set the primary + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + return nil, err } - if len(node) == 0 { - return nil, nil, errors.New("no node storages found") + + // add replicas + for _, replica := range replicas { + if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.ID); err != nil { + return nil, err + } } - primary = &node[0] + + primary = &newPrimary } - // We only need the primary node, as there's only one primary storage - // location per praefect at this time - cc, err := c.GetConnection(primary.Storage) + return primary, nil +} + +func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) { + frame, err := peeker.Peek() if err != nil { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) + return nil, err } - return helper.IncomingToOutgoing(ctx), cc, nil + m, err := mi.UnmarshalRequestProto(frame) + if err != nil { + return nil, err + } + + return m, nil +} + +func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func(), error) { + jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath) + if err != nil { + return nil, err + } + return func() { + for _, jobID := range jobIDs { + if err := c.datastore.UpdateReplJob(jobID, JobStateReady); err != nil { + c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", JobStateReady) + } + } + }, nil } // RegisterNode will direct traffic to the supplied downstream connection when the storage location diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 0275c6048..e029a3f48 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -4,7 +4,16 @@ import ( "io/ioutil" "testing" + "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" ) var testLogger = logrus.New() @@ -16,3 +25,81 @@ func init() { func TestSecondaryRotation(t *testing.T) { t.Skip("secondary rotation will change with the new data model") } + +func TestStreamDirector(t *testing.T) { + datastore := NewMemoryDatastore(config.Config{ + Nodes: []*models.Node{ + &models.Node{ + Address: "tcp://gitaly-primary.example.com", + Storage: "praefect-internal-1", + }, &models.Node{ + Address: "tcp://gitaly-backup1.example.com", + Storage: "praefect-internal-2", + }}, + }) + + targetRepo := gitalypb.Repository{ + StorageName: "praefect", + RelativePath: "/path/to/hashed/storage", + } + + ctx, cancel := testhelper.Context() + defer cancel() + + coordinator := NewCoordinator(log.Default(), datastore) + require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) + + frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{ + Repository: &targetRepo, + }) + require.NoError(t, err) + + cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure()) + require.NoError(t, err) + + coordinator.setConn("praefect-internal-1", cc) + + _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame}) + require.NoError(t, err) + require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use") + + jobs, err := datastore.GetJobs(JobStatePending, 1, 10) + require.NoError(t, err) + require.Len(t, jobs, 1) + + targetNode, err := datastore.GetStorageNode(1) + require.NoError(t, err) + sourceNode, err := datastore.GetStorageNode(0) + require.NoError(t, err) + + expectedJob := ReplJob{ + ID: 1, + TargetNode: targetNode, + SourceNode: sourceNode, + State: JobStatePending, + Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}}, + } + + require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct") + + jobUpdateFunc() + + jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10) + require.NoError(t, err) + require.Len(t, jobs, 1) + + expectedJob.State = JobStateReady + require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady") +} + +type mockPeeker struct { + frame []byte +} + +func (m *mockPeeker) Peek() ([]byte, error) { + return m.frame, nil +} + +func (m *mockPeeker) Modify(payload []byte) error { + return nil +} diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index f9787089f..f61a64064 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -115,7 +115,6 @@ type jobRecord struct { type MemoryDatastore struct { jobs *struct { sync.RWMutex - next uint64 records map[uint64]jobRecord // all jobs indexed by ID } @@ -141,10 +140,8 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { }, jobs: &struct { sync.RWMutex - next uint64 records map[uint64]jobRecord // all jobs indexed by ID }{ - next: 0, records: map[uint64]jobRecord{}, }, repositories: &struct { @@ -160,32 +157,6 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { m.storageNodes.m[i] = *storageNode } - for _, repoPath := range cfg.Whitelist { - repo := models.Repository{ - RelativePath: repoPath, - } - for storageID, storageNode := range cfg.Nodes { - - // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node - // to be the primary - if repo.Primary == (models.Node{}) { - repo.Primary = *storageNode - } else { - repo.Replicas = append(repo.Replicas, *storageNode) - // initialize replication job queue to replicate all whitelisted repos - // to every replica - m.jobs.next++ - m.jobs.records[m.jobs.next] = jobRecord{ - state: JobStateReady, - targetNodeID: storageID, - sourceNodeID: repo.Primary.ID, - relativePath: repoPath, - } - } - } - m.repositories.m[repoPath] = repo - } - return m } @@ -409,8 +380,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, for _, secondary := range repository.Replicas { nextID := uint64(len(md.jobs.records) + 1) - md.jobs.next++ - md.jobs.records[md.jobs.next] = jobRecord{ + md.jobs.records[nextID] = jobRecord{ targetNodeID: secondary.ID, state: JobStatePending, relativePath: relativePath, diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go deleted file mode 100644 index 0881b0008..000000000 --- a/internal/praefect/datastore_memory_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package praefect - -import ( - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" -) - -// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will -// populate itself with the correct replication jobs and repositories when initialized -// with a configuration file specifying the shard and whitelisted repositories. -func TestMemoryDatastoreWhitelist(t *testing.T) { - t.Skip("Since we are getting rid of the whitelist, we can skip this test for now. We can remove it once we get rid of the whitelist") - repo1 := models.Repository{ - RelativePath: "abcd1234", - } - repo2 := models.Repository{ - RelativePath: "5678efgh", - } - mds := NewMemoryDatastore(config.Config{ - Nodes: []*models.Node{ - &models.Node{ - ID: 0, - Address: "tcp://default", - Storage: "praefect-internal-1", - }, - &models.Node{ - ID: 1, - Address: "tcp://backup-2", - Storage: "praefect-internal-2", - }, &models.Node{ - ID: 2, - Address: "tcp://backup-2", - Storage: "praefect-internal-3", - }}, - Whitelist: []string{repo1.RelativePath, repo2.RelativePath}, - }) - - expectReplicas := []models.Node{ - mds.storageNodes.m[1], - mds.storageNodes.m[2], - } - - for _, repo := range []models.Repository{repo1, repo2} { - actualReplicas, err := mds.GetReplicas(repo.RelativePath) - require.NoError(t, err) - require.ElementsMatch(t, expectReplicas, actualReplicas) - } - - primary := mds.storageNodes.m[0] - backup1 := mds.storageNodes.m[1] - backup2 := mds.storageNodes.m[2] - - backup1ExpectedJobs := []ReplJob{ - ReplJob{ - ID: 1, - TargetNode: backup1, - Repository: models.Repository{RelativePath: repo1.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}}, - SourceNode: primary, - State: JobStateReady, - }, - ReplJob{ - ID: 3, - TargetNode: backup1, - Repository: models.Repository{RelativePath: repo2.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}}, - SourceNode: primary, - State: JobStateReady, - }, - } - backup2ExpectedJobs := []ReplJob{ - ReplJob{ - ID: 2, - TargetNode: backup2, - Repository: models.Repository{RelativePath: repo1.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}}, - SourceNode: primary, - State: JobStateReady, - }, - ReplJob{ - ID: 4, - TargetNode: backup2, - Repository: models.Repository{RelativePath: repo2.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}}, - SourceNode: primary, - State: JobStateReady, - }, - } - - backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10) - require.NoError(t, err) - require.Equal(t, backup1ExpectedJobs, backup1ActualJobs) - - backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10) - require.NoError(t, err) - require.Equal(t, backup2ActualJobs, backup2ExpectedJobs) - -} diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 10a63b228..82712765f 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -21,4 +21,4 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 2c2090363..6d4a3238e 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -39,10 +39,10 @@ func ExampleTransparentHandler() { // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) // Copy the inbound metadata explicitly. @@ -53,12 +53,12 @@ func ExampleStreamDirector() { if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + return outCtx, conn, nil, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + return outCtx, conn, nil, err } } - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index daf12d4b1..0ed1b3b5e 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -71,11 +71,17 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error peeker := newPeeker(serverStream) // We require that the director's returned context inherits from the serverStream.Context(). - outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, peeker) + outgoingCtx, backendConn, requestFinalizer, err := s.director(serverStream.Context(), fullMethodName, peeker) if err != nil { return err } + defer func() { + if requestFinalizer != nil { + requestFinalizer() + } + }() + clientCtx, clientCancel := context.WithCancel(outgoingCtx) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index 0fff36ed4..c57837d2a 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -207,17 +207,17 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return ctx, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") + return ctx, nil, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") } } // Explicitly copy the metadata, otherwise the tests will fail. outCtx, _ := context.WithCancel(ctx) outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) - return outCtx, s.serverClientConn, nil + return outCtx, s.serverClientConn, nil, nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index e274f31e2..a31e7af34 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -28,7 +28,7 @@ func TestStreamPeeking(t *testing.T) { pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { t.Logf("director routing method %s to backend", fullMethodName) peekedMsg, err := peeker.Peek() @@ -39,7 +39,7 @@ func TestStreamPeeking(t *testing.T) { require.NoError(t, err) require.Equal(t, pingReqSent, peekedRequest) - return ctx, backendCC, nil + return ctx, backendCC, nil, nil } pingResp := &testservice.PingResponse{ @@ -87,7 +87,7 @@ func TestStreamInjecting(t *testing.T) { newValue := "bye" // director will peek into stream and change some frames - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { t.Logf("modifying request for method %s", fullMethodName) peekedMsg, err := peeker.Peek() @@ -104,7 +104,7 @@ func TestStreamInjecting(t *testing.T) { require.NoError(t, peeker.Modify(newPayload)) - return ctx, backendCC, nil + return ctx, backendCC, nil, nil } pingResp := &testservice.PingResponse{ diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index b865edfcb..f54a6f457 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -147,7 +147,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { } for _, node := range nodes { - jobs, err := r.datastore.GetJobs(JobStatePending|JobStateReady, node.ID, 10) + jobs, err := r.datastore.GetJobs(JobStateReady, node.ID, 10) if err != nil { return err } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index f035d925e..45c580a54 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -1,137 +1,25 @@ package praefect import ( - "context" "io/ioutil" "log" "net" "os" "path/filepath" "testing" - "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" - gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) -// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for -// all whitelisted repos -func TestReplicatorProcessJobsWhitelist(t *testing.T) { - datastore := NewMemoryDatastore(config.Config{ - Nodes: []*models.Node{ - &models.Node{ - ID: 1, - Address: "tcp://gitaly-primary.example.com", - Storage: "praefect-internal-1", - }, &models.Node{ - ID: 2, - Address: "tcp://gitaly-backup1.example.com", - Storage: "praefect-internal-2", - }}, - Whitelist: []string{"abcd1234", "edfg5678"}, - }) - - logEntry := gitalylog.Default() - coordinator := NewCoordinator(logEntry, datastore) - resultsCh := make(chan result) - replman := NewReplMgr( - "default", - logEntry, - datastore, - coordinator, - WithReplicator(&mockReplicator{resultsCh}), - ) - - for _, node := range datastore.storageNodes.m { - err := coordinator.RegisterNode(node.Storage, node.Address) - require.NoError(t, err) - } - - ctx, cancel := testhelper.Context() - - errQ := make(chan error) - - go func() { - errQ <- replman.ProcessBacklog(ctx) - }() - - success := make(chan struct{}) - - var expectedResults []result - // we expect one job per whitelisted repo with each backend server - for _, shard := range datastore.repositories.m { - for _, secondary := range shard.Replicas { - cc, err := coordinator.GetConnection(secondary.Storage) - require.NoError(t, err) - expectedResults = append(expectedResults, - result{relativePath: shard.RelativePath, - targetStorage: secondary.Storage, - targetCC: cc, - }) - } - } - - go func() { - // we expect one job per whitelisted repo with each backend server - for _, shard := range datastore.repositories.m { - for range shard.Replicas { - result := <-resultsCh - assert.Contains(t, expectedResults, result) - } - } - - cancel() - require.EqualError(t, <-errQ, context.Canceled.Error()) - success <- struct{}{} - }() - - select { - - case <-success: - return - - case <-time.After(time.Second): - t.Fatalf("unable to iterate over expected jobs") - - } - -} - -type result struct { - relativePath string - targetStorage string - targetCC *grpc.ClientConn -} - -type mockReplicator struct { - resultsCh chan<- result -} - -func (mr *mockReplicator) Replicate(ctx context.Context, job ReplJob, target *grpc.ClientConn) error { - select { - - case mr.resultsCh <- result{job.Repository.RelativePath, job.TargetNode.Storage, target}: - return nil - - case <-ctx.Done(): - return ctx.Err() - - } - - return nil -} - func TestReplicate(t *testing.T) { srv, srvSocketPath := runFullGitalyServer(t) defer srv.Stop() |