diff options
Diffstat (limited to 'internal/praefect/grpc-proxy/proxy/handler.go')
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index aeac70181..743aedf8a 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -10,6 +10,7 @@ package proxy import ( "context" "errors" + "fmt" "io" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" @@ -96,6 +97,20 @@ type streamAndDestination struct { cancel func() } +// failDestinationWithErrors marks all of the destinations in the StreamParameters as +// having failed with the given error. +func failDestinationsWithError(params *StreamParameters, err error) { + if params.Primary().ErrHandler != nil { + _ = params.Primary().ErrHandler(err) + } + + for _, secondary := range params.Secondaries() { + if secondary.ErrHandler != nil { + _ = secondary.ErrHandler(err) + } + } +} + // handler is where the real magic of proxying happens. // It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire, // forwarding it to a ClientStream established against the relevant ClientConn. @@ -127,6 +142,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina primaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Primary().Conn, fullMethodName, params.CallOptions()...) if err != nil { + failDestinationsWithError(params, fmt.Errorf("initiate primary stream: %w", err)) return err } @@ -143,6 +159,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, destination.Conn, fullMethodName, params.CallOptions()...) if err != nil { + failDestinationsWithError(params, fmt.Errorf("initiate secondary stream: %w", err)) return err } secondaryStreams = append(secondaryStreams, streamAndDestination{ |