Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/gitaly/server/server_factory_test.go')
-rw-r--r--internal/gitaly/server/server_factory_test.go180
1 files changed, 171 insertions, 9 deletions
diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go
index 78ef98747..7f625946a 100644
--- a/internal/gitaly/server/server_factory_test.go
+++ b/internal/gitaly/server/server_factory_test.go
@@ -1,17 +1,20 @@
package server
import (
+ "context"
"crypto/tls"
"crypto/x509"
- "io/ioutil"
+ "errors"
"net"
"os"
"testing"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/internal/bootstrap/starter"
+ "gitlab.com/gitlab-org/gitaly/internal/cache"
"gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
@@ -36,7 +39,7 @@ func TestGitalyServerFactory(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { listener.Close() })
- srv, err := sf.Create(true)
+ srv, err := sf.CreateExternal(true)
require.NoError(t, err)
healthpb.RegisterHealthServer(srv, health.NewServer())
go srv.Serve(listener)
@@ -44,9 +47,7 @@ func TestGitalyServerFactory(t *testing.T) {
certPool, err := x509.SystemCertPool()
require.NoError(t, err)
- pem, err := ioutil.ReadFile(sf.cfg.TLS.CertPath)
- require.NoError(t, err)
-
+ pem := testhelper.MustReadFile(t, sf.cfg.TLS.CertPath)
require.True(t, certPool.AppendCertsFromPEM(pem))
creds := credentials.NewTLS(&tls.Config{
@@ -61,7 +62,7 @@ func TestGitalyServerFactory(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { listener.Close() })
- srv, err := sf.Create(false)
+ srv, err := sf.CreateExternal(false)
require.NoError(t, err)
healthpb.RegisterHealthServer(srv, health.NewServer())
go srv.Serve(listener)
@@ -85,7 +86,7 @@ func TestGitalyServerFactory(t *testing.T) {
t.Run("insecure", func(t *testing.T) {
cfg := testcfg.Build(t)
- sf := NewGitalyServerFactory(cfg, backchannel.NewRegistry())
+ sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
checkHealth(t, sf, starter.TCP, "localhost:0")
})
@@ -98,7 +99,7 @@ func TestGitalyServerFactory(t *testing.T) {
KeyPath: keyFile,
}}))
- sf := NewGitalyServerFactory(cfg, backchannel.NewRegistry())
+ sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
t.Cleanup(sf.Stop)
checkHealth(t, sf, starter.TLS, "localhost:0")
@@ -106,7 +107,7 @@ func TestGitalyServerFactory(t *testing.T) {
t.Run("all services must be stopped", func(t *testing.T) {
cfg := testcfg.Build(t)
- sf := NewGitalyServerFactory(cfg, backchannel.NewRegistry())
+ sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
t.Cleanup(sf.Stop)
tcpHealthClient := checkHealth(t, sf, starter.TCP, "localhost:0")
@@ -125,3 +126,164 @@ func TestGitalyServerFactory(t *testing.T) {
require.Equal(t, codes.Unavailable, status.Code(socketErr))
})
}
+
+func TestGitalyServerFactory_closeOrder(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ cfg := testcfg.Build(t)
+ sf := NewGitalyServerFactory(cfg, testhelper.DiscardTestEntry(t), backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg)))
+ defer sf.Stop()
+
+ errQuickRPC := status.Error(codes.Internal, "quick RPC")
+ errBlockingRPC := status.Error(codes.Internal, "blocking RPC")
+
+ invokeQuick := func(conn *grpc.ClientConn, shouldSucceed bool) {
+ err := conn.Invoke(ctx, "/Service/Quick", &healthpb.HealthCheckRequest{}, &healthpb.HealthCheckRequest{})
+ if !shouldSucceed {
+ testhelper.RequireGrpcError(t, err, codes.Unavailable)
+ return
+ }
+
+ require.Equal(t, errQuickRPC, err)
+ }
+
+ invokeBlocking := func(conn *grpc.ClientConn) chan struct{} {
+ rpcFinished := make(chan struct{})
+
+ go func() {
+ defer close(rpcFinished)
+ assert.Equal(t,
+ errBlockingRPC,
+ conn.Invoke(ctx, "/Service/Blocking", &healthpb.HealthCheckRequest{}, &healthpb.HealthCheckRequest{}),
+ )
+ }()
+
+ return rpcFinished
+ }
+
+ waitUntilFailure := func(conn *grpc.ClientConn) {
+ for {
+ err := conn.Invoke(ctx, "/Service/Quick", &healthpb.HealthCheckRequest{}, &healthpb.HealthCheckRequest{})
+ if errors.Is(err, errQuickRPC) {
+ continue
+ }
+
+ testhelper.RequireGrpcError(t, err, codes.Unavailable)
+ break
+ }
+ }
+
+ var internalConn, externalConn *grpc.ClientConn
+ var internalIsBlocking, externalIsBlocking chan struct{}
+ var releaseInternalBlock, releaseExternalBlock chan struct{}
+ for _, builder := range []struct {
+ createServer func() *grpc.Server
+ conn **grpc.ClientConn
+ isBlocking *chan struct{}
+ releaseBlock *chan struct{}
+ }{
+ {
+ createServer: func() *grpc.Server {
+ server, err := sf.CreateInternal()
+ require.NoError(t, err)
+ return server
+ },
+ conn: &internalConn,
+ isBlocking: &internalIsBlocking,
+ releaseBlock: &releaseInternalBlock,
+ },
+ {
+ createServer: func() *grpc.Server {
+ server, err := sf.CreateExternal(false)
+ require.NoError(t, err)
+ return server
+ },
+ conn: &externalConn,
+ isBlocking: &externalIsBlocking,
+ releaseBlock: &releaseExternalBlock,
+ },
+ } {
+ server := builder.createServer()
+
+ releaseBlock := make(chan struct{})
+ *builder.releaseBlock = releaseBlock
+
+ isBlocking := make(chan struct{})
+ *builder.isBlocking = isBlocking
+
+ server.RegisterService(&grpc.ServiceDesc{
+ ServiceName: "Service",
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "Quick",
+ Handler: func(interface{}, context.Context, func(interface{}) error, grpc.UnaryServerInterceptor) (interface{}, error) {
+ return nil, errQuickRPC
+ },
+ },
+ {
+ MethodName: "Blocking",
+ Handler: func(interface{}, context.Context, func(interface{}) error, grpc.UnaryServerInterceptor) (interface{}, error) {
+ close(isBlocking)
+ <-releaseBlock
+ return nil, errBlockingRPC
+ },
+ },
+ },
+ HandlerType: (*interface{})(nil),
+ }, server)
+
+ ln, err := net.Listen("tcp", "localhost:0")
+ require.NoError(t, err)
+ defer ln.Close()
+
+ go server.Serve(ln)
+
+ *builder.conn, err = grpc.DialContext(ctx, ln.Addr().String(), grpc.WithInsecure())
+ require.NoError(t, err)
+ }
+
+ // both servers should be up and accepting RPCs
+ invokeQuick(externalConn, true)
+ invokeQuick(internalConn, true)
+
+ // invoke a blocking RPC on the external server to block the graceful shutdown
+ invokeBlocking(externalConn)
+ <-externalIsBlocking
+
+ shutdownCompeleted := make(chan struct{})
+ go func() {
+ defer close(shutdownCompeleted)
+ sf.GracefulStop()
+ }()
+
+ // wait until the graceful shutdown is in progress and new RPCs are no longer accepted on the
+ // external servers
+ waitUntilFailure(externalConn)
+
+ // internal sockets should still accept RPCs even if external sockets are gracefully closing.
+ invokeQuick(internalConn, true)
+
+ // block on the internal server
+ internalBlockingRPCFinished := invokeBlocking(internalConn)
+ <-internalIsBlocking
+
+ // release the external server's blocking RPC so the graceful shutdown can complete and proceed to
+ // shutting down the internal servers.
+ close(releaseExternalBlock)
+
+ // wait until the graceful shutdown is in progress and new RPCs are no longer accepted on the internal
+ // servers
+ waitUntilFailure(internalConn)
+
+ // neither internal nor external servers should be accepting new RPCs anymore
+ invokeQuick(externalConn, false)
+ invokeQuick(internalConn, false)
+
+ // wait until the blocking rpc has successfully completed
+ close(releaseInternalBlock)
+ <-internalBlockingRPCFinished
+
+ // wait until the graceful shutdown completes
+ <-shutdownCompeleted
+}