Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-07-20 09:42:19 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-07-22 14:38:39 +0300
commit3d85f0b06a5aaad736bc96db77689be6b121a6d4 (patch)
tree7a61eef9ccc57a20986f8c18d6bb19fd89d3d5e3
parentdc2794ab86c15aec27c584745f4ce3b41da30393 (diff)
ssh: Add tests with injected network errors
In gitlab.com there has been a recent uptick in incidents caused by SSHUploadPackWithSidechannel failing with the following error message: fatal: the remote end hung up unexpectedly While we have been reasonably sure that this error really only happens in case the client has cancelled the fetch, we couldn't really tell whether any other conditions might trigger the same error. Most notably, we were not sure if this might've indicated network connectivity issues. Add tests to exercise this RPC call with injected network errors. As it turns out, none of the injected errors cause the error to appear, which is a good thing.
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go143
1 files changed, 143 insertions, 0 deletions
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index e053ad83c..a00424147 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
+ "net"
"os"
"os/exec"
"path/filepath"
@@ -32,6 +33,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -401,6 +403,147 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
}
}
+// faultInjectingConn is a connection that can inject a Read error when seeing a specific token.
+type faultInjectingConn struct {
+ net.Conn
+ buf bytes.Buffer
+ listener faultInjectingListener
+}
+
+// Read reads from the underlying network connection. If the read data contains the expected token
+// configured by the listener then it returns the injected error instead. Otherwise it behaves the
+// same as the underlying `Read()` function.
+func (c *faultInjectingConn) Read(buf []byte) (int, error) {
+ n, err := c.Conn.Read(buf[:1])
+ if err != nil {
+ return n, err
+ }
+
+ // We cache everything we've written in our internal buffer. This is done to have a
+ // look-behind buffer: we're reading the stream byte-by-byte in order to ensure that we cut
+ // off the stream exactly when we want to. We need to be able to observe our token though at
+ // which we want to cut off, so we cache the chit-chat.
+ if _, err := c.buf.Write(buf[:n]); err != nil {
+ return n, err
+ }
+
+ if bytes.HasSuffix(c.buf.Bytes(), []byte(c.listener.injectErrAt)) {
+ return n, c.listener.injectErr
+ }
+
+ return n, nil
+}
+
+// faultInjectingListener is a listener that creates connections into which read errors can be
+// injected.
+type faultInjectingListener struct {
+ *bufconn.Listener
+ injectErrAt string
+ injectErr error
+}
+
+func (l faultInjectingListener) Accept() (net.Conn, error) {
+ conn, err := l.Listener.Accept()
+ if err != nil {
+ return nil, err
+ }
+
+ return &faultInjectingConn{
+ Conn: conn,
+ listener: l,
+ }, nil
+}
+
+func TestUploadPackWithSidechannel_network(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+ server := startSSHServerWithOptions(t, cfg, nil)
+
+ repo, repoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
+ commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithParents())
+
+ registry := sidechannel.NewRegistry()
+ clientHandshaker := sidechannel.NewClientHandshaker(testhelper.NewDiscardingLogEntry(t), registry)
+
+ for _, tc := range []struct {
+ desc string
+ injectErrAt string
+ injectErr error
+ expectedErr error
+ expectedResponse *gitalypb.SSHUploadPackWithSidechannelResponse
+ }{
+ {
+ desc: "successful clone",
+ expectedResponse: &gitalypb.SSHUploadPackWithSidechannelResponse{},
+ },
+ {
+ desc: "ErrClosed on capability announcement",
+ injectErrAt: "multi_ack",
+ injectErr: net.ErrClosed,
+ expectedErr: helper.ErrUnavailablef("error reading from server: EOF"),
+ },
+ {
+ desc: "ErrUnexpectedEOF on capability announcement",
+ injectErrAt: "multi_ack",
+ injectErr: io.ErrUnexpectedEOF,
+ expectedErr: helper.ErrUnavailablef("error reading from server: EOF"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ // Set up the fault injecting listener so that we can inject network errors
+ // into the underlying network connection.
+ listener := faultInjectingListener{
+ // We create the listener with a buffer of `1` so that we only ever
+ // read a single byte at a time. This guarantees that we can indeed
+ // cut off the stream exactly when we want to.
+ Listener: bufconn.Listen(1),
+ injectErrAt: tc.injectErrAt,
+ injectErr: tc.injectErr,
+ }
+ go func() {
+ require.NoError(t, server.Server.Serve(listener))
+ }()
+
+ // Dial the service, but replace the default dialer with our own listener.
+ conn, err := grpc.Dial("bufconn",
+ grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())),
+ grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+ return listener.DialContext(ctx)
+ }),
+ )
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, conn)
+
+ // We set up the sidechannel so that it performs a complete packfile
+ // negotiation that succeeds if no error is injected.
+ ctx, waiter := sidechannel.RegisterSidechannel(ctx, registry, func(clientConn *sidechannel.ClientConn) error {
+ var buf bytes.Buffer
+ gittest.WritePktlineString(t, &buf, "want "+commitID.String()+" multi_ack\n")
+ gittest.WritePktlineFlush(t, &buf)
+ gittest.WritePktlineString(t, &buf, "done")
+
+ // Copy from our side to the client and the other way round. We
+ // ignore any errors returned by this function because we're
+ // explicitly testing the case where network I/O is failing.
+ _, _ = io.Copy(clientConn, &buf)
+ _, _ = io.Copy(io.Discard, clientConn)
+
+ return nil
+ })
+ defer testhelper.MustClose(t, waiter)
+
+ client := gitalypb.NewSSHServiceClient(conn)
+ response, err := client.SSHUploadPackWithSidechannel(ctx, &gitalypb.SSHUploadPackWithSidechannelRequest{
+ Repository: repo,
+ })
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ testhelper.ProtoEqual(t, tc.expectedResponse, response)
+ })
+ }
+}
+
func requireFailedSSHStream(t *testing.T, recv func() (int32, error)) {
done := make(chan struct{})
var code int32