diff options
Diffstat (limited to 'internal/gitaly/server/server_factory_test.go')
-rw-r--r-- | internal/gitaly/server/server_factory_test.go | 180 |
1 files changed, 171 insertions, 9 deletions
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 78ef98747..7f625946a 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -1,17 +1,20 @@ package server import ( + "context" "crypto/tls" "crypto/x509" - "io/ioutil" + "errors" "net" "os" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/backchannel" "gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter" + "gitlab.com/gitlab-org/gitaly/internal/cache" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg" @@ -36,7 +39,7 @@ func TestGitalyServerFactory(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { listener.Close() }) - srv, err := sf.Create(true) + srv, err := sf.CreateExternal(true) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) go srv.Serve(listener) @@ -44,9 +47,7 @@ func TestGitalyServerFactory(t *testing.T) { certPool, err := x509.SystemCertPool() require.NoError(t, err) - pem, err := ioutil.ReadFile(sf.cfg.TLS.CertPath) - require.NoError(t, err) - + pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath) require.True(t, certPool.AppendCertsFromPEM(pem)) creds := credentials.NewTLS(&tls.Config{ @@ -61,7 +62,7 @@ func TestGitalyServerFactory(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { listener.Close() }) - srv, err := sf.Create(false) + srv, err := sf.CreateExternal(false) require.NoError(t, err) healthpb.RegisterHealthServer(srv, health.NewServer()) go srv.Serve(listener) @@ -85,7 +86,7 @@ func TestGitalyServerFactory(t *testing.T) { t.Run("insecure", func(t *testing.T) { cfg := testcfg.Build(t) - sf := NewGitalyServerFactory(cfg, backchannel.NewRegistry()) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) checkHealth(t, sf, starter.TCP, "localhost:0") }) @@ -98,7 +99,7 @@ func TestGitalyServerFactory(t *testing.T) { KeyPath: keyFile, }})) - sf := NewGitalyServerFactory(cfg, backchannel.NewRegistry()) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) checkHealth(t, sf, starter.TLS, "localhost:0") @@ -106,7 +107,7 @@ func TestGitalyServerFactory(t *testing.T) { t.Run("all services must be stopped", func(t *testing.T) { cfg := testcfg.Build(t) - sf := NewGitalyServerFactory(cfg, backchannel.NewRegistry()) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) t.Cleanup(sf.Stop) tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0") @@ -125,3 +126,164 @@ func TestGitalyServerFactory(t *testing.T) { require.Equal(t, codes.Unavailable, status.Code(socketErr)) }) } + +func TestGitalyServerFactory_closeOrder(t *testing.T) { + ctx, cancel := testhelper.Context() + defer cancel() + + cfg := testcfg.Build(t) + sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg))) + defer sf.Stop() + + errQuickRPC := status.Error(codes.Internal, "quick RPC") + errBlockingRPC := status.Error(codes.Internal, "blocking RPC") + + invokeQuick := func(conn *grpc.ClientConn, shouldSucceed bool) { + err := conn.Invoke(ctx, "/Service/Quick", &healthpb.HealthCheckRequest{}, &healthpb.HealthCheckRequest{}) + if !shouldSucceed { + testhelper.RequireGrpcError(t, err, codes.Unavailable) + return + } + + require.Equal(t, errQuickRPC, err) + } + + invokeBlocking := func(conn *grpc.ClientConn) chan struct{} { + rpcFinished := make(chan struct{}) + + go func() { + defer close(rpcFinished) + assert.Equal(t, + errBlockingRPC, + conn.Invoke(ctx, "/Service/Blocking", &healthpb.HealthCheckRequest{}, &healthpb.HealthCheckRequest{}), + ) + }() + + return rpcFinished + } + + waitUntilFailure := func(conn *grpc.ClientConn) { + for { + err := conn.Invoke(ctx, "/Service/Quick", &healthpb.HealthCheckRequest{}, &healthpb.HealthCheckRequest{}) + if errors.Is(err, errQuickRPC) { + continue + } + + testhelper.RequireGrpcError(t, err, codes.Unavailable) + break + } + } + + var internalConn, externalConn *grpc.ClientConn + var internalIsBlocking, externalIsBlocking chan struct{} + var releaseInternalBlock, releaseExternalBlock chan struct{} + for _, builder := range []struct { + createServer func() *grpc.Server + conn **grpc.ClientConn + isBlocking *chan struct{} + releaseBlock *chan struct{} + }{ + { + createServer: func() *grpc.Server { + server, err := sf.CreateInternal() + require.NoError(t, err) + return server + }, + conn: &internalConn, + isBlocking: &internalIsBlocking, + releaseBlock: &releaseInternalBlock, + }, + { + createServer: func() *grpc.Server { + server, err := sf.CreateExternal(false) + require.NoError(t, err) + return server + }, + conn: &externalConn, + isBlocking: &externalIsBlocking, + releaseBlock: &releaseExternalBlock, + }, + } { + server := builder.createServer() + + releaseBlock := make(chan struct{}) + *builder.releaseBlock = releaseBlock + + isBlocking := make(chan struct{}) + *builder.isBlocking = isBlocking + + server.RegisterService(&grpc.ServiceDesc{ + ServiceName: "Service", + Methods: []grpc.MethodDesc{ + { + MethodName: "Quick", + Handler: func(interface{}, context.Context, func(interface{}) error, grpc.UnaryServerInterceptor) (interface{}, error) { + return nil, errQuickRPC + }, + }, + { + MethodName: "Blocking", + Handler: func(interface{}, context.Context, func(interface{}) error, grpc.UnaryServerInterceptor) (interface{}, error) { + close(isBlocking) + <-releaseBlock + return nil, errBlockingRPC + }, + }, + }, + HandlerType: (*interface{})(nil), + }, server) + + ln, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + defer ln.Close() + + go server.Serve(ln) + + *builder.conn, err = grpc.DialContext(ctx, ln.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + } + + // both servers should be up and accepting RPCs + invokeQuick(externalConn, true) + invokeQuick(internalConn, true) + + // invoke a blocking RPC on the external server to block the graceful shutdown + invokeBlocking(externalConn) + <-externalIsBlocking + + shutdownCompeleted := make(chan struct{}) + go func() { + defer close(shutdownCompeleted) + sf.GracefulStop() + }() + + // wait until the graceful shutdown is in progress and new RPCs are no longer accepted on the + // external servers + waitUntilFailure(externalConn) + + // internal sockets should still accept RPCs even if external sockets are gracefully closing. + invokeQuick(internalConn, true) + + // block on the internal server + internalBlockingRPCFinished := invokeBlocking(internalConn) + <-internalIsBlocking + + // release the external server's blocking RPC so the graceful shutdown can complete and proceed to + // shutting down the internal servers. + close(releaseExternalBlock) + + // wait until the graceful shutdown is in progress and new RPCs are no longer accepted on the internal + // servers + waitUntilFailure(internalConn) + + // neither internal nor external servers should be accepting new RPCs anymore + invokeQuick(externalConn, false) + invokeQuick(internalConn, false) + + // wait until the blocking rpc has successfully completed + close(releaseInternalBlock) + <-internalBlockingRPCFinished + + // wait until the graceful shutdown completes + <-shutdownCompeleted +} |