Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-12-06 20:50:38 +0300
committerJohn Cai <jcai@gitlab.com>2019-12-10 02:40:41 +0300
commitf77807076cc5e34e542f8925bee4e2edacdbadfa (patch)
tree8e97aeddb282f76a524985058751b7a094025640
parentb4379bb6de838ea1188140e70cf7c5f13960eb1d (diff)
StreamDirector returns StreamParams
The stream director has too many return values that makes it hard to read. We want to add more values the stream director can return, so it's cleaner to have it return something that encapsulates those other return values.
-rw-r--r--changelogs/unreleased/jc-change-stream-director.yml5
-rw-r--r--internal/praefect/coordinator.go25
-rw-r--r--internal/praefect/coordinator_test.go6
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go44
-rw-r--r--internal/praefect/grpc-proxy/proxy/examples_test.go13
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go12
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go8
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker_test.go9
8 files changed, 78 insertions, 44 deletions
diff --git a/changelogs/unreleased/jc-change-stream-director.yml b/changelogs/unreleased/jc-change-stream-director.yml
new file mode 100644
index 000000000..fd2a17c57
--- /dev/null
+++ b/changelogs/unreleased/jc-change-stream-director.yml
@@ -0,0 +1,5 @@
+---
+title: StreamDirector returns StreamParams
+merge_request: 1679
+author:
+type: other
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 1292ae7ae..4d669cbc4 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -9,19 +9,16 @@ import (
"sync"
"syscall"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/protoc-gen-go/descriptor"
+ "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
-
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/protoc-gen-go/descriptor"
- "github.com/sirupsen/logrus"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "google.golang.org/grpc"
)
func isDestructive(methodName string) bool {
@@ -62,7 +59,7 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
}
// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
@@ -72,12 +69,12 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
- return nil, nil, nil, err
+ return nil, err
}
m, err := protoMessageFromPeeker(mi, peeker)
if err != nil {
- return nil, nil, nil, err
+ return nil, err
}
var requestFinalizer func()
@@ -86,22 +83,22 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
if mi.Scope == protoregistry.ScopeRepository {
storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
if err != nil {
- return nil, nil, nil, err
+ return nil, err
}
} else {
storage, requestFinalizer, err = c.getAnyStorageNode()
if err != nil {
- return nil, nil, nil, err
+ return nil, err
}
}
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
cc, err := c.connections.GetConnection(storage)
if err != nil {
- return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+ return nil, fmt.Errorf("unable to find existing client connection for %s", storage)
}
- return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil
+ return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil
}
var noopRequestFinalizer = func() {}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index bd941f97a..22013e830 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -73,9 +73,9 @@ func TestStreamDirector(t *testing.T) {
fullMethod := "/gitaly.ObjectPoolService/FetchIntoObjectPool"
peeker := &mockPeeker{frame}
- _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, fullMethod, peeker)
+ streamParams, err := coordinator.streamDirector(ctx, fullMethod, peeker)
require.NoError(t, err)
- require.Equal(t, address, conn.Target())
+ require.Equal(t, address, streamParams.Conn().Target())
mi, err := coordinator.registry.LookupMethod(fullMethod)
require.NoError(t, err)
@@ -112,7 +112,7 @@ func TestStreamDirector(t *testing.T) {
require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
- jobUpdateFunc()
+ streamParams.RequestFinalizer()
jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, "praefect-internal-2", 10)
require.NoError(t, err)
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 82712765f..0efda2b91 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -4,6 +4,7 @@
package proxy
import (
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@@ -21,4 +22,45 @@ import (
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
-type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error)
+type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (*StreamParameters, error)
+
+// StreamParameters encapsulates streaming parameters the praefect coordinator returns to the
+// proxy handler
+type StreamParameters struct {
+ ctx context.Context
+ conn *grpc.ClientConn
+ reqFinalizer func()
+ callOptions []grpc.CallOption
+}
+
+// NewStreamParameters returns a new instance of StreamParameters
+func NewStreamParameters(ctx context.Context, conn *grpc.ClientConn, reqFinalizer func(), callOpts []grpc.CallOption) *StreamParameters {
+ return &StreamParameters{
+ ctx: helper.IncomingToOutgoing(ctx),
+ conn: conn,
+ reqFinalizer: reqFinalizer,
+ callOptions: callOpts,
+ }
+}
+
+// Context returns the outgoing context
+func (s *StreamParameters) Context() context.Context {
+ return s.ctx
+}
+
+// Conn returns a grpc client connection
+func (s *StreamParameters) Conn() *grpc.ClientConn {
+ return s.conn
+}
+
+// RequestFinalizer calls the request finalizer
+func (s *StreamParameters) RequestFinalizer() {
+ if s.reqFinalizer != nil {
+ s.reqFinalizer()
+ }
+}
+
+// CallOptions returns call options
+func (s *StreamParameters) CallOptions() []grpc.CallOption {
+ return s.callOptions
+}
diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go
index 6d4a3238e..ed0c57a20 100644
--- a/internal/praefect/grpc-proxy/proxy/examples_test.go
+++ b/internal/praefect/grpc-proxy/proxy/examples_test.go
@@ -39,26 +39,23 @@ func ExampleTransparentHandler() {
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
- director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
- return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
+ return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
- // Copy the inbound metadata explicitly.
- outCtx, _ := context.WithCancel(ctx)
- outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
if ok {
// Decide on which backend to dial
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec()))
- return outCtx, conn, nil, err
+ return proxy.NewStreamParameters(ctx, conn, nil, nil), err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec()))
- return outCtx, conn, nil, err
+ return proxy.NewStreamParameters(ctx, conn, nil, nil), err
}
}
- return nil, nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
+ return nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
}
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index d30a13e58..0b8c901f7 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -72,20 +72,16 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
peeker := newPeeker(serverStream)
// We require that the director's returned context inherits from the serverStream.Context().
- outgoingCtx, backendConn, requestFinalizer, err := s.director(serverStream.Context(), fullMethodName, peeker)
+ params, err := s.director(serverStream.Context(), fullMethodName, peeker)
if err != nil {
return err
}
- defer func() {
- if requestFinalizer != nil {
- requestFinalizer()
- }
- }()
+ defer params.RequestFinalizer()
- clientCtx, clientCancel := context.WithCancel(outgoingCtx)
+ clientCtx, clientCancel := context.WithCancel(params.Context())
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
- clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
+ clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Conn(), fullMethodName, params.CallOptions()...)
if err != nil {
return err
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index c57837d2a..fe0133a34 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -207,17 +207,15 @@ func (s *ProxyHappySuite) SetupSuite() {
// Setup of the proxy's Director.
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
- return ctx, nil, nil, grpc.Errorf(codes.PermissionDenied, "testing rejection")
+ return proxy.NewStreamParameters(ctx, nil, nil, nil), grpc.Errorf(codes.PermissionDenied, "testing rejection")
}
}
// Explicitly copy the metadata, otherwise the tests will fail.
- outCtx, _ := context.WithCancel(ctx)
- outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())
- return outCtx, s.serverClientConn, nil, nil
+ return proxy.NewStreamParameters(ctx, s.serverClientConn, nil, nil), nil
}
s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go
index a31e7af34..e6de9c884 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker_test.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go
@@ -8,7 +8,6 @@ import (
"github.com/golang/protobuf/proto"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"golang.org/x/net/context"
- "google.golang.org/grpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -28,7 +27,7 @@ func TestStreamPeeking(t *testing.T) {
pingReqSent := &testservice.PingRequest{Value: "hi"}
// director will peek into stream before routing traffic
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
t.Logf("director routing method %s to backend", fullMethodName)
peekedMsg, err := peeker.Peek()
@@ -39,7 +38,7 @@ func TestStreamPeeking(t *testing.T) {
require.NoError(t, err)
require.Equal(t, pingReqSent, peekedRequest)
- return ctx, backendCC, nil, nil
+ return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil
}
pingResp := &testservice.PingResponse{
@@ -87,7 +86,7 @@ func TestStreamInjecting(t *testing.T) {
newValue := "bye"
// director will peek into stream and change some frames
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
t.Logf("modifying request for method %s", fullMethodName)
peekedMsg, err := peeker.Peek()
@@ -104,7 +103,7 @@ func TestStreamInjecting(t *testing.T) {
require.NoError(t, peeker.Modify(newPayload))
- return ctx, backendCC, nil, nil
+ return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil
}
pingResp := &testservice.PingResponse{