diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-19 10:15:34 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2021-02-23 12:01:52 +0300 |
commit | 5a18c4cb74519342307dfabed5e35b77bf8bc5c7 (patch) | |
tree | 5f40467b65c1be84e9446298f08bb53029566d4e | |
parent | 8167e4d4e2dab0357baf025ef84cef8200e730bf (diff) |
proxy: Synchronize cancellation of proxied streams
When proxying streams to both primaries and secondaries, then we cancel
all streams as soon as either the client or the primary stream has
caused an error. Because streams are handled in Goroutines, this
cancellation happens asynchronously. We ignore this fact in the proxying
handler and simply return, which in effect means that streams may still
get proxied after the handler returns.
Naturally, this can cause races. Most importantly, it can happen that
the context is still getting accessed even though it's already about to
be wrapped up. One common race observed was that the primary stream was
calling `sentryhandler.MarkToSkip()` while secondaries were still being
proxied and to log a message via `ctxlogrus`. Given that `MarkToSkip()`
modifies the context while `ctxlogrus` wants to access its values, this
is a data race and thus causes a runtime panic.
Fix this race by synchronizing stream cancellation: we do not return
early and also do not modify the context before all streams have been
cancelled.
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 59 |
1 files changed, 35 insertions, 24 deletions
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index ab1cab7bf..ee5cbd7c8 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -161,47 +161,58 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) (fina p2cErrChan := forwardPrimaryToClient(primaryStream, serverStream) secondaryErrChan := receiveSecondaryStreams(secondaryStreams) - // We need to wait for the streams from the primary and secondaries. However, we don't need to wait for the s2c stream to finish because - // in some cases the client will close the stream, signaling the end of a request/response cycle. If we wait for s2cErrChan, we are effectively - // forcing the client to close the stream. + // We need to wait for the streams from the primary and secondaries. However, we don't need + // to wait for the p2c stream to finish because in some cases the client will close the + // stream, signaling the end of a request/response cycle. If we wait for p2cErrChan, we are + // effectively forcing the client to close the stream. var primaryProxied, secondariesProxied bool + var clientErr, secondaryErr, primaryErr error + for !(primaryProxied && secondariesProxied) { select { - case c2sErr := <-c2sErrChan: - if c2sErr != nil { - // we may have gotten a receive error (stream disconnected, a read error etc) in which case we need - // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and - // exit with an error to the stack + case clientErr = <-c2sErrChan: + if clientErr != nil { + // We may have gotten a receive error (stream disconnected, a read + // error etc) in which case we need to cancel all streams. In order + // to not exit before all Goroutines have concluded, we do not + // return directly but instead return the error after the loop. cancelStreams(allStreams) - return status.Errorf(codes.Internal, "failed proxying c2s: %v", c2sErr) - } - case secondaryErr := <-secondaryErrChan: - if secondaryErr != nil { - return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr) } + case secondaryErr = <-secondaryErrChan: secondariesProxied = true - case p2cErr := <-p2cErrChan: - // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two - // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers + case primaryErr = <-p2cErrChan: + // This happens when the clientStream has nothing else to offer (io.EOF) or + // returned a gRPC error. In those two cases we may have received Trailers + // as part of the call. In case of other errors (stream closed) the trailers // will be nil. trailer := primaryClientStream.Trailer() serverStream.SetTrailer(trailer) - // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. - if p2cErr != io.EOF { + + // clientErr will contain RPC error from client code. If not io.EOF, return + // the RPC error as server stream error after the loop. + if primaryErr != io.EOF { // If there is an error from the primary, we want to cancel all streams cancelStreams(allStreams) - - if trailer != nil { - // we must not propagate Gitaly errors into Sentry - sentryhandler.MarkToSkip(serverStream.Context()) - } - return p2cErr } primaryProxied = true } } + if clientErr != nil { + return status.Errorf(codes.Internal, "failed proxying c2s: %v", clientErr) + } + + if primaryErr != nil && primaryErr != io.EOF { + // we must not propagate Gitaly errors into Sentry + sentryhandler.MarkToSkip(serverStream.Context()) + return primaryErr + } + + if secondaryErr != nil { + return status.Errorf(codes.Internal, "failed proxying to secondary: %v", secondaryErr) + } + return nil } |