Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-06-04 22:12:18 +0300
committerJohn Cai <jcai@gitlab.com>2020-06-05 00:30:41 +0300
commit274024f910dab28ee09a675d71c83955c9facb0d (patch)
tree2cd9b4ca06b9b09011192171dcd923063ec2c83d
parent6604f94d8da44699365302e7c7c8c0c1cc30ccb6 (diff)
long running goroutinesjc-mul-node-write-long-running
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go77
1 files changed, 47 insertions, 30 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 235806b8e..a26bf0651 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -145,26 +145,24 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
- s2cErrChan := s.forwardServerToClients(serverStream, allClientStreams)
+ s2cErrChan, s2cDoneChan := s.forwardServerToClients(serverStream, allClientStreams)
c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream)
// We don't know which side is going to stop sending first, so we need a select between the two.
for i := 0; i < 2; i++ {
select {
- case s2cErr := <-s2cErrChan:
- if s2cErr == io.EOF {
- // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
- // the clientStream>serverStream may continue pumping though.
- for _, clientStream := range allClientStreams {
- clientStream.CloseSend()
- }
- } else {
- // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
- // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
- // exit with an error to the stack
- clientCancel()
- return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+ case <-s2cDoneChan:
+ // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
+ // the clientStream>serverStream may continue pumping though.
+ for _, clientStream := range allClientStreams {
+ clientStream.CloseSend()
}
+ case s2cErr := <-s2cErrChan:
+ // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
+ // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
+ // exit with an error to the stack
+ clientCancel()
+ return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
@@ -217,8 +215,10 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt
return ret
}
-func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error {
+func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) (chan error, chan struct{}) {
ret := make(chan error, len(dsts))
+ done := make(chan struct{})
+
go func() {
var g errgroup.Group
for _, dst := range dsts {
@@ -239,29 +239,46 @@ func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAnd
ret <- err
}
+ var frameChans []chan *frame
+ for _, dst := range dsts {
+ dst := dst
+ frameChan := make(chan *frame)
+ frameChans = append(frameChans, frameChan)
+ g.Go(func() error {
+ for {
+ f := <-frameChan
+ if f == nil {
+ return nil
+ }
+ if err := dst.SendMsg(f); err != nil {
+ return err
+ }
+ }
+ })
+ }
// resume two-way stream after peeked messages
f := &frame{}
for i := 0; ; i++ {
if err := src.RecvMsg(f); err != nil {
- ret <- err // this can be io.EOF which is happy case
+ for _, frameChan := range frameChans {
+ close(frameChan)
+ }
+ if err != io.EOF {
+ ret <- err // this can be io.EOF which is happy case
+ }
break
}
- var g errgroup.Group
- for _, dst := range dsts {
- dst := dst
- f := f
- g.Go(func() error {
- if err := dst.SendMsg(f); err != nil {
- return err
- }
- return nil
- })
- }
- if err := g.Wait(); err != nil {
- ret <- err
+ for _, frameChan := range frameChans {
+ frameChan <- f
}
}
+
+ if err := g.Wait(); err != nil {
+ ret <- err
+ }
+
+ done <- struct{}{}
}()
- return ret
+ return ret, done
}