diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-01-05 15:45:19 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-01-10 10:14:12 +0300 |
commit | 630717dceb2f9648b6e983f0c6e25c02604edcc5 (patch) | |
tree | e564d383b73b6e6966703214289aad2dea569203 | |
parent | 522c0d9283f757583fcfd43e66b4d5b323503bc0 (diff) |
WIP: acknowledge streaming RPC if Praefect rejects requestqmnguyen0711/4249-streaming-rpcs-return-hard-to-debug-eof
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 3 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 74 | ||||
-rw-r--r-- | internal/testhelper/testserver/gitaly.go | 8 | ||||
-rw-r--r-- | internal/testhelper/testserver/praefect.go | 6 |
4 files changed, 84 insertions, 7 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index b0ca206ca..4fb027dc7 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" "io" "gitlab.com/gitlab-org/gitaly/v15/internal/middleware/sentryhandler" @@ -121,6 +122,8 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina // We require that the director's returned context inherits from the serverStream.Context(). params, err := s.director(serverStream.Context(), fullMethodName, peeker) if err != nil { + _ = serverStream.SendMsg(&gitalypb.PostReceiveHookResponse{}) + _ = serverStream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{}) return err } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index c71c466fd..626cc7bd6 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "errors" + "google.golang.org/grpc/health" "io" "math/rand" "net" @@ -1044,6 +1045,79 @@ func TestErrorThreshold(t *testing.T) { } } +func TestStreamingRPCRejected(t *testing.T) { + t.Parallel() + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*config.Node{ + { + Storage: "praefect-internal-0", + Address: "tcp://this-does-not-matter", + }, + }, + }, + }, + } + ctx := testhelper.Context(t) + + queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t)) + entry := testhelper.NewDiscardingLogEntry(t) + + rs := datastore.MockRepositoryStore{} + nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) + require.NoError(t, err) + defer nodeMgr.Stop() + + coordinator := NewCoordinator( + queue, + rs, + NewNodeManagerRouter(nodeMgr, rs), + transactions.NewManager(conf), + conf, + protoregistry.GitalyProtoPreregistered, + ) + + server := grpc.NewServer( + grpc.ForceServerCodec(proxy.NewCodec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(coordinator.StreamDirector)), + ) + grpc_health_v1.RegisterHealthServer(server, health.NewServer()) + + socket := testhelper.GetTemporaryGitalySocketFileName(t) + listener, err := net.Listen("unix", socket) + require.NoError(t, err) + + go testhelper.MustServe(t, server, listener) + defer server.Stop() + + conn, err := dial("unix://"+socket, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}) + require.NoError(t, err) + defer testhelper.MustClose(t, conn) + + client := gitalypb.NewSmartHTTPServiceClient(conn) + stream, err := client.PostUploadPack(ctx) + require.NoError(t, err) + + testserver.WaitHealthy(t, ctx, "unix://"+socket, "") + + err = stream.Send(&gitalypb.PostUploadPackRequest{Repository: &gitalypb.Repository{ + StorageName: "praefect-internal-0", + RelativePath: "please-do-not-exist", + }}) + require.NoError(t, err) + + var response gitalypb.PostUploadPackWithSidechannelResponse + err = stream.RecvMsg(&response) + require.NoError(t, err) + + var response2 gitalypb.PostUploadPackResponse + err = stream.RecvMsg(&response2) + require.NoError(t, err) +} + func newSmartHTTPClient(t *testing.T, serverSocketPath string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) { t.Helper() diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index d2a377710..0427fe4f1 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -124,8 +124,8 @@ func (gs GitalyServer) Address() string { return gs.address } -// waitHealthy waits until the server hosted at address becomes healthy. -func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) { +// WaitHealthy waits until the server hosted at address becomes healthy. +func WaitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) { grpcOpts := []grpc.DialOption{ grpc.WithBlock(), internalclient.UnaryInterceptor(), @@ -190,7 +190,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, reg }() ctx := testhelper.Context(tb) - waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token) + WaitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token) } secure := cfg.TLS.CertPath != "" && cfg.TLS.KeyPath != "" @@ -227,7 +227,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, reg }() ctx := testhelper.Context(tb) - waitHealthy(tb, ctx, addr, cfg.Auth.Token) + WaitHealthy(tb, ctx, addr, cfg.Auth.Token) return externalServer, addr, gsd.disablePraefect } diff --git a/internal/testhelper/testserver/praefect.go b/internal/testhelper/testserver/praefect.go index 7c77ebf60..e27c14197 100644 --- a/internal/testhelper/testserver/praefect.go +++ b/internal/testhelper/testserver/praefect.go @@ -114,7 +114,7 @@ func StartPraefect(tb testing.TB, cfg config.Config) PraefectServer { cancel() }() - // Ensure this runs even if context ends in waitHealthy. + // Ensure this runs even if context ends in WaitHealthy. defer func() { // Check if the process has exited. This must not happen given that we need it to be // up in order to connect to it. @@ -128,14 +128,14 @@ func StartPraefect(tb testing.TB, cfg config.Config) PraefectServer { case <-ctx.Done(): switch ctx.Err() { case context.DeadlineExceeded: - // Capture Praefect logs when waitHealthy takes too long. + // Capture Praefect logs when WaitHealthy takes too long. require.FailNowf(tb, "Connecting to Praefect exceeded deadline", "%s", stderr.String()) } default: } }() - waitHealthy(tb, ctx, praefectServer.Address(), cfg.Auth.Token) + WaitHealthy(tb, ctx, praefectServer.Address(), cfg.Auth.Token) return praefectServer } |