Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-16 17:51:40 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2021-02-22 17:56:58 +0300
commit7c10ea296e233dfb6bb7f12278abd874960646e6 (patch)
treebba78b0eda34b3ec14456b0ab29a592a6b4064db
parent771df64aaf511cc3c64d7b55aee2d961941bfdab (diff)
proxy: Pass down destination to proxying code
The proxying code where the "real magic" happens currently only gets the destination's peeked header message as well as the stream it's supposed to proxy on. We'll need more information than this though in a later commit, so let's refactor the code to instead pass down the complete Destination structure. No change in behaviour is expected from this commit.
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go30
1 files changed, 15 insertions, 15 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 73730b2c7..445ca84fb 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -90,10 +90,10 @@ type handler struct {
director StreamDirector
}
-type streamAndMsg struct {
+type streamAndDestination struct {
grpc.ClientStream
- msg []byte
- cancel func()
+ destination Destination
+ cancel func()
}
// handler is where the real magic of proxying happens.
@@ -130,24 +130,24 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
return err
}
- primaryStream := streamAndMsg{
+ primaryStream := streamAndDestination{
ClientStream: primaryClientStream,
- msg: params.Primary().Msg,
+ destination: params.Primary(),
cancel: clientCancel,
}
- var secondaryStreams []streamAndMsg
- for _, conn := range params.Secondaries() {
- clientCtx, clientCancel := context.WithCancel(conn.Ctx)
+ var secondaryStreams []streamAndDestination
+ for _, destination := range params.Secondaries() {
+ clientCtx, clientCancel := context.WithCancel(destination.Ctx)
defer clientCancel()
- secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, conn.Conn, fullMethodName, params.CallOptions()...)
+ secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, destination.Conn, fullMethodName, params.CallOptions()...)
if err != nil {
return err
}
- secondaryStreams = append(secondaryStreams, streamAndMsg{
+ secondaryStreams = append(secondaryStreams, streamAndDestination{
ClientStream: secondaryClientStream,
- msg: conn.Msg,
+ destination: destination,
cancel: clientCancel,
})
}
@@ -205,7 +205,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
return nil
}
-func cancelStreams(streams []streamAndMsg) {
+func cancelStreams(streams []streamAndDestination) {
for _, stream := range streams {
stream.cancel()
}
@@ -213,7 +213,7 @@ func cancelStreams(streams []streamAndMsg) {
// receiveSecondaryStreams reads from the client streams of the secondaries and drops the message
// but returns an error to the channel if it encounters a non io.EOF error
-func receiveSecondaryStreams(srcs []streamAndMsg) chan error {
+func receiveSecondaryStreams(srcs []streamAndDestination) chan error {
ret := make(chan error, 1)
go func() {
@@ -285,7 +285,7 @@ func forwardFramesToServer(dst grpc.ClientStream, frameChan <-chan *frame) error
return dst.CloseSend()
}
-func forwardClientToServers(src grpc.ServerStream, dsts []streamAndMsg) chan error {
+func forwardClientToServers(src grpc.ServerStream, dsts []streamAndDestination) chan error {
ret := make(chan error, 1)
go func() {
var g errgroup.Group
@@ -295,7 +295,7 @@ func forwardClientToServers(src grpc.ServerStream, dsts []streamAndMsg) chan err
for _, dst := range dsts {
dst := dst
frameChan := make(chan *frame, 16)
- frameChan <- &frame{payload: dst.msg} // send re-written message
+ frameChan <- &frame{payload: dst.destination.Msg} // send re-written message
frameChans = append(frameChans, frameChan)
g.Go(func() error { return forwardFramesToServer(dst, frameChan) })