diff options
author | John Cai <jcai@gitlab.com> | 2019-08-08 04:16:34 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-08-08 04:30:43 +0300 |
commit | 2578d09d6f72b97095f0a13f7cefadb047a491bb (patch) | |
tree | 369ddc00a7a2d7b4d7fc9acc9a03e0a6aa9e625e | |
parent | 763e8191b58adf97a3cfa0a7af59547ef70ca8c4 (diff) |
Realtime replication
-rw-r--r-- | internal/praefect/coordinator.go | 47 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/director.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/examples_test.go | 11 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 3 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker_test.go | 12 |
6 files changed, 51 insertions, 31 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index c6c1e762a..34965275f 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -32,14 +32,14 @@ type Coordinator struct { failoverMutex sync.RWMutex connMutex sync.RWMutex - datastore ReplicasDatastore + datastore Datastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -57,7 +57,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) @@ -65,14 +65,16 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, c.failoverMutex.RLock() defer c.failoverMutex.RUnlock() + jobUpdateFunc := func() {} + frame, err := peeker.Peek() if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } var primary *models.Node @@ -80,35 +82,35 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, if mi.Scope == protoregistry.ScopeRepository { m, err := mi.UnmarshalRequestProto(frame) if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } targetRepo, err := mi.TargetRepo(m) if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) if err != nil { if err != ErrPrimaryNotSet { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } // if there are no primaries for this repository, pick one nodes, err := c.datastore.GetStorageNodes() if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } if len(nodes) == 0 { - return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + return nil, nil, jobUpdateFunc, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) } newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))] // set the primary if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } primary = &newPrimary @@ -118,10 +120,21 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, b, err := proxy.Codec().Marshal(m) if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } if err = peeker.Modify(b); err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err + } + if mi.Operation == protoregistry.OpMutator { + jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath) + if err != nil { + return nil, nil, nil, err + } + jobUpdateFunc = func() { + for _, jobID := range jobIDs { + c.datastore.UpdateReplJob(jobID, JobStateReady) + } + } } } else { @@ -129,10 +142,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // proxy requests that are not repository scoped node, err := c.datastore.GetStorageNodes() if err != nil { - return nil, nil, err + return nil, nil, jobUpdateFunc, err } if len(node) == 0 { - return nil, nil, errors.New("no node storages found") + return nil, nil, jobUpdateFunc, errors.New("no node storages found") } primary = &node[0] } @@ -141,10 +154,10 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // location per praefect at this time cc, err := c.GetConnection(primary.Storage) if err != nil { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) + return nil, nil, jobUpdateFunc, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) } - return helper.IncomingToOutgoing(ctx), cc, nil + return helper.IncomingToOutgoing(ctx), cc, jobUpdateFunc, nil } // RegisterNode will direct traffic to the supplied downstream connection when the storage location diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 10a63b228..82712765f 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -21,4 +21,4 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 2c2090363..1eca8cf39 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -39,10 +39,11 @@ func ExampleTransparentHandler() { // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + f := func() {} // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, f, grpc.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) // Copy the inbound metadata explicitly. @@ -53,12 +54,12 @@ func ExampleStreamDirector() { if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + return outCtx, conn, f, err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, err + return outCtx, conn, f, err } } - return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, nil, f, grpc.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index daf12d4b1..e527a4a5c 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -71,7 +71,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error peeker := newPeeker(serverStream) // We require that the director's returned context inherits from the serverStream.Context(). - outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName, peeker) + outgoingCtx, backendConn, updateJobFunc, err := s.director(serverStream.Context(), fullMethodName, peeker) if err != nil { return err } @@ -111,6 +111,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error if c2sErr != io.EOF { return c2sErr } + updateJobFunc() return nil } } diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index 0fff36ed4..345ba9177 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -207,17 +207,18 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + f := func() {} md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return ctx, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") + return ctx, nil, f, grpc.Errorf(codes.PermissionDenied, "testing rejection") } } // Explicitly copy the metadata, otherwise the tests will fail. outCtx, _ := context.WithCancel(ctx) outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) - return outCtx, s.serverClientConn, nil + return outCtx, s.serverClientConn, f, nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index e274f31e2..86463892c 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -28,9 +28,11 @@ func TestStreamPeeking(t *testing.T) { pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { t.Logf("director routing method %s to backend", fullMethodName) + f := func() {} + peekedMsg, err := peeker.Peek() require.NoError(t, err) @@ -39,7 +41,7 @@ func TestStreamPeeking(t *testing.T) { require.NoError(t, err) require.Equal(t, pingReqSent, peekedRequest) - return ctx, backendCC, nil + return ctx, backendCC, f, nil } pingResp := &testservice.PingResponse{ @@ -87,9 +89,11 @@ func TestStreamInjecting(t *testing.T) { newValue := "bye" // director will peek into stream and change some frames - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { t.Logf("modifying request for method %s", fullMethodName) + f := func() {} + peekedMsg, err := peeker.Peek() require.NoError(t, err) @@ -104,7 +108,7 @@ func TestStreamInjecting(t *testing.T) { require.NoError(t, peeker.Modify(newPayload)) - return ctx, backendCC, nil + return ctx, backendCC, f, nil } pingResp := &testservice.PingResponse{ |