diff options
author | John Cai <jcai@gitlab.com> | 2020-03-09 23:28:26 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-03-09 23:28:26 +0300 |
commit | c15501dab93e99647c28f295ed3441712a30f510 (patch) | |
tree | fe4ba1226a07b45f0403da113d3649f1edd87b5d | |
parent | c70251f8f1344ef40451f760620670b941761045 (diff) | |
parent | 2de87df9f710c70a2b77ceef8df32c39397afea4 (diff) |
Merge branch 'jc-register-mutator' into 'master'
Add RegisterStreamerHandlers method to register custom stream handlers
See merge request gitlab-org/gitaly!1888
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 28 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler_test.go | 87 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 8 |
3 files changed, 121 insertions, 2 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index ced9388b9..69012235e 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -24,6 +24,34 @@ var ( } ) +// RegisterStreamHandlers sets up stream handlers for a set of gRPC methods for a given service. +// streamers is a map of method to grpc.StreamHandler eg: +// +// streamHandler := func(srv interface{}, stream ServerStream) error { +// /** do some stuff **/ +// return nil +// } +// RegisterStreamHandlers(grpcServer, "MyGrpcService", map[string]grpc.StreamHandler{"Method1": streamHandler}) +// note: multiple calls with the same serviceName will result in a fatal +func RegisterStreamHandlers(server *grpc.Server, serviceName string, streamers map[string]grpc.StreamHandler) { + desc := &grpc.ServiceDesc{ + ServiceName: serviceName, + HandlerType: (*interface{})(nil), + } + + for methodName, streamer := range streamers { + streamDesc := grpc.StreamDesc{ + StreamName: methodName, + Handler: streamer, + ServerStreams: true, + ClientStreams: true, + } + desc.Streams = append(desc.Streams, streamDesc) + } + + server.RegisterService(desc, struct{}{}) +} + // RegisterService sets up a proxy handler for a particular gRPC service and method. // The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. // diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index d3b38e2df..bd8fef5da 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -9,6 +9,7 @@ package proxy_test import ( "context" + "errors" "fmt" "io" "log" @@ -27,10 +28,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" @@ -48,6 +51,12 @@ const ( countListResponses = 20 ) +func TestMain(m *testing.M) { + grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags)) + m.Run() + os.Exit(0) +} + // asserting service is implemented on the server side and serves as a handler for stuff type assertingService struct { t *testing.T @@ -226,8 +235,6 @@ func (s *ProxyHappySuite) SetupSuite() { s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") require.NoError(s.T(), err, "must be able to allocate a port for serverListener") - grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags)) - s.server = grpc.NewServer() pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) @@ -299,3 +306,79 @@ func (s *ProxyHappySuite) TearDownSuite() { func TestProxyHappySuite(t *testing.T) { suite.Run(t, &ProxyHappySuite{}) } + +func TestRegisterStreamHandlers(t *testing.T) { + directorCalledError := errors.New("director was called") + + server := grpc.NewServer( + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + return nil, directorCalledError + })), + ) + + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + if err != nil { + t.Fatal(err) + } + + go server.Serve(listener) + defer server.Stop() + + var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool + + pingValue := "hello" + + pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { + pingStreamHandlerCalled = true + var req pb.PingRequest + + if err := stream.RecvMsg(&req); err != nil { + return err + } + + require.Equal(t, pingValue, req.Value) + + return nil + } + + pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error { + pingEmptyStreamHandlerCalled = true + var req pb.Empty + + if err := stream.RecvMsg(&req); err != nil { + return err + } + + return nil + } + + streamers := map[string]grpc.StreamHandler{ + "Ping": pingStreamHandler, + "PingEmpty": pingEmptyStreamHandler, + } + + proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers) + + cc, err := client.Dial("unix://"+serverSocketPath, nil) + require.NoError(t, err) + + testServiceClient := pb.NewTestServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue}) + require.False(t, testhelper.GrpcErrorHasMessage(err, directorCalledError.Error())) + require.True(t, pingStreamHandlerCalled) + + _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{}) + require.False(t, testhelper.GrpcErrorHasMessage(err, directorCalledError.Error())) + require.True(t, pingEmptyStreamHandlerCalled) + + // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler + _, err = testServiceClient.PingError(ctx, &pb.PingRequest{}) + require.True(t, testhelper.GrpcErrorHasMessage(err, directorCalledError.Error())) +} diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index a052c5119..5d7d0647a 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -590,3 +590,11 @@ func getGitDirSize(t *testing.T, repoPath string, subdirs ...string) int64 { return blocks } + +func GrpcErrorHasMessage(grpcError error, msg string) bool { + status, ok := status.FromError(grpcError) + if !ok { + return false + } + return status.Message() == msg +} |