Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/grpc-proxy/proxy/handler.go')
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go17
1 files changed, 17 insertions, 0 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index aeac70181..743aedf8a 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -10,6 +10,7 @@ package proxy
import (
"context"
"errors"
+ "fmt"
"io"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
@@ -96,6 +97,20 @@ type streamAndDestination struct {
cancel func()
}
+// failDestinationWithErrors marks all of the destinations in the StreamParameters as
+// having failed with the given error.
+func failDestinationsWithError(params *StreamParameters, err error) {
+ if params.Primary().ErrHandler != nil {
+ _ = params.Primary().ErrHandler(err)
+ }
+
+ for _, secondary := range params.Secondaries() {
+ if secondary.ErrHandler != nil {
+ _ = secondary.ErrHandler(err)
+ }
+ }
+}
+
// handler is where the real magic of proxying happens.
// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
// forwarding it to a ClientStream established against the relevant ClientConn.
@@ -127,6 +142,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
primaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, params.Primary().Conn, fullMethodName, params.CallOptions()...)
if err != nil {
+ failDestinationsWithError(params, fmt.Errorf("initiate primary stream: %w", err))
return err
}
@@ -143,6 +159,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina
secondaryClientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, destination.Conn, fullMethodName, params.CallOptions()...)
if err != nil {
+ failDestinationsWithError(params, fmt.Errorf("initiate secondary stream: %w", err))
return err
}
secondaryStreams = append(secondaryStreams, streamAndDestination{