diff options
author | John Cai <jcai@gitlab.com> | 2020-06-04 22:12:18 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-06-05 00:30:41 +0300 |
commit | 274024f910dab28ee09a675d71c83955c9facb0d (patch) | |
tree | 2cd9b4ca06b9b09011192171dcd923063ec2c83d | |
parent | 6604f94d8da44699365302e7c7c8c0c1cc30ccb6 (diff) |
long running goroutinesjc-mul-node-write-long-running
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 77 |
1 files changed, 47 insertions, 30 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 235806b8e..a26bf0651 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -145,26 +145,24 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ - s2cErrChan := s.forwardServerToClients(serverStream, allClientStreams) + s2cErrChan, s2cDoneChan := s.forwardServerToClients(serverStream, allClientStreams) c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream) // We don't know which side is going to stop sending first, so we need a select between the two. for i := 0; i < 2; i++ { select { - case s2cErr := <-s2cErrChan: - if s2cErr == io.EOF { - // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ - // the clientStream>serverStream may continue pumping though. - for _, clientStream := range allClientStreams { - clientStream.CloseSend() - } - } else { - // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need - // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and - // exit with an error to the stack - clientCancel() - return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + case <-s2cDoneChan: + // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ + // the clientStream>serverStream may continue pumping though. + for _, clientStream := range allClientStreams { + clientStream.CloseSend() } + case s2cErr := <-s2cErrChan: + // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need + // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and + // exit with an error to the stack + clientCancel() + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) case c2sErr := <-c2sErrChan: // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers @@ -217,8 +215,10 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt return ret } -func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error { +func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) (chan error, chan struct{}) { ret := make(chan error, len(dsts)) + done := make(chan struct{}) + go func() { var g errgroup.Group for _, dst := range dsts { @@ -239,29 +239,46 @@ func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAnd ret <- err } + var frameChans []chan *frame + for _, dst := range dsts { + dst := dst + frameChan := make(chan *frame) + frameChans = append(frameChans, frameChan) + g.Go(func() error { + for { + f := <-frameChan + if f == nil { + return nil + } + if err := dst.SendMsg(f); err != nil { + return err + } + } + }) + } // resume two-way stream after peeked messages f := &frame{} for i := 0; ; i++ { if err := src.RecvMsg(f); err != nil { - ret <- err // this can be io.EOF which is happy case + for _, frameChan := range frameChans { + close(frameChan) + } + if err != io.EOF { + ret <- err // this can be io.EOF which is happy case + } break } - var g errgroup.Group - for _, dst := range dsts { - dst := dst - f := f - g.Go(func() error { - if err := dst.SendMsg(f); err != nil { - return err - } - return nil - }) - } - if err := g.Wait(); err != nil { - ret <- err + for _, frameChan := range frameChans { + frameChan <- f } } + + if err := g.Wait(); err != nil { + ret <- err + } + + done <- struct{}{} }() - return ret + return ret, done } |