Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-08-22 21:53:18 +0300
committerJohn Cai <jcai@gitlab.com>2019-08-22 21:53:18 +0300
commitc06f35897d1e4c38535706d7965db399eafcd736 (patch)
tree738770e21dd544f00e79621eaf5a5a404486a01f
parentc04c53fb54a78d9f81590be7fcec0d7bade9997b (diff)
parentc0be0c9b6d1c92b8150d363d22cea261ea762708 (diff)
Merge branch 'jc-realtime-replication' into 'master'
Praefect Realtime replication Closes #1832 See merge request gitlab-org/gitaly!1423
-rw-r--r--changelogs/unreleased/jc-realtime-replication.yml5
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--config.praefect.toml.example2
-rw-r--r--internal/praefect/config/config.go4
-rw-r--r--internal/praefect/config/config_test.go1
-rw-r--r--internal/praefect/config/testdata/config.toml1
-rw-r--r--internal/praefect/coordinator.go182
-rw-r--r--internal/praefect/coordinator_test.go87
-rw-r--r--internal/praefect/datastore.go32
-rw-r--r--internal/praefect/datastore_memory_test.go97
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/examples_test.go10
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go8
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go6
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker_test.go8
-rw-r--r--internal/praefect/replicator.go2
-rw-r--r--internal/praefect/replicator_test.go112
17 files changed, 244 insertions, 317 deletions
diff --git a/changelogs/unreleased/jc-realtime-replication.yml b/changelogs/unreleased/jc-realtime-replication.yml
new file mode 100644
index 000000000..555f74e71
--- /dev/null
+++ b/changelogs/unreleased/jc-realtime-replication.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect Realtime replication
+merge_request: 1423
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 7119b864f..cf7d1b8ab 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -97,7 +97,7 @@ func run(listeners []net.Listener, conf config.Config) error {
// top level server dependencies
datastore = praefect.NewMemoryDatastore(conf)
coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...)
- repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
+ repl = praefect.NewReplMgr("default", logger, datastore, coordinator)
srv = praefect.NewServer(coordinator, repl, nil, logger)
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index 1393de890..dcd4194ac 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -5,8 +5,6 @@ listen_addr = "127.0.0.1:2305"
# # Praefect can listen on a socket when placed on the same machine as all clients
# socket_path = "/home/git/gitlab/tmp/sockets/private/praefect.socket"
-# # Praefect will only replicate whitelisted repositories
-# whitelist = ["@hashed/3f/db/3fdba35f04dc8c462986c992bcf875546257113072a909c162f7e470e581e278.git"]
# # Optional: export metrics via Prometheus
# prometheus_listen_addr = "127.0.01:10101"
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 121d2cdbe..5499d30f5 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -17,10 +17,6 @@ type Config struct {
Nodes []*models.Node `toml:"node"`
- // Whitelist is a list of relative project paths (paths comprised of project
- // hashes) that are permitted to use high availability features
- Whitelist []string `toml:"whitelist"`
-
Logging config.Logging `toml:"logging"`
PrometheusListenAddr string `toml:"prometheus_listen_addr"`
}
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 19df6ce11..1ed2a39fc 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -77,7 +77,6 @@ func TestConfigParsing(t *testing.T) {
Storage: "praefect-internal-3",
},
},
- Whitelist: []string{"abcd1234", "edfg5678"},
},
},
}
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 03d0f66f3..d2cb28396 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -1,6 +1,5 @@
listen_addr = ""
socket_path = ""
-whitelist = ["abcd1234", "edfg5678"]
prometheus_listen_addr = ""
[logging]
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index bee103b7c..0c80bd398 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -4,19 +4,20 @@ import (
"context"
"errors"
"fmt"
- "math/rand"
"os"
"os/signal"
+ "sort"
"sync"
"syscall"
- "time"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/client"
@@ -32,14 +33,14 @@ type Coordinator struct {
failoverMutex sync.RWMutex
connMutex sync.RWMutex
- datastore ReplicasDatastore
+ datastore Datastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -57,7 +58,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
}
// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
@@ -65,86 +66,161 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
c.failoverMutex.RLock()
defer c.failoverMutex.RUnlock()
- frame, err := peeker.Peek()
+ mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
- mi, err := c.registry.LookupMethod(fullMethodName)
+ m, err := protoMessageFromPeeker(mi, peeker)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
- var primary *models.Node
+ var requestFinalizer func()
+ var storage string
if mi.Scope == protoregistry.ScopeRepository {
- m, err := mi.UnmarshalRequestProto(frame)
+ storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker)
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
-
- targetRepo, err := mi.TargetRepo(m)
+ } else {
+ storage, requestFinalizer, err = c.getAnyStorageNode()
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
+ }
+ // We only need the primary node, as there's only one primary storage
+ // location per praefect at this time
+ cc, err := c.GetConnection(storage)
+ if err != nil {
+ return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+ }
+
+ return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil
+}
- primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
+var noopRequestFinalizer = func() {}
- if err != nil {
- if err != ErrPrimaryNotSet {
- return nil, nil, err
- }
- // if there are no primaries for this repository, pick one
- nodes, err := c.datastore.GetStorageNodes()
- if err != nil {
- return nil, nil, err
- }
+func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
+ //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
+ // proxy requests that are not repository scoped
+ node, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return "", nil, err
+ }
+ if len(node) == 0 {
+ return "", nil, errors.New("no node storages found")
+ }
- if len(nodes) == 0 {
- return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+ return node[0].Storage, noopRequestFinalizer, nil
+}
- }
- newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))]
+func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) {
+ targetRepo, err := mi.TargetRepo(m)
+ if err != nil {
+ return "", nil, err
+ }
- // set the primary
- if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
- return nil, nil, err
- }
+ primary, err := c.selectPrimary(mi, targetRepo)
+ if err != nil {
+ return "", nil, err
+ }
+
+ targetRepo.StorageName = primary.Storage
+
+ b, err := proxy.Codec().Marshal(m)
+ if err != nil {
+ return "", nil, err
+ }
+
+ if err = peeker.Modify(b); err != nil {
+ return "", nil, err
+ }
- primary = &newPrimary
+ requestFinalizer := noopRequestFinalizer
+
+ if mi.Operation == protoregistry.OpMutator {
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil {
+ return "", nil, err
}
+ }
+
+ return primary.Storage, requestFinalizer, nil
+}
- targetRepo.StorageName = primary.Storage
+func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) {
+ var primary *models.Node
+ var err error
+
+ primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
- b, err := proxy.Codec().Marshal(m)
+ if err != nil {
+ if err != ErrPrimaryNotSet {
+ return nil, err
+ }
+ // if there are no primaries for this repository, pick one
+ nodes, err := c.datastore.GetStorageNodes()
if err != nil {
- return nil, nil, err
+ return nil, err
}
- if err = peeker.Modify(b); err != nil {
- return nil, nil, err
+
+ if len(nodes) == 0 {
+ return nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+
}
- } else {
- //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
- // proxy requests that are not repository scoped
- node, err := c.datastore.GetStorageNodes()
- if err != nil {
- return nil, nil, err
+ sort.Slice(nodes, func(i, j int) bool {
+ return nodes[i].ID < nodes[j].ID
+ })
+
+ newPrimary := nodes[0]
+ replicas := nodes[1:]
+
+ // set the primary
+ if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ return nil, err
}
- if len(node) == 0 {
- return nil, nil, errors.New("no node storages found")
+
+ // add replicas
+ for _, replica := range replicas {
+ if err = c.datastore.AddReplica(targetRepo.GetRelativePath(), replica.ID); err != nil {
+ return nil, err
+ }
}
- primary = &node[0]
+
+ primary = &newPrimary
}
- // We only need the primary node, as there's only one primary storage
- // location per praefect at this time
- cc, err := c.GetConnection(primary.Storage)
+ return primary, nil
+}
+
+func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) {
+ frame, err := peeker.Peek()
if err != nil {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage)
+ return nil, err
}
- return helper.IncomingToOutgoing(ctx), cc, nil
+ m, err := mi.UnmarshalRequestProto(frame)
+ if err != nil {
+ return nil, err
+ }
+
+ return m, nil
+}
+
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func(), error) {
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath)
+ if err != nil {
+ return nil, err
+ }
+ return func() {
+ for _, jobID := range jobIDs {
+ if err := c.datastore.UpdateReplJob(jobID, JobStateReady); err != nil {
+ c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", JobStateReady)
+ }
+ }
+ }, nil
}
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 0275c6048..e029a3f48 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -4,7 +4,16 @@ import (
"io/ioutil"
"testing"
+ "github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
)
var testLogger = logrus.New()
@@ -16,3 +25,81 @@ func init() {
func TestSecondaryRotation(t *testing.T) {
t.Skip("secondary rotation will change with the new data model")
}
+
+func TestStreamDirector(t *testing.T) {
+ datastore := NewMemoryDatastore(config.Config{
+ Nodes: []*models.Node{
+ &models.Node{
+ Address: "tcp://gitaly-primary.example.com",
+ Storage: "praefect-internal-1",
+ }, &models.Node{
+ Address: "tcp://gitaly-backup1.example.com",
+ Storage: "praefect-internal-2",
+ }},
+ })
+
+ targetRepo := gitalypb.Repository{
+ StorageName: "praefect",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ coordinator := NewCoordinator(log.Default(), datastore)
+ require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
+
+ frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{
+ Repository: &targetRepo,
+ })
+ require.NoError(t, err)
+
+ cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure())
+ require.NoError(t, err)
+
+ coordinator.setConn("praefect-internal-1", cc)
+
+ _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
+ require.NoError(t, err)
+ require.Equal(t, cc, conn, "stream director should choose the primary as the client connection to use")
+
+ jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
+ require.NoError(t, err)
+ require.Len(t, jobs, 1)
+
+ targetNode, err := datastore.GetStorageNode(1)
+ require.NoError(t, err)
+ sourceNode, err := datastore.GetStorageNode(0)
+ require.NoError(t, err)
+
+ expectedJob := ReplJob{
+ ID: 1,
+ TargetNode: targetNode,
+ SourceNode: sourceNode,
+ State: JobStatePending,
+ Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}},
+ }
+
+ require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
+
+ jobUpdateFunc()
+
+ jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10)
+ require.NoError(t, err)
+ require.Len(t, jobs, 1)
+
+ expectedJob.State = JobStateReady
+ require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady")
+}
+
+type mockPeeker struct {
+ frame []byte
+}
+
+func (m *mockPeeker) Peek() ([]byte, error) {
+ return m.frame, nil
+}
+
+func (m *mockPeeker) Modify(payload []byte) error {
+ return nil
+}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index f9787089f..f61a64064 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -115,7 +115,6 @@ type jobRecord struct {
type MemoryDatastore struct {
jobs *struct {
sync.RWMutex
- next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
@@ -141,10 +140,8 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
},
jobs: &struct {
sync.RWMutex
- next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}{
- next: 0,
records: map[uint64]jobRecord{},
},
repositories: &struct {
@@ -160,32 +157,6 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
m.storageNodes.m[i] = *storageNode
}
- for _, repoPath := range cfg.Whitelist {
- repo := models.Repository{
- RelativePath: repoPath,
- }
- for storageID, storageNode := range cfg.Nodes {
-
- // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node
- // to be the primary
- if repo.Primary == (models.Node{}) {
- repo.Primary = *storageNode
- } else {
- repo.Replicas = append(repo.Replicas, *storageNode)
- // initialize replication job queue to replicate all whitelisted repos
- // to every replica
- m.jobs.next++
- m.jobs.records[m.jobs.next] = jobRecord{
- state: JobStateReady,
- targetNodeID: storageID,
- sourceNodeID: repo.Primary.ID,
- relativePath: repoPath,
- }
- }
- }
- m.repositories.m[repoPath] = repo
- }
-
return m
}
@@ -409,8 +380,7 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64,
for _, secondary := range repository.Replicas {
nextID := uint64(len(md.jobs.records) + 1)
- md.jobs.next++
- md.jobs.records[md.jobs.next] = jobRecord{
+ md.jobs.records[nextID] = jobRecord{
targetNodeID: secondary.ID,
state: JobStatePending,
relativePath: relativePath,
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
deleted file mode 100644
index 0881b0008..000000000
--- a/internal/praefect/datastore_memory_test.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package praefect
-
-import (
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
-)
-
-// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will
-// populate itself with the correct replication jobs and repositories when initialized
-// with a configuration file specifying the shard and whitelisted repositories.
-func TestMemoryDatastoreWhitelist(t *testing.T) {
- t.Skip("Since we are getting rid of the whitelist, we can skip this test for now. We can remove it once we get rid of the whitelist")
- repo1 := models.Repository{
- RelativePath: "abcd1234",
- }
- repo2 := models.Repository{
- RelativePath: "5678efgh",
- }
- mds := NewMemoryDatastore(config.Config{
- Nodes: []*models.Node{
- &models.Node{
- ID: 0,
- Address: "tcp://default",
- Storage: "praefect-internal-1",
- },
- &models.Node{
- ID: 1,
- Address: "tcp://backup-2",
- Storage: "praefect-internal-2",
- }, &models.Node{
- ID: 2,
- Address: "tcp://backup-2",
- Storage: "praefect-internal-3",
- }},
- Whitelist: []string{repo1.RelativePath, repo2.RelativePath},
- })
-
- expectReplicas := []models.Node{
- mds.storageNodes.m[1],
- mds.storageNodes.m[2],
- }
-
- for _, repo := range []models.Repository{repo1, repo2} {
- actualReplicas, err := mds.GetReplicas(repo.RelativePath)
- require.NoError(t, err)
- require.ElementsMatch(t, expectReplicas, actualReplicas)
- }
-
- primary := mds.storageNodes.m[0]
- backup1 := mds.storageNodes.m[1]
- backup2 := mds.storageNodes.m[2]
-
- backup1ExpectedJobs := []ReplJob{
- ReplJob{
- ID: 1,
- TargetNode: backup1,
- Repository: models.Repository{RelativePath: repo1.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
- SourceNode: primary,
- State: JobStateReady,
- },
- ReplJob{
- ID: 3,
- TargetNode: backup1,
- Repository: models.Repository{RelativePath: repo2.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
- SourceNode: primary,
- State: JobStateReady,
- },
- }
- backup2ExpectedJobs := []ReplJob{
- ReplJob{
- ID: 2,
- TargetNode: backup2,
- Repository: models.Repository{RelativePath: repo1.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
- SourceNode: primary,
- State: JobStateReady,
- },
- ReplJob{
- ID: 4,
- TargetNode: backup2,
- Repository: models.Repository{RelativePath: repo2.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
- SourceNode: primary,
- State: JobStateReady,
- },
- }
-
- backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10)
- require.NoError(t, err)
- require.Equal(t, backup1ExpectedJobs, backup1ActualJobs)
-
- backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10)
- require.NoError(t, err)
- require.Equal(t, backup2ActualJobs, backup2ExpectedJobs)
-
-}
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 10a63b228..82712765f 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -21,4 +21,4 @@ import (
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
-type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error)
+type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error)
diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go
index 2c2090363..6d4a3238e 100644
--- a/internal/praefect/grpc-proxy/proxy/examples_test.go
+++ b/internal/praefect/grpc-proxy/proxy/examples_test.go
@@ -39,10 +39,10 @@ func ExampleTransparentHandler() {
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
- director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
- return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
+ return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
// Copy the inbound metadata explicitly.
@@ -53,12 +53,12 @@ func ExampleStreamDirector() {
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
- return outCtx, conn, err
+ return outCtx, conn, nil, err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
- return outCtx, conn, err
+ return outCtx, conn, nil, err
}
}
- return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
+ return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index daf12d4b1..0ed1b3b5e 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -71,11 +71,17 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
peeker := newPeeker(serverStream)
// We require that the director's returned context inherits from the serverStream.Context().
- outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, peeker)
+ outgoingCtx, backendConn, requestFinalizer, err := s.director(serverStream.Context(), fullMethodName, peeker)
if err != nil {
return err
}
+ defer func() {
+ if requestFinalizer != nil {
+ requestFinalizer()
+ }
+ }()
+
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index 0fff36ed4..c57837d2a 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -207,17 +207,17 @@ func (s *ProxyHappySuite) SetupSuite() {
// Setup of the proxy's Director.
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
- return ctx, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
+ return ctx, nil, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
}
}
// Explicitly copy the metadata, otherwise the tests will fail.
outCtx, _ := context.WithCancel(ctx)
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
- return outCtx, s.serverClientConn, nil
+ return outCtx, s.serverClientConn, nil, nil
}
s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go
index e274f31e2..a31e7af34 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker_test.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go
@@ -28,7 +28,7 @@ func TestStreamPeeking(t *testing.T) {
pingReqSent := &testservice.PingRequest{Value: "hi"}
// director will peek into stream before routing traffic
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
t.Logf("director routing method %s to backend", fullMethodName)
peekedMsg, err := peeker.Peek()
@@ -39,7 +39,7 @@ func TestStreamPeeking(t *testing.T) {
require.NoError(t, err)
require.Equal(t, pingReqSent, peekedRequest)
- return ctx, backendCC, nil
+ return ctx, backendCC, nil, nil
}
pingResp := &testservice.PingResponse{
@@ -87,7 +87,7 @@ func TestStreamInjecting(t *testing.T) {
newValue := "bye"
// director will peek into stream and change some frames
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
t.Logf("modifying request for method %s", fullMethodName)
peekedMsg, err := peeker.Peek()
@@ -104,7 +104,7 @@ func TestStreamInjecting(t *testing.T) {
require.NoError(t, peeker.Modify(newPayload))
- return ctx, backendCC, nil
+ return ctx, backendCC, nil, nil
}
pingResp := &testservice.PingResponse{
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index b865edfcb..f54a6f457 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -147,7 +147,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
}
for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(JobStatePending|JobStateReady, node.ID, 10)
+ jobs, err := r.datastore.GetJobs(JobStateReady, node.ID, 10)
if err != nil {
return err
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index f035d925e..45c580a54 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -1,137 +1,25 @@
package praefect
import (
- "context"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"testing"
- "time"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
-// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for
-// all whitelisted repos
-func TestReplicatorProcessJobsWhitelist(t *testing.T) {
- datastore := NewMemoryDatastore(config.Config{
- Nodes: []*models.Node{
- &models.Node{
- ID: 1,
- Address: "tcp://gitaly-primary.example.com",
- Storage: "praefect-internal-1",
- }, &models.Node{
- ID: 2,
- Address: "tcp://gitaly-backup1.example.com",
- Storage: "praefect-internal-2",
- }},
- Whitelist: []string{"abcd1234", "edfg5678"},
- })
-
- logEntry := gitalylog.Default()
- coordinator := NewCoordinator(logEntry, datastore)
- resultsCh := make(chan result)
- replman := NewReplMgr(
- "default",
- logEntry,
- datastore,
- coordinator,
- WithReplicator(&mockReplicator{resultsCh}),
- )
-
- for _, node := range datastore.storageNodes.m {
- err := coordinator.RegisterNode(node.Storage, node.Address)
- require.NoError(t, err)
- }
-
- ctx, cancel := testhelper.Context()
-
- errQ := make(chan error)
-
- go func() {
- errQ <- replman.ProcessBacklog(ctx)
- }()
-
- success := make(chan struct{})
-
- var expectedResults []result
- // we expect one job per whitelisted repo with each backend server
- for _, shard := range datastore.repositories.m {
- for _, secondary := range shard.Replicas {
- cc, err := coordinator.GetConnection(secondary.Storage)
- require.NoError(t, err)
- expectedResults = append(expectedResults,
- result{relativePath: shard.RelativePath,
- targetStorage: secondary.Storage,
- targetCC: cc,
- })
- }
- }
-
- go func() {
- // we expect one job per whitelisted repo with each backend server
- for _, shard := range datastore.repositories.m {
- for range shard.Replicas {
- result := <-resultsCh
- assert.Contains(t, expectedResults, result)
- }
- }
-
- cancel()
- require.EqualError(t, <-errQ, context.Canceled.Error())
- success <- struct{}{}
- }()
-
- select {
-
- case <-success:
- return
-
- case <-time.After(time.Second):
- t.Fatalf("unable to iterate over expected jobs")
-
- }
-
-}
-
-type result struct {
- relativePath string
- targetStorage string
- targetCC *grpc.ClientConn
-}
-
-type mockReplicator struct {
- resultsCh chan<- result
-}
-
-func (mr *mockReplicator) Replicate(ctx context.Context, job ReplJob, target *grpc.ClientConn) error {
- select {
-
- case mr.resultsCh <- result{job.Repository.RelativePath, job.TargetNode.Storage, target}:
- return nil
-
- case <-ctx.Done():
- return ctx.Err()
-
- }
-
- return nil
-}
-
func TestReplicate(t *testing.T) {
srv, srvSocketPath := runFullGitalyServer(t)
defer srv.Stop()