Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-06-10 23:06:31 +0300
committerJohn Cai <jcai@gitlab.com>2020-06-11 07:26:40 +0300
commit4e31ee320e00caae11d4d1bef590425be7228c86 (patch)
treeeddbb9e47c79c922170864627438a7b6862ee4d3
parent5f567adc2021a70dba29bb3cabd420784718c9a5 (diff)
Refactor
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go117
1 files changed, 56 insertions, 61 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 4a41b5ca5..b9a25b863 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -9,6 +9,7 @@ package proxy
import (
"context"
+ "errors"
"io"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
@@ -152,24 +153,13 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
s2cErrChan := s.forwardServerToClients(serverStream, append(secondaryStreams, primaryStream))
c2sErrChan := s.forwardClientToServer(primaryClientStream, serverStream)
secondaryErrChan := receiveSecondaryStreams(secondaryStreams)
- // We don't know which side is going to stop sending first, so we need a select between the two.
+
+ // We don't know whether the server, primary, or secondaries will stop sending first, so we need a select between them
for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
- if s2cErr == nil {
- // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
- // the clientStream>serverStream may continue pumping though.
- for _, stream := range append(secondaryStreams, primaryStream) {
- stream.CloseSend()
- }
-
- // wait for the writes to be proxied to the secondaries
- secondaryErr := <-secondaryErrChan
- if secondaryErr != nil {
- return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr)
- }
- } else {
- // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
+ if s2cErr != nil {
+ // we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
@@ -193,17 +183,21 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return c2sErr
}
+ if secondaryErr := <-secondaryErrChan; secondaryErr != nil {
+ return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr)
+ }
+
return nil
}
}
-
return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
// receiveSecondaryStreams reads from the client streams of the secondaries and drops the message
// but returns an error to the channel if it encounters a non io.EOF error
func receiveSecondaryStreams(srcs []streamAndMsg) chan error {
- ret := make(chan error)
+ ret := make(chan error, 1)
+
go func() {
var g errgroup.Group
@@ -212,7 +206,7 @@ func receiveSecondaryStreams(srcs []streamAndMsg) chan error {
g.Go(func() error {
for {
if err := src.RecvMsg(&frame{}); err != nil {
- if err == io.EOF {
+ if errors.Is(err, io.EOF) {
return nil
}
@@ -223,8 +217,7 @@ func receiveSecondaryStreams(srcs []streamAndMsg) chan error {
})
}
- err := g.Wait()
- ret <- err
+ ret <- g.Wait()
}()
return ret
}
@@ -261,65 +254,67 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt
return ret
}
+func forwardConsumedToClient(dst grpc.ClientStream, frameChan <-chan *frame) error {
+ for f := range frameChan {
+ if err := dst.SendMsg(f); err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ return err
+ }
+ }
+
+ // all messages redirected
+ return dst.CloseSend()
+}
+
func (s *handler) forwardServerToClients(src grpc.ServerStream, dsts []streamAndMsg) chan error {
ret := make(chan error, 1)
go func() {
var g errgroup.Group
- for _, dst := range dsts {
- dst := dst // rescoping for goroutine
- g.Go(func() error {
- return dst.SendMsg(&frame{payload: dst.msg})
- })
- }
-
- if err := g.Wait(); err != nil {
- ret <- err
- }
- // resume two-way stream after peeked messages
- frameChans := make([]chan *frame, 0, len(dsts))
+ frameChans := make([]chan<- *frame, 0, len(dsts))
for _, dst := range dsts {
dst := dst
- frameChan := make(chan *frame)
-
+ frameChan := make(chan *frame, 16)
+ frameChan <- &frame{payload: dst.msg} // send re-written message
frameChans = append(frameChans, frameChan)
- g.Go(func() error {
- for {
- f := <-frameChan
- if f == nil {
- return nil
- }
- if err := dst.SendMsg(f); err != nil {
- return err
- }
- }
- })
+ g.Go(func() error { return forwardConsumedToClient(dst, frameChan) })
}
- go func() {
- for {
- f := &frame{}
-
- if err := src.RecvMsg(f); err != nil {
- for _, frameChan := range frameChans {
- frameChan <- nil
- }
- if err != io.EOF {
- ret <- err
- }
-
+ for {
+ if err := consumeServerAndForward(src, frameChans); err != nil {
+ if errors.Is(err, io.EOF) {
break
}
- for _, frameChan := range frameChans {
- frameChan <- f
- }
+ ret <- err
+ return
}
- }()
+ }
ret <- g.Wait()
}()
return ret
}
+
+func consumeServerAndForward(src grpc.ServerStream, frameChans []chan<- *frame) error {
+ f := &frame{}
+
+ if err := src.RecvMsg(f); err != nil {
+ for _, frameChan := range frameChans {
+ // signal no more data to redirect
+ close(frameChan)
+ }
+
+ return err // this can be io.EOF which is happy case
+ }
+
+ for _, frameChan := range frameChans {
+ frameChan <- f
+ }
+
+ return nil
+}