Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2019-07-31 19:05:33 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-07-31 19:05:33 +0300
commit1ab045ca8712df495fd7bafb544893ed04ea44a7 (patch)
tree852416f42b3baab419f996f1013f83003ba5425b
parent71fa6ae18aaab3dacf016bc24b295e512356bf01 (diff)
parentb405915a7cbd496c8e60c881463354f833a520be (diff)
Merge branch 'jc-replace-frames' into 'master'
Add capability to replace frames in a stream Closes #1804 See merge request gitlab-org/gitaly!1382
-rw-r--r--changelogs/unreleased/jc-replace-frames.yml5
-rw-r--r--internal/praefect/coordinator.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/examples_test.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker.go46
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker_test.go74
7 files changed, 118 insertions, 15 deletions
diff --git a/changelogs/unreleased/jc-replace-frames.yml b/changelogs/unreleased/jc-replace-frames.yml
new file mode 100644
index 000000000..e8261d283
--- /dev/null
+++ b/changelogs/unreleased/jc-replace-frames.yml
@@ -0,0 +1,5 @@
+---
+title: Add capability to replace certain frames in a stream
+merge_request: 1382
+author:
+type: added
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 8f64022cb..c238604f3 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -69,7 +69,7 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
}
// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 37f2be2d1..10a63b228 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -21,4 +21,4 @@ import (
// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
//
// See the rather rich example.
-type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (context.Context, *grpc.ClientConn, error)
+type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, error)
diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go
index e312f3a5c..2c2090363 100644
--- a/internal/praefect/grpc-proxy/proxy/examples_test.go
+++ b/internal/praefect/grpc-proxy/proxy/examples_test.go
@@ -39,7 +39,7 @@ func ExampleTransparentHandler() {
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
func ExampleStreamDirector() {
- director = func(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
+ director = func(ctx context.Context, fullMethodName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
// Make sure we never forward internal services.
if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
return nil, nil, grpc.Errorf(codes.Unimplemented, "Unknown method")
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index 0a4fabd20..0fff36ed4 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -207,7 +207,7 @@ func (s *ProxyHappySuite) SetupSuite() {
// Setup of the proxy's Director.
s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec()))
require.NoError(s.T(), err, "must not error on deferred client Dial")
- director := func(ctx context.Context, fullName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullName string, _ proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[rejectingMdKey]; exists {
diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go
index 69b41f672..1d1e02df5 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker.go
@@ -2,18 +2,23 @@ package proxy
import (
"errors"
+ "fmt"
- "golang.org/x/net/context"
"google.golang.org/grpc"
)
-// StreamPeeker abstracts away the gRPC stream being forwarded so that it can
+// StreamModifier abstracts away the gRPC stream being forwarded so that it can
// be inspected and modified.
-type StreamPeeker interface {
- // Peek allows a director to peak n-messages into the stream without
+type StreamModifier interface {
+ // Peek allows a director to peek one message into the request stream without
// removing those messages from the stream that will be forwarded to
// the backend server.
- Peek(ctx context.Context, n uint) (frames [][]byte, _ error)
+ Peek() (frame []byte, _ error)
+
+ // Modify replaces the peeked payload in the stream with the provided frame.
+ // If no payload was peeked, an error will be returned.
+ // Note: Modify cannot be called after the director returns.
+ Modify(payload []byte) error
}
type partialStream struct {
@@ -37,7 +42,24 @@ func newPeeker(stream grpc.ServerStream) *peeker {
// peek quanity
var ErrInvalidPeekCount = errors.New("peek count must be greater than zero")
-func (p peeker) Peek(ctx context.Context, n uint) ([][]byte, error) {
+func (p peeker) Peek() ([]byte, error) {
+ payloads, err := p.peek(1)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(payloads) != 1 {
+ return nil, errors.New("failed to peek 1 message")
+ }
+
+ return payloads[0], nil
+}
+
+func (p peeker) Modify(payload []byte) error {
+ return p.modify([][]byte{payload})
+}
+
+func (p peeker) peek(n uint) ([][]byte, error) {
if n < 1 {
return nil, ErrInvalidPeekCount
}
@@ -57,3 +79,15 @@ func (p peeker) Peek(ctx context.Context, n uint) ([][]byte, error) {
return peekedFrames, nil
}
+
+func (p peeker) modify(payloads [][]byte) error {
+ if len(payloads) != len(p.consumedStream.frames) {
+ return fmt.Errorf("replacement frames count %d does not match consumed frames count %d", len(payloads), len(p.consumedStream.frames))
+ }
+
+ for i, payload := range payloads {
+ p.consumedStream.frames[i].payload = payload
+ }
+
+ return nil
+}
diff --git a/internal/praefect/grpc-proxy/proxy/peeker_test.go b/internal/praefect/grpc-proxy/proxy/peeker_test.go
index 3e76f16b8..e274f31e2 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker_test.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker_test.go
@@ -28,15 +28,14 @@ func TestStreamPeeking(t *testing.T) {
pingReqSent := &testservice.PingRequest{Value: "hi"}
// director will peek into stream before routing traffic
- director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
t.Logf("director routing method %s to backend", fullMethodName)
- peekedMsgs, err := peeker.Peek(ctx, 1)
+ peekedMsg, err := peeker.Peek()
require.NoError(t, err)
- require.Len(t, peekedMsgs, 1)
peekedRequest := new(testservice.PingRequest)
- err = proto.Unmarshal(peekedMsgs[0], peekedRequest)
+ err = proto.Unmarshal(peekedMsg, peekedRequest)
require.NoError(t, err)
require.Equal(t, pingReqSent, peekedRequest)
@@ -74,5 +73,70 @@ func TestStreamPeeking(t *testing.T) {
require.Equal(t, resp, pingResp)
_, err = proxyClientPingStream.Recv()
- require.Error(t, err, io.EOF)
+ require.Error(t, io.EOF, err)
+}
+
+func TestStreamInjecting(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ backendCC, backendSrvr, cleanupPinger := newBackendPinger(t, ctx)
+ defer cleanupPinger()
+
+ pingReqSent := &testservice.PingRequest{Value: "hi"}
+ newValue := "bye"
+
+ // director will peek into stream and change some frames
+ director := func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
+ t.Logf("modifying request for method %s", fullMethodName)
+
+ peekedMsg, err := peeker.Peek()
+ require.NoError(t, err)
+
+ peekedRequest := new(testservice.PingRequest)
+ require.NoError(t, proto.Unmarshal(peekedMsg, peekedRequest))
+ require.Equal(t, "hi", peekedRequest.GetValue())
+
+ peekedRequest.Value = newValue
+
+ newPayload, err := proto.Marshal(peekedRequest)
+ require.NoError(t, err)
+
+ require.NoError(t, peeker.Modify(newPayload))
+
+ return ctx, backendCC, nil
+ }
+
+ pingResp := &testservice.PingResponse{
+ Counter: 1,
+ }
+
+ // we expect the backend server to receive the modified message
+ backendSrvr.pingStream = func(stream testservice.TestService_PingStreamServer) error {
+ pingReqReceived, err := stream.Recv()
+ assert.NoError(t, err)
+ assert.Equal(t, newValue, pingReqReceived.GetValue())
+
+ return stream.Send(pingResp)
+ }
+
+ proxyCC, cleanupProxy := newProxy(t, ctx, director, "mwitkow.testproto.TestService", "PingStream")
+ defer cleanupProxy()
+
+ proxyClient := testservice.NewTestServiceClient(proxyCC)
+
+ proxyClientPingStream, err := proxyClient.PingStream(ctx)
+ require.NoError(t, err)
+ defer proxyClientPingStream.CloseSend()
+
+ require.NoError(t,
+ proxyClientPingStream.Send(pingReqSent),
+ )
+
+ resp, err := proxyClientPingStream.Recv()
+ require.NoError(t, err)
+ require.Equal(t, resp, pingResp)
+
+ _, err = proxyClientPingStream.Recv()
+ require.Error(t, io.EOF, err)
}