diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-16 17:51:40 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-22 17:56:58 +0300 |
commit | 7c10ea296e233dfb6bb7f12278abd874960646e6 (patch) | |
tree | bba78b0eda34b3ec14456b0ab29a592a6b4064db | |
parent | 771df64aaf511cc3c64d7b55aee2d961941bfdab (diff) |
proxy: Pass down destination to proxying code
The proxying code where the "real magic" happens currently only gets the
destination's peeked header message as well as the stream it's supposed
to proxy on. We'll need more information than this though in a later
commit, so let's refactor the code to instead pass down the complete
Destination structure.
No change in behaviour is expected from this commit.
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 73730b2c7..445ca84fb 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -90,10 +90,10 @@ type handler struct { director StreamDirector } -type streamAndMsg struct { +type streamAndDestination struct { grpc.ClientStream - msg []byte - cancel func() + destination Destination + cancel func() } // handler is where the real magic of proxying happens. @@ -130,24 +130,24 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina return err } - primaryStream := streamAndMsg{ + primaryStream := streamAndDestination{ ClientStream: primaryClientStream, - msg: params.Primary().Msg, + destination: params.Primary(), cancel: clientCancel, } - var secondaryStreams []streamAndMsg - for _, conn := range params.Secondaries() { - clientCtx, clientCancel := context.WithCancel(conn.Ctx) + var secondaryStreams []streamAndDestination + for _, destination := range params.Secondaries() { + clientCtx, clientCancel := context.WithCancel(destination.Ctx) defer clientCancel() - secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, conn.Conn, fullMethodName, params.CallOptions()...) + secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, destination.Conn, fullMethodName, params.CallOptions()...) if err != nil { return err } - secondaryStreams = append(secondaryStreams, streamAndMsg{ + secondaryStreams = append(secondaryStreams, streamAndDestination{ ClientStream: secondaryClientStream, - msg: conn.Msg, + destination: destination, cancel: clientCancel, }) } @@ -205,7 +205,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina return nil } -func cancelStreams(streams []streamAndMsg) { +func cancelStreams(streams []streamAndDestination) { for _, stream := range streams { stream.cancel() } @@ -213,7 +213,7 @@ func cancelStreams(streams []streamAndMsg) { // receiveSecondaryStreams reads from the client streams of the secondaries and drops the message // but returns an error to the channel if it encounters a non io.EOF error -func receiveSecondaryStreams(srcs []streamAndMsg) chan error { +func receiveSecondaryStreams(srcs []streamAndDestination) chan error { ret := make(chan error, 1) go func() { @@ -285,7 +285,7 @@ func forwardFramesToServer(dst grpc.ClientStream, frameChan <-chan *frame) error return dst.CloseSend() } -func forwardClientToServers(src grpc.ServerStream, dsts []streamAndMsg) chan error { +func forwardClientToServers(src grpc.ServerStream, dsts []streamAndDestination) chan error { ret := make(chan error, 1) go func() { var g errgroup.Group @@ -295,7 +295,7 @@ func forwardClientToServers(src grpc.ServerStream, dsts []streamAndMsg) chan err for _, dst := range dsts { dst := dst frameChan := make(chan *frame, 16) - frameChan <- &frame{payload: dst.msg} // send re-written message + frameChan <- &frame{payload: dst.destination.Msg} // send re-written message frameChans = append(frameChans, frameChan) g.Go(func() error { return forwardFramesToServer(dst, frameChan) }) |