diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-07-31 19:05:33 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-07-31 19:05:33 +0300 |
commit | 1ab045ca8712df495fd7bafb544893ed04ea44a7 (patch) | |
tree | 852416f42b3baab419f996f1013f83003ba5425b | |
parent | 71fa6ae18aaab3dacf016bc24b295e512356bf01 (diff) | |
parent | b405915a7cbd496c8e60c881463354f833a520be (diff) |
Merge branch 'jc-replace-frames' into 'master'
Add capability to replace frames in a stream
Closes #1804
See merge request gitlab-org/gitaly!1382
-rw-r--r-- | changelogs/unreleased/jc-replace-frames.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/director.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/examples_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker.go | 46 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker_test.go | 74 |
7 files changed, 118 insertions, 15 deletions
diff --git a/changelogs/unreleased/jc-replace-frames.yml b/changelogs/unreleased/jc-replace-frames.yml new file mode 100644 index 000000000..e8261d283 --- /dev/null +++ b/changelogs/unreleased/jc-replace-frames.yml @@ -0,0 +1,5 @@ +--- +title: Add capability to replace certain frames in a stream +merge_request: 1382 +author: +type: added diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 8f64022cb..c238604f3 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -69,7 +69,7 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) { } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 37f2be2d1..10a63b228 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -21,4 +21,4 @@ import ( // are invoked. So decisions around authorization, monitoring etc. are better to be handled there. // // See the rather rich example. -type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (context.Context, *grpc.ClientConn, error) +type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error) diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index e312f3a5c..2c2090363 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -39,7 +39,7 @@ func ExampleTransparentHandler() { // Provide sa simple example of a director that shields internal services and dials a staging or production backend. // This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. func ExampleStreamDirector() { - director = func(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { + director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { // Make sure we never forward internal services. if strings.HasPrefix(fullMethodName, "/com.example.internal.") { return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method") diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index 0a4fabd20..0fff36ed4 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -207,7 +207,7 @@ func (s *ProxyHappySuite) SetupSuite() { // Setup of the proxy's Director. s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) require.NoError(s.T(), err, "must not error on deferred client Dial") - director := func(ctx context.Context, fullName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { md, ok := metadata.FromIncomingContext(ctx) if ok { if _, exists := md[rejectingMdKey]; exists { diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go index 69b41f672..1d1e02df5 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker.go +++ b/internal/praefect/grpc-proxy/proxy/peeker.go @@ -2,18 +2,23 @@ package proxy import ( "errors" + "fmt" - "golang.org/x/net/context" "google.golang.org/grpc" ) -// StreamPeeker abstracts away the gRPC stream being forwarded so that it can +// StreamModifier abstracts away the gRPC stream being forwarded so that it can // be inspected and modified. -type StreamPeeker interface { - // Peek allows a director to peak n-messages into the stream without +type StreamModifier interface { + // Peek allows a director to peek one message into the request stream without // removing those messages from the stream that will be forwarded to // the backend server. - Peek(ctx context.Context, n uint) (frames [][]byte, _ error) + Peek() (frame []byte, _ error) + + // Modify replaces the peeked payload in the stream with the provided frame. + // If no payload was peeked, an error will be returned. + // Note: Modify cannot be called after the director returns. + Modify(payload []byte) error } type partialStream struct { @@ -37,7 +42,24 @@ func newPeeker(stream grpc.ServerStream) *peeker { // peek quanity var ErrInvalidPeekCount = errors.New("peek count must be greater than zero") -func (p peeker) Peek(ctx context.Context, n uint) ([][]byte, error) { +func (p peeker) Peek() ([]byte, error) { + payloads, err := p.peek(1) + if err != nil { + return nil, err + } + + if len(payloads) != 1 { + return nil, errors.New("failed to peek 1 message") + } + + return payloads[0], nil +} + +func (p peeker) Modify(payload []byte) error { + return p.modify([][]byte{payload}) +} + +func (p peeker) peek(n uint) ([][]byte, error) { if n < 1 { return nil, ErrInvalidPeekCount } @@ -57,3 +79,15 @@ func (p peeker) Peek(ctx context.Context, n uint) ([][]byte, error) { return peekedFrames, nil } + +func (p peeker) modify(payloads [][]byte) error { + if len(payloads) != len(p.consumedStream.frames) { + return fmt.Errorf("replacement frames count %d does not match consumed frames count %d", len(payloads), len(p.consumedStream.frames)) + } + + for i, payload := range payloads { + p.consumedStream.frames[i].payload = payload + } + + return nil +} diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go index 3e76f16b8..e274f31e2 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker_test.go +++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go @@ -28,15 +28,14 @@ func TestStreamPeeking(t *testing.T) { pingReqSent := &testservice.PingRequest{Value: "hi"} // director will peek into stream before routing traffic - director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { t.Logf("director routing method %s to backend", fullMethodName) - peekedMsgs, err := peeker.Peek(ctx, 1) + peekedMsg, err := peeker.Peek() require.NoError(t, err) - require.Len(t, peekedMsgs, 1) peekedRequest := new(testservice.PingRequest) - err = proto.Unmarshal(peekedMsgs[0], peekedRequest) + err = proto.Unmarshal(peekedMsg, peekedRequest) require.NoError(t, err) require.Equal(t, pingReqSent, peekedRequest) @@ -74,5 +73,70 @@ func TestStreamPeeking(t *testing.T) { require.Equal(t, resp, pingResp) _, err = proxyClientPingStream.Recv() - require.Error(t, err, io.EOF) + require.Error(t, io.EOF, err) +} + +func TestStreamInjecting(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + backendCC, backendSrvr, cleanupPinger := newBackendPinger(t, ctx) + defer cleanupPinger() + + pingReqSent := &testservice.PingRequest{Value: "hi"} + newValue := "bye" + + // director will peek into stream and change some frames + director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { + t.Logf("modifying request for method %s", fullMethodName) + + peekedMsg, err := peeker.Peek() + require.NoError(t, err) + + peekedRequest := new(testservice.PingRequest) + require.NoError(t, proto.Unmarshal(peekedMsg, peekedRequest)) + require.Equal(t, "hi", peekedRequest.GetValue()) + + peekedRequest.Value = newValue + + newPayload, err := proto.Marshal(peekedRequest) + require.NoError(t, err) + + require.NoError(t, peeker.Modify(newPayload)) + + return ctx, backendCC, nil + } + + pingResp := &testservice.PingResponse{ + Counter: 1, + } + + // we expect the backend server to receive the modified message + backendSrvr.pingStream = func(stream testservice.TestService_PingStreamServer) error { + pingReqReceived, err := stream.Recv() + assert.NoError(t, err) + assert.Equal(t, newValue, pingReqReceived.GetValue()) + + return stream.Send(pingResp) + } + + proxyCC, cleanupProxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "PingStream") + defer cleanupProxy() + + proxyClient := testservice.NewTestServiceClient(proxyCC) + + proxyClientPingStream, err := proxyClient.PingStream(ctx) + require.NoError(t, err) + defer proxyClientPingStream.CloseSend() + + require.NoError(t, + proxyClientPingStream.Send(pingReqSent), + ) + + resp, err := proxyClientPingStream.Recv() + require.NoError(t, err) + require.Equal(t, resp, pingResp) + + _, err = proxyClientPingStream.Recv() + require.Error(t, io.EOF, err) } |