Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuang-Minh Nguyen <qmnguyen@gitlab.com>2021-10-11 07:42:21 +0300
committerQuang-Minh Nguyen <qmnguyen@gitlab.com>2021-10-12 10:01:35 +0300
commitb9de9e7ee0e2a8f3adb50be82f28a41a426afebc (patch)
treebb71ffd6dd8c9d3c152577cb5c430d0e868723bc
parent3749f8836eeb67564b76cc71f7d8155766aeb3ba (diff)
Issue: https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/1302 Changelog: fixed
-rw-r--r--client/dial.go4
-rw-r--r--internal/backchannel/backchannel.go18
-rw-r--r--internal/backchannel/client.go2
-rw-r--r--internal/backchannel/server.go2
-rw-r--r--internal/middleware/cancelhandler/cancelhandler.go21
-rw-r--r--internal/sidechannel/sidechannel.go28
-rw-r--r--internal/sidechannel/sidechannel_test.go54
7 files changed, 97 insertions, 32 deletions
diff --git a/client/dial.go b/client/dial.go
index cedb075fd..352003d09 100644
--- a/client/dial.go
+++ b/client/dial.go
@@ -35,8 +35,8 @@ func Dial(rawAddress string, connOpts []grpc.DialOption) (*grpc.ClientConn, erro
// backchannel connection instead of a regular gRPC connection. It also
// injects sr as a sidechannel registry, so that Gitaly can establish
// sidechannels back to the client.
-func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption, opts ...sidechannel.Option) (*grpc.ClientConn, error) {
- clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry, opts...)
+func DialSidechannel(ctx context.Context, rawAddress string, sr *SidechannelRegistry, connOpts []grpc.DialOption) (*grpc.ClientConn, error) {
+ clientHandshaker := sidechannel.NewClientHandshaker(sr.logger, sr.registry)
return client.Dial(ctx, rawAddress, connOpts, clientHandshaker)
}
diff --git a/internal/backchannel/backchannel.go b/internal/backchannel/backchannel.go
index 1f8d502f6..a7c03e6c8 100644
--- a/internal/backchannel/backchannel.go
+++ b/internal/backchannel/backchannel.go
@@ -48,15 +48,19 @@ type connCloser struct {
// Close calls the provided close function.
func (cc connCloser) Close() error { return cc.close() }
-// Options stores the configurations used in backchannel
-type Options struct {
- YamuxConfig *yamux.Config
+type options struct {
+ yamuxConfig *yamux.Config
}
// A Option sets options such as yamux configurations for backchannel
-type Option func(*Options)
+type Option func(*options)
-func defaultBackchannelOptions(logger io.Writer) *Options {
+// WithYamuxConfig customizes the yamux configuration used in backchannel
+func WithYamuxConfig(yamuxConfig *yamux.Config) Option {
+ return func(opts *options) { opts.yamuxConfig = yamuxConfig }
+}
+
+func defaultBackchannelOptions(logger io.Writer) *options {
yamuxConf := yamux.DefaultConfig()
// The server only accepts a single stream from the client, which is the client's gRPC stream.
// The backchannel server should only receive a single stream from the server. As such, we can
@@ -75,7 +79,7 @@ func defaultBackchannelOptions(logger io.Writer) *Options {
yamuxConf.LogOutput = logger
- return &Options{
- YamuxConfig: yamuxConf,
+ return &options{
+ yamuxConfig: yamuxConf,
}
}
diff --git a/internal/backchannel/client.go b/internal/backchannel/client.go
index cf635cd63..5af88cd0c 100644
--- a/internal/backchannel/client.go
+++ b/internal/backchannel/client.go
@@ -98,7 +98,7 @@ func (ch clientHandshake) serve(ctx context.Context, conn net.Conn) (net.Conn, e
}
// Initiate the multiplexing session.
- muxSession, err := yamux.Client(conn, options.YamuxConfig)
+ muxSession, err := yamux.Client(conn, options.yamuxConfig)
if err != nil {
logger.Close()
return nil, fmt.Errorf("open multiplexing session: %w", err)
diff --git a/internal/backchannel/server.go b/internal/backchannel/server.go
index c6a6a6c25..378b1a847 100644
--- a/internal/backchannel/server.go
+++ b/internal/backchannel/server.go
@@ -105,7 +105,7 @@ func (s *ServerHandshaker) Handshake(conn net.Conn, authInfo credentials.AuthInf
}
// Open the server side of the multiplexing session.
- muxSession, err := yamux.Server(conn, options.YamuxConfig)
+ muxSession, err := yamux.Server(conn, options.yamuxConfig)
if err != nil {
logger.Close()
return nil, nil, fmt.Errorf("create multiplexing session: %w", err)
diff --git a/internal/middleware/cancelhandler/cancelhandler.go b/internal/middleware/cancelhandler/cancelhandler.go
index 8ee870522..d8f5f4c53 100644
--- a/internal/middleware/cancelhandler/cancelhandler.go
+++ b/internal/middleware/cancelhandler/cancelhandler.go
@@ -2,7 +2,9 @@ package cancelhandler
import (
"context"
+ "errors"
+ "github.com/hashicorp/yamux"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -22,7 +24,24 @@ func Stream(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerIn
}
func wrapErr(ctx context.Context, err error) error {
- if err == nil || ctx.Err() == nil {
+ if err == nil {
+ return err
+ }
+
+ // The gRPC may not be aware that sidechannel clients hang up. Therefore,
+ // we have to handle yamux errors here. There are two errors for this scenario:
+ // - The client called Close(). A flagFIN is sent to the server.
+ // + The server starts force close timer, but is still able to write
+ // + The client starts a force close timer
+ // - When the timers are due, any read/write operations raise
+ // ErrStreamClosed. They send flagRST flag to the other side.
+ // - When either side receives flagRST, any read/write operations raise
+ // ErrConnectionReset
+ if errors.Is(err, yamux.ErrStreamClosed) || errors.Is(err, yamux.ErrConnectionReset) {
+ return status.Errorf(codes.Canceled, "%v", err)
+ }
+
+ if ctx.Err() == nil {
return err
}
diff --git a/internal/sidechannel/sidechannel.go b/internal/sidechannel/sidechannel.go
index d64407df1..56f517e0a 100644
--- a/internal/sidechannel/sidechannel.go
+++ b/internal/sidechannel/sidechannel.go
@@ -31,15 +31,11 @@ const (
sidechannelMetadataKey = "gitaly-sidechannel-id"
)
-// Options stores the configurations used in backchannel
-type Options struct {
- YamuxConfig *yamux.Config
+type options struct {
+ yamuxConfig *yamux.Config
}
-// A Option sets options such as yamux configurations for sidechannel
-type Option func(*Options)
-
-func defaultSidechannelOptions(logger io.Writer) *Options {
+func defaultSidechannelOptions(logger io.Writer) *options {
yamuxConf := yamux.DefaultConfig()
// At the moment, those configurations are the subset of backchannel yamux
@@ -49,11 +45,19 @@ func defaultSidechannelOptions(logger io.Writer) *Options {
yamuxConf.EnableKeepAlive = false
yamuxConf.LogOutput = logger
- return &Options{
- YamuxConfig: yamuxConf,
+ return &options{
+ yamuxConfig: yamuxConf,
}
}
+// A Option sets options such as yamux configurations for sidechannel
+type Option func(*options)
+
+// WithYamuxConfig customizes the yamux configuration used in sidechannel
+func WithYamuxConfig(yamuxConfig *yamux.Config) Option {
+ return func(opts *options) { opts.yamuxConfig = yamuxConfig }
+}
+
// OpenSidechannel opens a sidechannel connection from the stream opener
// extracted from the current peer connection.
func OpenSidechannel(ctx context.Context) (_ *ServerConn, err error) {
@@ -169,10 +173,6 @@ func NewClientHandshaker(logger *logrus.Entry, registry *Registry, opts ...Optio
lm.Register(NewServerHandshaker(registry))
return grpc.NewServer(grpc.Creds(lm))
},
- []backchannel.Option{
- func(options *backchannel.Options) {
- options.YamuxConfig = sidechannelOpts.YamuxConfig
- },
- }...,
+ backchannel.WithYamuxConfig(sidechannelOpts.yamuxConfig),
)
}
diff --git a/internal/sidechannel/sidechannel_test.go b/internal/sidechannel/sidechannel_test.go
index 034f62b2e..d1c80922f 100644
--- a/internal/sidechannel/sidechannel_test.go
+++ b/internal/sidechannel/sidechannel_test.go
@@ -148,7 +148,47 @@ func TestSidechannelConcurrency(t *testing.T) {
}
}
-func startServer(t *testing.T, th testHandler, opts ...grpc.ServerOption) string {
+func TestSidechannelCancelled(t *testing.T) {
+ addr := startServer(
+ t,
+ func(context context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
+ conn, err := OpenSidechannel(context)
+ if err != nil {
+ return nil, err
+ }
+ defer conn.Close()
+
+ if _, err := io.Copy(io.Discard, conn); err != nil {
+ return nil, err
+ }
+
+ responseData := make([]byte, 64*1024)
+ for {
+ // Write into yamux connection until reaching error.
+ if _, err = conn.Write(responseData); err != nil {
+ return nil, err
+ }
+ }
+ },
+ []grpc.ServerOption{
+ grpc.UnaryInterceptor(cancelhandler.Unary),
+ grpc.StreamInterceptor(cancelhandler.Stream),
+ },
+ withYamuxCfgFastTimeout(),
+ )
+
+ conn, registry := dial(t, addr, withYamuxCfgFastTimeout())
+ client := healthpb.NewHealthClient(conn)
+
+ ctxOut, waiter := RegisterSidechannel(context.Background(), registry, func(conn *ClientConn) error {
+ // Send data to the server but not wait for the response
+ return conn.CloseWrite()
+ })
+ defer waiter.Close()
+
+ _, err := client.Check(ctxOut, &healthpb.HealthCheckRequest{})
+ testhelper.RequireGrpcError(t, err, codes.Canceled)
+}
func startServer(t *testing.T, th testHandler, grpcOpts []grpc.ServerOption, sidechannelOpts ...Option) string {
t.Helper()
@@ -159,11 +199,7 @@ func startServer(t *testing.T, th testHandler, grpcOpts []grpc.ServerOption, sid
}
lm := listenmux.New(insecure.NewCredentials())
- lm.Register(backchannel.NewServerHandshaker(newLogger(), backchannel.NewRegistry(), nil, []backchannel.Option{
- func(backchannelOptions *backchannel.Options) {
- backchannelOptions.YamuxConfig = options.YamuxConfig
- },
- }...))
+ lm.Register(backchannel.NewServerHandshaker(newLogger(), backchannel.NewRegistry(), nil, backchannel.WithYamuxConfig(options.yamuxConfig)))
grpcOpts = append(grpcOpts, grpc.Creds(lm))
@@ -221,3 +257,9 @@ type server struct {
func (s *server) Check(context context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
return s.testHandler(context, request)
}
+
+func withYamuxCfgFastTimeout() Option {
+ return func(options *options) {
+ options.yamuxConfig.StreamCloseTimeout = 10 * time.Millisecond
+ }
+}