Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-08-08 04:16:34 +0300
committerJohn Cai <jcai@gitlab.com>2019-08-08 04:30:43 +0300
commit2578d09d6f72b97095f0a13f7cefadb047a491bb (patch)
tree369ddc00a7a2d7b4d7fc9acc9a03e0a6aa9e625e
parent763e8191b58adf97a3cfa0a7af59547ef70ca8c4 (diff)
Realtime replication
-rw-r--r--internal/praefect/coordinator.go47
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/examples_test.go11
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go3
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go7
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker_test.go12
6 files changed, 51 insertions, 31 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index c6c1e762a..34965275f 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -32,14 +32,14 @@ type Coordinator struct {
failoverMutex sync.RWMutex
connMutex sync.RWMutex
- datastore ReplicasDatastore
+ datastore Datastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -57,7 +57,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
}
// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
@@ -65,14 +65,16 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
c.failoverMutex.RLock()
defer c.failoverMutex.RUnlock()
+ jobUpdateFunc := func() {}
+
frame, err := peeker.Peek()
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
var primary *models.Node
@@ -80,35 +82,35 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
if mi.Scope == protoregistry.ScopeRepository {
m, err := mi.UnmarshalRequestProto(frame)
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
targetRepo, err := mi.TargetRepo(m)
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
if err != nil {
if err != ErrPrimaryNotSet {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
// if there are no primaries for this repository, pick one
nodes, err := c.datastore.GetStorageNodes()
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
if len(nodes) == 0 {
- return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+ return nil, nil, jobUpdateFunc, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
}
newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))]
// set the primary
if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
primary = &newPrimary
@@ -118,10 +120,21 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
b, err := proxy.Codec().Marshal(m)
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
if err = peeker.Modify(b); err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
+ }
+ if mi.Operation == protoregistry.OpMutator {
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath)
+ if err != nil {
+ return nil, nil, nil, err
+ }
+ jobUpdateFunc = func() {
+ for _, jobID := range jobIDs {
+ c.datastore.UpdateReplJob(jobID, JobStateReady)
+ }
+ }
}
} else {
@@ -129,10 +142,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// proxy requests that are not repository scoped
node, err := c.datastore.GetStorageNodes()
if err != nil {
- return nil, nil, err
+ return nil, nil, jobUpdateFunc, err
}
if len(node) == 0 {
- return nil, nil, errors.New("no node storages found")
+ return nil, nil, jobUpdateFunc, errors.New("no node storages found")
}
primary = &node[0]
}
@@ -141,10 +154,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// location per praefect at this time
cc, err := c.GetConnection(primary.Storage)
if err != nil {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage)
+ return nil, nil, jobUpdateFunc, fmt.Errorf("unable to find existing client connection for %s", primary.Storage)
}
- return helper.IncomingToOutgoing(ctx), cc, nil
+ return helper.IncomingToOutgoing(ctx), cc, jobUpdateFunc, nil
}
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 10a63b228..82712765f 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -21,4 +21,4 @@ import (
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
-type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error)
+type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error)
diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go
index 2c2090363..1eca8cf39 100644
--- a/internal/praefect/grpc-proxy/proxy/examples_test.go
+++ b/internal/praefect/grpc-proxy/proxy/examples_test.go
@@ -39,10 +39,11 @@ func ExampleTransparentHandler() {
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
- director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ f := func() {}
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
- return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
+ return nil, nil, f, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
// Copy the inbound metadata explicitly.
@@ -53,12 +54,12 @@ func ExampleStreamDirector() {
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
- return outCtx, conn, err
+ return outCtx, conn, f, err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
- return outCtx, conn, err
+ return outCtx, conn, f, err
}
}
- return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
+ return nil, nil, f, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index daf12d4b1..e527a4a5c 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -71,7 +71,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
peeker := newPeeker(serverStream)
// We require that the director's returned context inherits from the serverStream.Context().
- outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, peeker)
+ outgoingCtx, backendConn, updateJobFunc, err := s.director(serverStream.Context(), fullMethodName, peeker)
if err != nil {
return err
}
@@ -111,6 +111,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
if c2sErr != io.EOF {
return c2sErr
}
+ updateJobFunc()
return nil
}
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index 0fff36ed4..345ba9177 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -207,17 +207,18 @@ func (s *ProxyHappySuite) SetupSuite() {
// Setup of the proxy's Director.
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ f := func() {}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
- return ctx, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
+ return ctx, nil, f, grpc.Errorf(codes.PermissionDenied, "testing rejection")
}
}
// Explicitly copy the metadata, otherwise the tests will fail.
outCtx, _ := context.WithCancel(ctx)
outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
- return outCtx, s.serverClientConn, nil
+ return outCtx, s.serverClientConn, f, nil
}
s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go
index e274f31e2..86463892c 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker_test.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go
@@ -28,9 +28,11 @@ func TestStreamPeeking(t *testing.T) {
pingReqSent := &testservice.PingRequest{Value: "hi"}
// director will peek into stream before routing traffic
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
t.Logf("director routing method %s to backend", fullMethodName)
+ f := func() {}
+
peekedMsg, err := peeker.Peek()
require.NoError(t, err)
@@ -39,7 +41,7 @@ func TestStreamPeeking(t *testing.T) {
require.NoError(t, err)
require.Equal(t, pingReqSent, peekedRequest)
- return ctx, backendCC, nil
+ return ctx, backendCC, f, nil
}
pingResp := &testservice.PingResponse{
@@ -87,9 +89,11 @@ func TestStreamInjecting(t *testing.T) {
newValue := "bye"
// director will peek into stream and change some frames
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
t.Logf("modifying request for method %s", fullMethodName)
+ f := func() {}
+
peekedMsg, err := peeker.Peek()
require.NoError(t, err)
@@ -104,7 +108,7 @@ func TestStreamInjecting(t *testing.T) {
require.NoError(t, peeker.Modify(newPayload))
- return ctx, backendCC, nil
+ return ctx, backendCC, f, nil
}
pingResp := &testservice.PingResponse{