diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2021-10-11 07:42:21 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2021-10-12 10:01:35 +0300 |
commit | b9de9e7ee0e2a8f3adb50be82f28a41a426afebc (patch) | |
tree | bb71ffd6dd8c9d3c152577cb5c430d0e868723bc | |
parent | 3749f8836eeb67564b76cc71f7d8155766aeb3ba (diff) |
Return Canceled gRPC code if Sidechannel client hangs up1302-make-sure-server-returns-canceled-grpc-code-if-sidechannel-client-hangs-up-after-sending-a
Issue: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1302
Changelog: fixed
-rw-r--r-- | client/dial.go | 4 | ||||
-rw-r--r-- | internal/backchannel/backchannel.go | 18 | ||||
-rw-r--r-- | internal/backchannel/client.go | 2 | ||||
-rw-r--r-- | internal/backchannel/server.go | 2 | ||||
-rw-r--r-- | internal/middleware/cancelhandler/cancelhandler.go | 21 | ||||
-rw-r--r-- | internal/sidechannel/sidechannel.go | 28 | ||||
-rw-r--r-- | internal/sidechannel/sidechannel_test.go | 54 |
7 files changed, 97 insertions, 32 deletions
diff --git a/client/dial.go b/client/dial.go index cedb075fd..352003d09 100644 --- a/client/dial.go +++ b/client/dial.go @@ -35,8 +35,8 @@ func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, erro // backchannel connection instead of a regular gRPC connection. It also // injects sr as a sidechannel registry, so that Gitaly can establish // sidechannels back to the client. -func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption, opts ...sidechannel.Option) (*grpc.ClientConn, error) { - clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry, opts...) +func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption) (*grpc.ClientConn, error) { + clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry) return client.Dial(ctx, rawAddress, connOpts, clientHandshaker) } diff --git a/internal/backchannel/backchannel.go b/internal/backchannel/backchannel.go index 1f8d502f6..a7c03e6c8 100644 --- a/internal/backchannel/backchannel.go +++ b/internal/backchannel/backchannel.go @@ -48,15 +48,19 @@ type connCloser struct { // Close calls the provided close function. func (cc connCloser) Close() error { return cc.close() } -// Options stores the configurations used in backchannel -type Options struct { - YamuxConfig *yamux.Config +type options struct { + yamuxConfig *yamux.Config } // A Option sets options such as yamux configurations for backchannel -type Option func(*Options) +type Option func(*options) -func defaultBackchannelOptions(logger io.Writer) *Options { +// WithYamuxConfig customizes the yamux configuration used in backchannel +func WithYamuxConfig(yamuxConfig *yamux.Config) Option { + return func(opts *options) { opts.yamuxConfig = yamuxConfig } +} + +func defaultBackchannelOptions(logger io.Writer) *options { yamuxConf := yamux.DefaultConfig() // The server only accepts a single stream from the client, which is the client's gRPC stream. // The backchannel server should only receive a single stream from the server. As such, we can @@ -75,7 +79,7 @@ func defaultBackchannelOptions(logger io.Writer) *Options { yamuxConf.LogOutput = logger - return &Options{ - YamuxConfig: yamuxConf, + return &options{ + yamuxConfig: yamuxConf, } } diff --git a/internal/backchannel/client.go b/internal/backchannel/client.go index cf635cd63..5af88cd0c 100644 --- a/internal/backchannel/client.go +++ b/internal/backchannel/client.go @@ -98,7 +98,7 @@ func (ch clientHandshake) serve(ctx context.Context, conn net.Conn) (net.Conn, e } // Initiate the multiplexing session. - muxSession, err := yamux.Client(conn, options.YamuxConfig) + muxSession, err := yamux.Client(conn, options.yamuxConfig) if err != nil { logger.Close() return nil, fmt.Errorf("open multiplexing session: %w", err) diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go index c6a6a6c25..378b1a847 100644 --- a/internal/backchannel/server.go +++ b/internal/backchannel/server.go @@ -105,7 +105,7 @@ func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInf } // Open the server side of the multiplexing session. - muxSession, err := yamux.Server(conn, options.YamuxConfig) + muxSession, err := yamux.Server(conn, options.yamuxConfig) if err != nil { logger.Close() return nil, nil, fmt.Errorf("create multiplexing session: %w", err) diff --git a/internal/middleware/cancelhandler/cancelhandler.go b/internal/middleware/cancelhandler/cancelhandler.go index 8ee870522..d8f5f4c53 100644 --- a/internal/middleware/cancelhandler/cancelhandler.go +++ b/internal/middleware/cancelhandler/cancelhandler.go @@ -2,7 +2,9 @@ package cancelhandler import ( "context" + "errors" + "github.com/hashicorp/yamux" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,7 +24,24 @@ func Stream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerIn } func wrapErr(ctx context.Context, err error) error { - if err == nil || ctx.Err() == nil { + if err == nil { + return err + } + + // The gRPC may not be aware that sidechannel clients hang up. Therefore, + // we have to handle yamux errors here. There are two errors for this scenario: + // - The client called Close(). A flagFIN is sent to the server. + // + The server starts force close timer, but is still able to write + // + The client starts a force close timer + // - When the timers are due, any read/write operations raise + // ErrStreamClosed. They send flagRST flag to the other side. + // - When either side receives flagRST, any read/write operations raise + // ErrConnectionReset + if errors.Is(err, yamux.ErrStreamClosed) || errors.Is(err, yamux.ErrConnectionReset) { + return status.Errorf(codes.Canceled, "%v", err) + } + + if ctx.Err() == nil { return err } diff --git a/internal/sidechannel/sidechannel.go b/internal/sidechannel/sidechannel.go index d64407df1..56f517e0a 100644 --- a/internal/sidechannel/sidechannel.go +++ b/internal/sidechannel/sidechannel.go @@ -31,15 +31,11 @@ const ( sidechannelMetadataKey = "gitaly-sidechannel-id" ) -// Options stores the configurations used in backchannel -type Options struct { - YamuxConfig *yamux.Config +type options struct { + yamuxConfig *yamux.Config } -// A Option sets options such as yamux configurations for sidechannel -type Option func(*Options) - -func defaultSidechannelOptions(logger io.Writer) *Options { +func defaultSidechannelOptions(logger io.Writer) *options { yamuxConf := yamux.DefaultConfig() // At the moment, those configurations are the subset of backchannel yamux @@ -49,11 +45,19 @@ func defaultSidechannelOptions(logger io.Writer) *Options { yamuxConf.EnableKeepAlive = false yamuxConf.LogOutput = logger - return &Options{ - YamuxConfig: yamuxConf, + return &options{ + yamuxConfig: yamuxConf, } } +// A Option sets options such as yamux configurations for sidechannel +type Option func(*options) + +// WithYamuxConfig customizes the yamux configuration used in sidechannel +func WithYamuxConfig(yamuxConfig *yamux.Config) Option { + return func(opts *options) { opts.yamuxConfig = yamuxConfig } +} + // OpenSidechannel opens a sidechannel connection from the stream opener // extracted from the current peer connection. func OpenSidechannel(ctx context.Context) (_ *ServerConn, err error) { @@ -169,10 +173,6 @@ func NewClientHandshaker(logger *logrus.Entry, registry *Registry, opts ...Optio lm.Register(NewServerHandshaker(registry)) return grpc.NewServer(grpc.Creds(lm)) }, - []backchannel.Option{ - func(options *backchannel.Options) { - options.YamuxConfig = sidechannelOpts.YamuxConfig - }, - }..., + backchannel.WithYamuxConfig(sidechannelOpts.yamuxConfig), ) } diff --git a/internal/sidechannel/sidechannel_test.go b/internal/sidechannel/sidechannel_test.go index 034f62b2e..d1c80922f 100644 --- a/internal/sidechannel/sidechannel_test.go +++ b/internal/sidechannel/sidechannel_test.go @@ -148,7 +148,47 @@ func TestSidechannelConcurrency(t *testing.T) { } } -func startServer(t *testing.T, th testHandler, opts ...grpc.ServerOption) string { +func TestSidechannelCancelled(t *testing.T) { + addr := startServer( + t, + func(context context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { + conn, err := OpenSidechannel(context) + if err != nil { + return nil, err + } + defer conn.Close() + + if _, err := io.Copy(io.Discard, conn); err != nil { + return nil, err + } + + responseData := make([]byte, 64*1024) + for { + // Write into yamux connection until reaching error. + if _, err = conn.Write(responseData); err != nil { + return nil, err + } + } + }, + []grpc.ServerOption{ + grpc.UnaryInterceptor(cancelhandler.Unary), + grpc.StreamInterceptor(cancelhandler.Stream), + }, + withYamuxCfgFastTimeout(), + ) + + conn, registry := dial(t, addr, withYamuxCfgFastTimeout()) + client := healthpb.NewHealthClient(conn) + + ctxOut, waiter := RegisterSidechannel(context.Background(), registry, func(conn *ClientConn) error { + // Send data to the server but not wait for the response + return conn.CloseWrite() + }) + defer waiter.Close() + + _, err := client.Check(ctxOut, &healthpb.HealthCheckRequest{}) + testhelper.RequireGrpcError(t, err, codes.Canceled) +} func startServer(t *testing.T, th testHandler, grpcOpts []grpc.ServerOption, sidechannelOpts ...Option) string { t.Helper() @@ -159,11 +199,7 @@ func startServer(t *testing.T, th testHandler, grpcOpts []grpc.ServerOption, sid } lm := listenmux.New(insecure.NewCredentials()) - lm.Register(backchannel.NewServerHandshaker(newLogger(), backchannel.NewRegistry(), nil, []backchannel.Option{ - func(backchannelOptions *backchannel.Options) { - backchannelOptions.YamuxConfig = options.YamuxConfig - }, - }...)) + lm.Register(backchannel.NewServerHandshaker(newLogger(), backchannel.NewRegistry(), nil, backchannel.WithYamuxConfig(options.yamuxConfig))) grpcOpts = append(grpcOpts, grpc.Creds(lm)) @@ -221,3 +257,9 @@ type server struct { func (s *server) Check(context context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { return s.testHandler(context, request) } + +func withYamuxCfgFastTimeout() Option { + return func(options *options) { + options.yamuxConfig.StreamCloseTimeout = 10 * time.Millisecond + } +} |