Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-06-16 11:44:09 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2020-06-16 11:44:09 +0300
commita33b2576c17ac2753bae817e6b0f9b8f734f6b6f (patch)
treebed5ad8cc29404294f48d54ca8947cd681e2e824 /internal/praefect/grpc-proxy
parentd958b52386fcb1cbc5f83a5291038370fb384a44 (diff)
Multi node write
Diffstat (limited to 'internal/praefect/grpc-proxy')
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go30
-rw-r--r--internal/praefect/grpc-proxy/proxy/examples_test.go7
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go185
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go15
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker.go28
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker_test.go11
6 files changed, 182 insertions, 94 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 50a0ee63f..9c613dd11 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -6,7 +6,6 @@ package proxy
import (
"context"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
"google.golang.org/grpc"
)
@@ -23,35 +22,40 @@ import (
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
-type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (*StreamParameters, error)
+type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (*StreamParameters, error)
// StreamParameters encapsulates streaming parameters the praefect coordinator returns to the
// proxy handler
type StreamParameters struct {
- ctx context.Context
- conn *grpc.ClientConn
+ primary Destination
reqFinalizer func()
callOptions []grpc.CallOption
+ secondaries []Destination
+}
+
+// Destination contains a client connection as well as a rewritten protobuf message
+type Destination struct {
+ Ctx context.Context
+ Conn *grpc.ClientConn
+ Msg []byte
}
// NewStreamParameters returns a new instance of StreamParameters
-func NewStreamParameters(ctx context.Context, conn *grpc.ClientConn, reqFinalizer func(), callOpts []grpc.CallOption) *StreamParameters {
+func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func(), callOpts []grpc.CallOption) *StreamParameters {
return &StreamParameters{
- ctx: helper.IncomingToOutgoing(ctx),
- conn: conn,
+ primary: primary,
+ secondaries: secondaries,
reqFinalizer: reqFinalizer,
callOptions: callOpts,
}
}
-// Context returns the outgoing context
-func (s *StreamParameters) Context() context.Context {
- return s.ctx
+func (s *StreamParameters) Primary() Destination {
+ return s.primary
}
-// Conn returns a grpc client connection
-func (s *StreamParameters) Conn() *grpc.ClientConn {
- return s.conn
+func (s *StreamParameters) Secondaries() []Destination {
+ return s.secondaries
}
// RequestFinalizer calls the request finalizer
diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go
index 0f4050884..eb5508de5 100644
--- a/internal/praefect/grpc-proxy/proxy/examples_test.go
+++ b/internal/praefect/grpc-proxy/proxy/examples_test.go
@@ -11,6 +11,7 @@ import (
"context"
"strings"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -40,7 +41,7 @@ func ExampleTransparentHandler() {
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
- director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ director = func(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (*proxy.StreamParameters, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
@@ -51,10 +52,10 @@ func ExampleStreamDirector() {
if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
- return proxy.NewStreamParameters(ctx, conn, nil, nil), err
+ return proxy.NewStreamParameters(proxy.Destination{Conn: conn, Ctx: helper.IncomingToOutgoing(ctx)}, nil, nil, nil), err
} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
- return proxy.NewStreamParameters(ctx, conn, nil, nil), err
+ return proxy.NewStreamParameters(proxy.Destination{Conn: conn, Ctx: helper.IncomingToOutgoing(ctx)}, nil, nil, nil), err
}
}
return nil, status.Errorf(codes.Unimplemented, "Unknown method")
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 5ea376e07..f226488c7 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -9,9 +9,11 @@ package proxy
import (
"context"
+ "errors"
"io"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
+ "golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -88,6 +90,12 @@ type handler struct {
director StreamDirector
}
+type streamAndMsg struct {
+ grpc.ClientStream
+ msg []byte
+ cancel func()
+}
+
// handler is where the real magic of proxying happens.
// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
// forwarding it to a ClientStream established against the relevant ClientConn.
@@ -108,38 +116,66 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
defer params.RequestFinalizer()
- clientCtx, clientCancel := context.WithCancel(params.Context())
+ clientCtx, clientCancel := context.WithCancel(params.Primary().Ctx)
defer clientCancel()
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
- clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Conn(), fullMethodName, params.CallOptions()...)
+
+ primaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Primary().Conn, fullMethodName, params.CallOptions()...)
if err != nil {
return err
}
+
+ primaryStream := streamAndMsg{
+ ClientStream: primaryClientStream,
+ msg: params.Primary().Msg,
+ cancel: clientCancel,
+ }
+
+ var secondaryStreams []streamAndMsg
+ for _, conn := range params.Secondaries() {
+ clientCtx, clientCancel := context.WithCancel(conn.Ctx)
+ defer clientCancel()
+
+ secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, conn.Conn, fullMethodName, params.CallOptions()...)
+ if err != nil {
+ return err
+ }
+ secondaryStreams = append(secondaryStreams, streamAndMsg{
+ ClientStream: secondaryClientStream,
+ msg: conn.Msg,
+ cancel: clientCancel,
+ })
+ }
+
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
- s2cErrChan := s.forwardServerToClient(serverStream, clientStream, peeker.consumedStream)
- c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
- // We don't know which side is going to stop sending first, so we need a select between the two.
- for i := 0; i < 2; i++ {
+ s2cErrChan := s.forwardServerToClients(serverStream, append(secondaryStreams, primaryStream))
+ c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream)
+ secondaryErrChan := receiveSecondaryStreams(secondaryStreams)
+
+ // We don't know whether the server, primary, or secondaries will stop sending first, so we need a select between them
+ for {
select {
- case s2cErr := <-s2cErrChan:
- if s2cErr == io.EOF {
- // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
- // the clientStream>serverStream may continue pumping though.
- clientStream.CloseSend()
- } else {
- // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
+ case s2cErr, ok := <-s2cErrChan:
+ if !ok {
+ continue
+ }
+ if s2cErr != nil {
+ // we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
- clientCancel()
+
+ for _, stream := range append(secondaryStreams, primaryStream) {
+ stream.cancel()
+ }
return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
// will be nil.
- trailer := clientStream.Trailer()
+ trailer := primaryClientStream.Trailer()
serverStream.SetTrailer(trailer)
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
if c2sErr != io.EOF {
@@ -149,10 +185,48 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
}
return c2sErr
}
+
+ secondaryErr, ok := <-secondaryErrChan
+ if !ok {
+ return status.Error(codes.Internal, "failed proxying to secondary")
+ }
+ if secondaryErr != nil {
+ return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr)
+ }
+
return nil
}
}
- return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+}
+
+// receiveSecondaryStreams reads from the client streams of the secondaries and drops the message
+// but returns an error to the channel if it encounters a non io.EOF error
+func receiveSecondaryStreams(srcs []streamAndMsg) chan error {
+ ret := make(chan error, 1)
+
+ go func() {
+ var g errgroup.Group
+ defer close(ret)
+
+ for _, src := range srcs {
+ src := src // rescoping for goroutine
+ g.Go(func() error {
+ for {
+ if err := src.RecvMsg(&frame{}); err != nil {
+ if errors.Is(err, io.EOF) {
+ return nil
+ }
+
+ src.cancel()
+ return err
+ }
+ }
+ })
+ }
+
+ ret <- g.Wait()
+ }()
+ return ret
}
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
@@ -187,41 +261,68 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt
return ret
}
-func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream, consumedStream *partialStream) chan error {
- ret := make(chan error, 1)
- go func() {
- // send any consumed/peeked frames first
- for _, frame := range consumedStream.frames {
- if frame == nil {
- // It is possible for peeked frames to be empty. This most likely
- // occurs when the server stream returns an error before the desired
- // number of frames can be peeked
+func forwardConsumedToClient(dst grpc.ClientStream, frameChan <-chan *frame) error {
+ for f := range frameChan {
+ if err := dst.SendMsg(f); err != nil {
+ if errors.Is(err, io.EOF) {
break
}
- if err := dst.SendMsg(frame); err != nil {
- ret <- err
- return
- }
+ return err
}
+ }
+
+ // all messages redirected
+ return dst.CloseSend()
+}
+
+func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error {
+ ret := make(chan error, 1)
+ go func() {
+ var g errgroup.Group
+ defer close(ret)
- // we may have encountered an error earlier while peeking
- if consumedStream.err != nil {
- ret <- consumedStream.err
- return
+ frameChans := make([]chan<- *frame, 0, len(dsts))
+
+ for _, dst := range dsts {
+ dst := dst
+ frameChan := make(chan *frame, 16)
+ frameChan <- &frame{payload: dst.msg} // send re-written message
+ frameChans = append(frameChans, frameChan)
+
+ g.Go(func() error { return forwardConsumedToClient(dst, frameChan) })
}
- // resume two-way stream after peeked messages
- f := &frame{}
- for i := 0; ; i++ {
- if err := src.RecvMsg(f); err != nil {
- ret <- err // this can be io.EOF which is happy case
- break
- }
- if err := dst.SendMsg(f); err != nil {
+ for {
+ if err := consumeServerAndForward(src, frameChans); err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+
ret <- err
- break
+ return
}
}
+
+ ret <- g.Wait()
}()
return ret
}
+
+func consumeServerAndForward(src grpc.ServerStream, frameChans []chan<- *frame) error {
+ f := &frame{}
+
+ if err := src.RecvMsg(f); err != nil {
+ for _, frameChan := range frameChans {
+ // signal no more data to redirect
+ close(frameChan)
+ }
+
+ return err // this can be io.EOF which is happy case
+ }
+
+ for _, frameChan := range frameChans {
+ frameChan <- f
+ }
+
+ return nil
+}
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index 267ed1e85..f9ef180a1 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
@@ -252,15 +253,21 @@ func (s *ProxyHappySuite) SetupSuite() {
// Setup of the proxy's Director.
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ director := func(ctx context.Context, fullName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ payload, err := peeker.Peek()
+ if err != nil {
+ return nil, err
+ }
+
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
- return proxy.NewStreamParameters(ctx, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection")
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Msg: payload}, nil, nil, nil), status.Errorf(codes.PermissionDenied, "testing rejection")
}
}
+
// Explicitly copy the metadata, otherwise the tests will fail.
- return proxy.NewStreamParameters(ctx, s.serverClientConn, nil, nil), nil
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: s.serverClientConn, Msg: payload}, nil, nil, nil), nil
}
s.proxy = grpc.NewServer(
@@ -326,7 +333,7 @@ func TestRegisterStreamHandlers(t *testing.T) {
server := grpc.NewServer(
grpc.CustomCodec(proxy.NewCodec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
return nil, directorCalledError
})),
)
diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go
index 1d1e02df5..459468dbe 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker.go
@@ -2,28 +2,21 @@ package proxy
import (
"errors"
- "fmt"
"google.golang.org/grpc"
)
// StreamModifier abstracts away the gRPC stream being forwarded so that it can
// be inspected and modified.
-type StreamModifier interface {
+type StreamPeeker interface {
// Peek allows a director to peek one message into the request stream without
// removing those messages from the stream that will be forwarded to
// the backend server.
Peek() (frame []byte, _ error)
-
- // Modify replaces the peeked payload in the stream with the provided frame.
- // If no payload was peeked, an error will be returned.
- // Note: Modify cannot be called after the director returns.
- Modify(payload []byte) error
}
type partialStream struct {
frames []*frame // frames encountered in partial stream
- err error // error returned by partial stream
}
type peeker struct {
@@ -55,10 +48,6 @@ func (p peeker) Peek() ([]byte, error) {
return payloads[0], nil
}
-func (p peeker) Modify(payload []byte) error {
- return p.modify([][]byte{payload})
-}
-
func (p peeker) peek(n uint) ([][]byte, error) {
if n < 1 {
return nil, ErrInvalidPeekCount
@@ -70,8 +59,7 @@ func (p peeker) peek(n uint) ([][]byte, error) {
for i := 0; i < len(p.consumedStream.frames); i++ {
f := &frame{}
if err := p.srcStream.RecvMsg(f); err != nil {
- p.consumedStream.err = err
- break
+ return nil, err
}
p.consumedStream.frames[i] = f
peekedFrames[i] = f.payload
@@ -79,15 +67,3 @@ func (p peeker) peek(n uint) ([][]byte, error) {
return peekedFrames, nil
}
-
-func (p peeker) modify(payloads [][]byte) error {
- if len(payloads) != len(p.consumedStream.frames) {
- return fmt.Errorf("replacement frames count %d does not match consumed frames count %d", len(payloads), len(p.consumedStream.frames))
- }
-
- for i, payload := range payloads {
- p.consumedStream.frames[i].payload = payload
- }
-
- return nil
-}
diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go
index b5d882ff4..823cb50f6 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker_test.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go
@@ -9,6 +9,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
testservice "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata"
)
@@ -26,7 +27,7 @@ func TestStreamPeeking(t *testing.T) {
pingReqSent := &testservice.PingRequest{Value: "hi"}
// director will peek into stream before routing traffic
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
t.Logf("director routing method %s to backend", fullMethodName)
peekedMsg, err := peeker.Peek()
@@ -37,7 +38,7 @@ func TestStreamPeeking(t *testing.T) {
require.NoError(t, err)
require.True(t, proto.Equal(pingReqSent, peekedRequest), "expected to be the same")
- return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: backendCC, Msg: peekedMsg}, nil, nil, nil), nil
}
pingResp := &testservice.PingResponse{
@@ -85,7 +86,7 @@ func TestStreamInjecting(t *testing.T) {
newValue := "bye"
// director will peek into stream and change some frames
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
t.Logf("modifying request for method %s", fullMethodName)
peekedMsg, err := peeker.Peek()
@@ -100,9 +101,7 @@ func TestStreamInjecting(t *testing.T) {
newPayload, err := proto.Marshal(peekedRequest)
require.NoError(t, err)
- require.NoError(t, peeker.Modify(newPayload))
-
- return proxy.NewStreamParameters(ctx, backendCC, nil, nil), nil
+ return proxy.NewStreamParameters(proxy.Destination{Ctx: helper.IncomingToOutgoing(ctx), Conn: backendCC, Msg: newPayload}, nil, nil, nil), nil
}
pingResp := &testservice.PingResponse{