Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-01-05 15:45:19 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2023-01-10 10:14:12 +0300
commit630717dceb2f9648b6e983f0c6e25c02604edcc5 (patch)
treee564d383b73b6e6966703214289aad2dea569203
parent522c0d9283f757583fcfd43e66b4d5b323503bc0 (diff)
WIP: acknowledge streaming RPC if Praefect rejects requestqmnguyen0711/4249-streaming-rpcs-return-hard-to-debug-eof
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go3
-rw-r--r--internal/praefect/server_test.go74
-rw-r--r--internal/testhelper/testserver/gitaly.go8
-rw-r--r--internal/testhelper/testserver/praefect.go6
4 files changed, 84 insertions, 7 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index b0ca206ca..4fb027dc7 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
"io"
"gitlab.com/gitlab-org/gitaly/v15/internal/middleware/sentryhandler"
@@ -121,6 +122,8 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
// We require that the director's returned context inherits from the serverStream.Context().
params, err := s.director(serverStream.Context(), fullMethodName, peeker)
if err != nil {
+ _ = serverStream.SendMsg(&gitalypb.PostReceiveHookResponse{})
+ _ = serverStream.SendMsg(&gitalypb.PostUploadPackWithSidechannelResponse{})
return err
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index c71c466fd..626cc7bd6 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -6,6 +6,7 @@ import (
"bytes"
"context"
"errors"
+ "google.golang.org/grpc/health"
"io"
"math/rand"
"net"
@@ -1044,6 +1045,79 @@ func TestErrorThreshold(t *testing.T) {
}
}
+func TestStreamingRPCRejected(t *testing.T) {
+ t.Parallel()
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*config.Node{
+ {
+ Storage: "praefect-internal-0",
+ Address: "tcp://this-does-not-matter",
+ },
+ },
+ },
+ },
+ }
+ ctx := testhelper.Context(t)
+
+ queue := datastore.NewPostgresReplicationEventQueue(testdb.New(t))
+ entry := testhelper.NewDiscardingLogEntry(t)
+
+ rs := datastore.MockRepositoryStore{}
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ require.NoError(t, err)
+ defer nodeMgr.Stop()
+
+ coordinator := NewCoordinator(
+ queue,
+ rs,
+ NewNodeManagerRouter(nodeMgr, rs),
+ transactions.NewManager(conf),
+ conf,
+ protoregistry.GitalyProtoPreregistered,
+ )
+
+ server := grpc.NewServer(
+ grpc.ForceServerCodec(proxy.NewCodec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(coordinator.StreamDirector)),
+ )
+ grpc_health_v1.RegisterHealthServer(server, health.NewServer())
+
+ socket := testhelper.GetTemporaryGitalySocketFileName(t)
+ listener, err := net.Listen("unix", socket)
+ require.NoError(t, err)
+
+ go testhelper.MustServe(t, server, listener)
+ defer server.Stop()
+
+ conn, err := dial("unix://"+socket, []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())})
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, conn)
+
+ client := gitalypb.NewSmartHTTPServiceClient(conn)
+ stream, err := client.PostUploadPack(ctx)
+ require.NoError(t, err)
+
+ testserver.WaitHealthy(t, ctx, "unix://"+socket, "")
+
+ err = stream.Send(&gitalypb.PostUploadPackRequest{Repository: &gitalypb.Repository{
+ StorageName: "praefect-internal-0",
+ RelativePath: "please-do-not-exist",
+ }})
+ require.NoError(t, err)
+
+ var response gitalypb.PostUploadPackWithSidechannelResponse
+ err = stream.RecvMsg(&response)
+ require.NoError(t, err)
+
+ var response2 gitalypb.PostUploadPackResponse
+ err = stream.RecvMsg(&response2)
+ require.NoError(t, err)
+}
+
func newSmartHTTPClient(t *testing.T, serverSocketPath string) (gitalypb.SmartHTTPServiceClient, *grpc.ClientConn) {
t.Helper()
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index d2a377710..0427fe4f1 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -124,8 +124,8 @@ func (gs GitalyServer) Address() string {
return gs.address
}
-// waitHealthy waits until the server hosted at address becomes healthy.
-func waitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) {
+// WaitHealthy waits until the server hosted at address becomes healthy.
+func WaitHealthy(tb testing.TB, ctx context.Context, addr string, authToken string) {
grpcOpts := []grpc.DialOption{
grpc.WithBlock(),
internalclient.UnaryInterceptor(),
@@ -190,7 +190,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, reg
}()
ctx := testhelper.Context(tb)
- waitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token)
+ WaitHealthy(tb, ctx, "unix://"+internalListener.Addr().String(), cfg.Auth.Token)
}
secure := cfg.TLS.CertPath != "" && cfg.TLS.KeyPath != ""
@@ -227,7 +227,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, rubyServer *rubyserver.Server, reg
}()
ctx := testhelper.Context(tb)
- waitHealthy(tb, ctx, addr, cfg.Auth.Token)
+ WaitHealthy(tb, ctx, addr, cfg.Auth.Token)
return externalServer, addr, gsd.disablePraefect
}
diff --git a/internal/testhelper/testserver/praefect.go b/internal/testhelper/testserver/praefect.go
index 7c77ebf60..e27c14197 100644
--- a/internal/testhelper/testserver/praefect.go
+++ b/internal/testhelper/testserver/praefect.go
@@ -114,7 +114,7 @@ func StartPraefect(tb testing.TB, cfg config.Config) PraefectServer {
cancel()
}()
- // Ensure this runs even if context ends in waitHealthy.
+ // Ensure this runs even if context ends in WaitHealthy.
defer func() {
// Check if the process has exited. This must not happen given that we need it to be
// up in order to connect to it.
@@ -128,14 +128,14 @@ func StartPraefect(tb testing.TB, cfg config.Config) PraefectServer {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
- // Capture Praefect logs when waitHealthy takes too long.
+ // Capture Praefect logs when WaitHealthy takes too long.
require.FailNowf(tb, "Connecting to Praefect exceeded deadline", "%s", stderr.String())
}
default:
}
}()
- waitHealthy(tb, ctx, praefectServer.Address(), cfg.Auth.Token)
+ WaitHealthy(tb, ctx, praefectServer.Address(), cfg.Auth.Token)
return praefectServer
}