Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-03-09 23:28:26 +0300
committerJohn Cai <jcai@gitlab.com>2020-03-09 23:28:26 +0300
commitc15501dab93e99647c28f295ed3441712a30f510 (patch)
treefe4ba1226a07b45f0403da113d3649f1edd87b5d
parentc70251f8f1344ef40451f760620670b941761045 (diff)
parent2de87df9f710c70a2b77ceef8df32c39397afea4 (diff)
Merge branch 'jc-register-mutator' into 'master'
Add RegisterStreamerHandlers method to register custom stream handlers See merge request gitlab-org/gitaly!1888
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go28
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go87
-rw-r--r--internal/testhelper/testhelper.go8
3 files changed, 121 insertions, 2 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index ced9388b9..69012235e 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -24,6 +24,34 @@ var (
}
)
+// RegisterStreamHandlers sets up stream handlers for a set of gRPC methods for a given service.
+// streamers is a map of method to grpc.StreamHandler eg:
+//
+// streamHandler := func(srv interface{}, stream ServerStream) error {
+// /** do some stuff **/
+// return nil
+// }
+// RegisterStreamHandlers(grpcServer, "MyGrpcService", map[string]grpc.StreamHandler{"Method1": streamHandler})
+// note: multiple calls with the same serviceName will result in a fatal
+func RegisterStreamHandlers(server *grpc.Server, serviceName string, streamers map[string]grpc.StreamHandler) {
+ desc := &grpc.ServiceDesc{
+ ServiceName: serviceName,
+ HandlerType: (*interface{})(nil),
+ }
+
+ for methodName, streamer := range streamers {
+ streamDesc := grpc.StreamDesc{
+ StreamName: methodName,
+ Handler: streamer,
+ ServerStreams: true,
+ ClientStreams: true,
+ }
+ desc.Streams = append(desc.Streams, streamDesc)
+ }
+
+ server.RegisterService(desc, struct{}{})
+}
+
// RegisterService sets up a proxy handler for a particular gRPC service and method.
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index d3b38e2df..bd8fef5da 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -9,6 +9,7 @@ package proxy_test
import (
"context"
+ "errors"
"fmt"
"io"
"log"
@@ -27,10 +28,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
+ "gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
pb "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/testdata"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
@@ -48,6 +51,12 @@ const (
countListResponses = 20
)
+func TestMain(m *testing.M) {
+ grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags))
+ m.Run()
+ os.Exit(0)
+}
+
// asserting service is implemented on the server side and serves as a handler for stuff
type assertingService struct {
t *testing.T
@@ -226,8 +235,6 @@ func (s *ProxyHappySuite) SetupSuite() {
s.serverListener, err = net.Listen("tcp", "127.0.0.1:0")
require.NoError(s.T(), err, "must be able to allocate a port for serverListener")
- grpclog.SetLogger(log.New(os.Stderr, "grpc: ", log.LstdFlags))
-
s.server = grpc.NewServer()
pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()})
@@ -299,3 +306,79 @@ func (s *ProxyHappySuite) TearDownSuite() {
func TestProxyHappySuite(t *testing.T) {
suite.Run(t, &ProxyHappySuite{})
}
+
+func TestRegisterStreamHandlers(t *testing.T) {
+ directorCalledError := errors.New("director was called")
+
+ server := grpc.NewServer(
+ grpc.CustomCodec(proxy.Codec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(func(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ return nil, directorCalledError
+ })),
+ )
+
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", serverSocketPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go server.Serve(listener)
+ defer server.Stop()
+
+ var pingStreamHandlerCalled, pingEmptyStreamHandlerCalled bool
+
+ pingValue := "hello"
+
+ pingStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
+ pingStreamHandlerCalled = true
+ var req pb.PingRequest
+
+ if err := stream.RecvMsg(&req); err != nil {
+ return err
+ }
+
+ require.Equal(t, pingValue, req.Value)
+
+ return nil
+ }
+
+ pingEmptyStreamHandler := func(srv interface{}, stream grpc.ServerStream) error {
+ pingEmptyStreamHandlerCalled = true
+ var req pb.Empty
+
+ if err := stream.RecvMsg(&req); err != nil {
+ return err
+ }
+
+ return nil
+ }
+
+ streamers := map[string]grpc.StreamHandler{
+ "Ping": pingStreamHandler,
+ "PingEmpty": pingEmptyStreamHandler,
+ }
+
+ proxy.RegisterStreamHandlers(server, "mwitkow.testproto.TestService", streamers)
+
+ cc, err := client.Dial("unix://"+serverSocketPath, nil)
+ require.NoError(t, err)
+
+ testServiceClient := pb.NewTestServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ _, err = testServiceClient.Ping(ctx, &pb.PingRequest{Value: pingValue})
+ require.False(t, testhelper.GrpcErrorHasMessage(err, directorCalledError.Error()))
+ require.True(t, pingStreamHandlerCalled)
+
+ _, err = testServiceClient.PingEmpty(ctx, &pb.Empty{})
+ require.False(t, testhelper.GrpcErrorHasMessage(err, directorCalledError.Error()))
+ require.True(t, pingEmptyStreamHandlerCalled)
+
+ // since PingError was never registered with its own streamer, it should get sent to the UnknownServiceHandler
+ _, err = testServiceClient.PingError(ctx, &pb.PingRequest{})
+ require.True(t, testhelper.GrpcErrorHasMessage(err, directorCalledError.Error()))
+}
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index a052c5119..5d7d0647a 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -590,3 +590,11 @@ func getGitDirSize(t *testing.T, repoPath string, subdirs ...string) int64 {
return blocks
}
+
+func GrpcErrorHasMessage(grpcError error, msg string) bool {
+ status, ok := status.FromError(grpcError)
+ if !ok {
+ return false
+ }
+ return status.Message() == msg
+}