diff options
author | John Cai <jcai@gitlab.com> | 2020-06-10 23:06:31 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-06-11 07:26:40 +0300 |
commit | 4e31ee320e00caae11d4d1bef590425be7228c86 (patch) | |
tree | eddbb9e47c79c922170864627438a7b6862ee4d3 | |
parent | 5f567adc2021a70dba29bb3cabd420784718c9a5 (diff) |
Refactor
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 117 |
1 files changed, 56 insertions, 61 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 4a41b5ca5..b9a25b863 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -9,6 +9,7 @@ package proxy import ( "context" + "errors" "io" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" @@ -152,24 +153,13 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error s2cErrChan := s.forwardServerToClients(serverStream, append(secondaryStreams, primaryStream)) c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream) secondaryErrChan := receiveSecondaryStreams(secondaryStreams) - // We don't know which side is going to stop sending first, so we need a select between the two. + + // We don't know whether the server, primary, or secondaries will stop sending first, so we need a select between them for i := 0; i < 2; i++ { select { case s2cErr := <-s2cErrChan: - if s2cErr == nil { - // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ - // the clientStream>serverStream may continue pumping though. - for _, stream := range append(secondaryStreams, primaryStream) { - stream.CloseSend() - } - - // wait for the writes to be proxied to the secondaries - secondaryErr := <-secondaryErrChan - if secondaryErr != nil { - return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr) - } - } else { - // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need + if s2cErr != nil { + // we may have gotten a receive error (stream disconnected, a read error etc) in which case we need // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and // exit with an error to the stack @@ -193,17 +183,21 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return c2sErr } + if secondaryErr := <-secondaryErrChan; secondaryErr != nil { + return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr) + } + return nil } } - return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") } // receiveSecondaryStreams reads from the client streams of the secondaries and drops the message // but returns an error to the channel if it encounters a non io.EOF error func receiveSecondaryStreams(srcs []streamAndMsg) chan error { - ret := make(chan error) + ret := make(chan error, 1) + go func() { var g errgroup.Group @@ -212,7 +206,7 @@ func receiveSecondaryStreams(srcs []streamAndMsg) chan error { g.Go(func() error { for { if err := src.RecvMsg(&frame{}); err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { return nil } @@ -223,8 +217,7 @@ func receiveSecondaryStreams(srcs []streamAndMsg) chan error { }) } - err := g.Wait() - ret <- err + ret <- g.Wait() }() return ret } @@ -261,65 +254,67 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt return ret } +func forwardConsumedToClient(dst grpc.ClientStream, frameChan <-chan *frame) error { + for f := range frameChan { + if err := dst.SendMsg(f); err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + } + + // all messages redirected + return dst.CloseSend() +} + func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error { ret := make(chan error, 1) go func() { var g errgroup.Group - for _, dst := range dsts { - dst := dst // rescoping for goroutine - g.Go(func() error { - return dst.SendMsg(&frame{payload: dst.msg}) - }) - } - - if err := g.Wait(); err != nil { - ret <- err - } - // resume two-way stream after peeked messages - frameChans := make([]chan *frame, 0, len(dsts)) + frameChans := make([]chan<- *frame, 0, len(dsts)) for _, dst := range dsts { dst := dst - frameChan := make(chan *frame) - + frameChan := make(chan *frame, 16) + frameChan <- &frame{payload: dst.msg} // send re-written message frameChans = append(frameChans, frameChan) - g.Go(func() error { - for { - f := <-frameChan - if f == nil { - return nil - } - if err := dst.SendMsg(f); err != nil { - return err - } - } - }) + g.Go(func() error { return forwardConsumedToClient(dst, frameChan) }) } - go func() { - for { - f := &frame{} - - if err := src.RecvMsg(f); err != nil { - for _, frameChan := range frameChans { - frameChan <- nil - } - if err != io.EOF { - ret <- err - } - + for { + if err := consumeServerAndForward(src, frameChans); err != nil { + if errors.Is(err, io.EOF) { break } - for _, frameChan := range frameChans { - frameChan <- f - } + ret <- err + return } - }() + } ret <- g.Wait() }() return ret } + +func consumeServerAndForward(src grpc.ServerStream, frameChans []chan<- *frame) error { + f := &frame{} + + if err := src.RecvMsg(f); err != nil { + for _, frameChan := range frameChans { + // signal no more data to redirect + close(frameChan) + } + + return err // this can be io.EOF which is happy case + } + + for _, frameChan := range frameChans { + frameChan <- f + } + + return nil +} |