diff options
author | John Cai <jcai@gitlab.com> | 2019-12-06 20:50:38 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-12-10 02:40:41 +0300 |
commit | f77807076cc5e34e542f8925bee4e2edacdbadfa (patch) | |
tree | 8e97aeddb282f76a524985058751b7a094025640 | |
parent | b4379bb6de838ea1188140e70cf7c5f13960eb1d (diff) |
StreamDirector returns StreamParams
The stream director has too many return values that makes it hard to
read. We want to add more values the stream director can return, so it's
cleaner to have it return something that encapsulates those other return
values.
-rw-r--r-- | changelogs/unreleased/jc-change-stream-director.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 25 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 6 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/director.go | 44 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/examples_test.go | 13 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 12 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker_test.go | 9 |
8 files changed, 78 insertions, 44 deletions
diff --git a/changelogs/unreleased/jc-change-stream-director.yml b/changelogs/unreleased/jc-change-stream-director.yml new file mode 100644 index 000000000..fd2a17c57 --- /dev/null +++ b/changelogs/unreleased/jc-change-stream-director.yml @@ -0,0 +1,5 @@ +--- +title: StreamDirector returns StreamParams +merge_request: 1679 +author: +type: other diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 1292ae7ae..4d669cbc4 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -9,19 +9,16 @@ import ( "sync" "syscall" - "gitlab.com/gitlab-org/gitaly/internal/helper" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/protoc-gen-go/descriptor" - "github.com/sirupsen/logrus" - "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" - "google.golang.org/grpc" ) func isDestructive(methodName string) bool { @@ -62,7 +59,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) @@ -72,12 +69,12 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { - return nil, nil, nil, err + return nil, err } m, err := protoMessageFromPeeker(mi, peeker) if err != nil { - return nil, nil, nil, err + return nil, err } var requestFinalizer func() @@ -86,22 +83,22 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, if mi.Scope == protoregistry.ScopeRepository { storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) if err != nil { - return nil, nil, nil, err + return nil, err } } else { storage, requestFinalizer, err = c.getAnyStorageNode() if err != nil { - return nil, nil, nil, err + return nil, err } } // We only need the primary node, as there's only one primary storage // location per praefect at this time cc, err := c.connections.GetConnection(storage) if err != nil { - return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) + return nil, fmt.Errorf("unable to find existing client connection for %s", storage) } - return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil + return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil } var noopRequestFinalizer = func() {} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index bd941f97a..22013e830 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -73,9 +73,9 @@ func TestStreamDirector(t *testing.T) { fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool" peeker := &mockPeeker{frame} - _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, fullMethod, peeker) + streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker) require.NoError(t, err) - require.Equal(t, address, conn.Target()) + require.Equal(t, address, streamParams.Conn().Target()) mi, err := coordinator.registry.LookupMethod(fullMethod) require.NoError(t, err) @@ -112,7 +112,7 @@ func TestStreamDirector(t *testing.T) { require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct") - jobUpdateFunc() + streamParams.RequestFinalizer() jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, "praefect-internal-2", 10) require.NoError(t, err) diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 82712765f..0efda2b91 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -4,6 +4,7 @@ package proxy import ( + "gitlab.com/gitlab-org/gitaly/internal/helper" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -21,4 +22,45 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (*StreamParameters, error) + +// StreamParameters encapsulates streaming parameters the praefect coordinator returns to the +// proxy handler +type StreamParameters struct { + ctx context.Context + conn *grpc.ClientConn + reqFinalizer func() + callOptions []grpc.CallOption +} + +// NewStreamParameters returns a new instance of StreamParameters +func NewStreamParameters(ctx context.Context, conn *grpc.ClientConn, reqFinalizer func(), callOpts []grpc.CallOption) *StreamParameters { + return &StreamParameters{ + ctx: helper.IncomingToOutgoing(ctx), + conn: conn, + reqFinalizer: reqFinalizer, + callOptions: callOpts, + } +} + +// Context returns the outgoing context +func (s *StreamParameters) Context() context.Context { + return s.ctx +} + +// Conn returns a grpc client connection +func (s *StreamParameters) Conn() *grpc.ClientConn { + return s.conn +} + +// RequestFinalizer calls the request finalizer +func (s *StreamParameters) RequestFinalizer() { + if s.reqFinalizer != nil { + s.reqFinalizer() + } +} + +// CallOptions returns call options +func (s *StreamParameters) CallOptions() []grpc.CallOption { + return s.callOptions +} diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 6d4a3238e..ed0c57a20 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -39,26 +39,23 @@ func ExampleTransparentHandler() { // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { - return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } md, ok := metadata.FromIncomingContext(ctx) - // Copy the inbound metadata explicitly. - outCtx, _ := context.WithCancel(ctx) - outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) if ok { // Decide on which backend to dial if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, nil, err + return proxy.NewStreamParameters(ctx, conn, nil, nil), err } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())) - return outCtx, conn, nil, err + return proxy.NewStreamParameters(ctx, conn, nil, nil), err } } - return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") + return nil, grpc.Errorf(codes.Unimplemented, "Unknown method") } } diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index d30a13e58..0b8c901f7 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -72,20 +72,16 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error peeker := newPeeker(serverStream) // We require that the director's returned context inherits from the serverStream.Context(). - outgoingCtx, backendConn, requestFinalizer, err := s.director(serverStream.Context(), fullMethodName, peeker) + params, err := s.director(serverStream.Context(), fullMethodName, peeker) if err != nil { return err } - defer func() { - if requestFinalizer != nil { - requestFinalizer() - } - }() + defer params.RequestFinalizer() - clientCtx, clientCancel := context.WithCancel(outgoingCtx) + clientCtx, clientCancel := context.WithCancel(params.Context()) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. - clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Conn(), fullMethodName, params.CallOptions()...) if err != nil { return err } diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index c57837d2a..fe0133a34 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -207,17 +207,15 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { - return ctx, nil, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection") + return proxy.NewStreamParameters(ctx, nil, nil, nil), grpc.Errorf(codes.PermissionDenied, "testing rejection") } } // Explicitly copy the metadata, otherwise the tests will fail. - outCtx, _ := context.WithCancel(ctx) - outCtx = metadata.NewOutgoingContext(outCtx, md.Copy()) - return outCtx, s.serverClientConn, nil, nil + return proxy.NewStreamParameters(ctx, s.serverClientConn, nil, nil), nil } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index a31e7af34..e6de9c884 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -8,7 +8,6 @@ import ( "github.com/golang/protobuf/proto" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "golang.org/x/net/context" - "google.golang.org/grpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,7 +27,7 @@ func TestStreamPeeking(t *testing.T) { pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { t.Logf("director routing method %s to backend", fullMethodName) peekedMsg, err := peeker.Peek() @@ -39,7 +38,7 @@ func TestStreamPeeking(t *testing.T) { require.NoError(t, err) require.Equal(t, pingReqSent, peekedRequest) - return ctx, backendCC, nil, nil + return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil } pingResp := &testservice.PingResponse{ @@ -87,7 +86,7 @@ func TestStreamInjecting(t *testing.T) { newValue := "bye" // director will peek into stream and change some frames - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { t.Logf("modifying request for method %s", fullMethodName) peekedMsg, err := peeker.Peek() @@ -104,7 +103,7 @@ func TestStreamInjecting(t *testing.T) { require.NoError(t, peeker.Modify(newPayload)) - return ctx, backendCC, nil, nil + return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil } pingResp := &testservice.PingResponse{ |