Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-07-20 09:42:19 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-07-21 13:45:31 +0300
commit86737530a628266a76440a34969a333a7f35c42b (patch)
treeb3602f3bdb9d994cb69c7c5e44592d7f7c8840bc
parentfdabd999b7dcc7142c1f03a334f87f74c51e64ee (diff)
ssh: Add tests with injected network errors
In gitlab.com there has been a recent uptick in incidents caused by SSHUploadPackWithSidechannel failing with the following error message: fatal: the remote end hung up unexpectedly While we have been reasonably sure that this error really only happens in case the client has cancelled the fetch, we couldn't really tell whether any other conditions might trigger the same error. Most notably, we were not sure if this might've indicated network connectivity issues. Add tests to exercise this RPC call with injected network errors. As it turns out, none of the injected errors cause the error to appear, which is a good thing.
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go143
1 files changed, 143 insertions, 0 deletions
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index e053ad83c..a00424147 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
+ "net"
"os"
"os/exec"
"path/filepath"
@@ -32,6 +33,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -401,6 +403,147 @@ func TestUploadPackWithSidechannel_client(t *testing.T) {
}
}
+// faultInjectingConn is a connection that can inject a Read error when seeing a specific token.
+type faultInjectingConn struct {
+ net.Conn
+ buf bytes.Buffer
+ listener faultInjectingListener
+}
+
+// Read reads from the underlying network connection. If the read data contains the expected token
+// configured by the listener then it returns the injected error instead. Otherwise it behaves the
+// same as the underlying `Read()` function.
+func (c *faultInjectingConn) Read(buf []byte) (int, error) {
+ n, err := c.Conn.Read(buf[:1])
+ if err != nil {
+ return n, err
+ }
+
+ // We cache everything we've written in our internal buffer. This is done to have a
+ // look-behind buffer: we're reading the stream byte-by-byte in order to ensure that we cut
+ // off the stream exactly when we want to. We need to be able to observe our token though at
+ // which we want to cut off, so we cache the chit-chat.
+ if _, err := c.buf.Write(buf[:n]); err != nil {
+ return n, err
+ }
+
+ if bytes.HasSuffix(c.buf.Bytes(), []byte(c.listener.injectErrAt)) {
+ return n, c.listener.injectErr
+ }
+
+ return n, nil
+}
+
+// faultInjectingListener is a listener that creates connections into which read errors can be
+// injected.
+type faultInjectingListener struct {
+ *bufconn.Listener
+ injectErrAt string
+ injectErr error
+}
+
+func (l faultInjectingListener) Accept() (net.Conn, error) {
+ conn, err := l.Listener.Accept()
+ if err != nil {
+ return nil, err
+ }
+
+ return &faultInjectingConn{
+ Conn: conn,
+ listener: l,
+ }, nil
+}
+
+func TestUploadPackWithSidechannel_network(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+ server := startSSHServerWithOptions(t, cfg, nil)
+
+ repo, repoPath := gittest.InitRepo(t, cfg, cfg.Storages[0])
+ commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithParents())
+
+ registry := sidechannel.NewRegistry()
+ clientHandshaker := sidechannel.NewClientHandshaker(testhelper.NewDiscardingLogEntry(t), registry)
+
+ for _, tc := range []struct {
+ desc string
+ injectErrAt string
+ injectErr error
+ expectedErr error
+ expectedResponse *gitalypb.SSHUploadPackWithSidechannelResponse
+ }{
+ {
+ desc: "successful clone",
+ expectedResponse: &gitalypb.SSHUploadPackWithSidechannelResponse{},
+ },
+ {
+ desc: "ErrClosed on capability announcement",
+ injectErrAt: "multi_ack",
+ injectErr: net.ErrClosed,
+ expectedErr: helper.ErrUnavailablef("error reading from server: EOF"),
+ },
+ {
+ desc: "ErrUnexpectedEOF on capability announcement",
+ injectErrAt: "multi_ack",
+ injectErr: io.ErrUnexpectedEOF,
+ expectedErr: helper.ErrUnavailablef("error reading from server: EOF"),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ // Set up the fault injecting listener so that we can inject network errors
+ // into the underlying network connection.
+ listener := faultInjectingListener{
+ // We create the listener with a buffer of `1` so that we only ever
+ // read a single byte at a time. This guarantees that we can indeed
+ // cut off the stream exactly when we want to.
+ Listener: bufconn.Listen(1),
+ injectErrAt: tc.injectErrAt,
+ injectErr: tc.injectErr,
+ }
+ go func() {
+ require.NoError(t, server.Server.Serve(listener))
+ }()
+
+ // Dial the service, but replace the default dialer with our own listener.
+ conn, err := grpc.Dial("bufconn",
+ grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials())),
+ grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
+ return listener.DialContext(ctx)
+ }),
+ )
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, conn)
+
+ // We set up the sidechannel so that it performs a complete packfile
+ // negotiation that succeeds if no error is injected.
+ ctx, waiter := sidechannel.RegisterSidechannel(ctx, registry, func(clientConn *sidechannel.ClientConn) error {
+ var buf bytes.Buffer
+ gittest.WritePktlineString(t, &buf, "want "+commitID.String()+" multi_ack\n")
+ gittest.WritePktlineFlush(t, &buf)
+ gittest.WritePktlineString(t, &buf, "done")
+
+ // Copy from our side to the client and the other way round. We
+ // ignore any errors returned by this function because we're
+ // explicitly testing the case where network I/O is failing.
+ _, _ = io.Copy(clientConn, &buf)
+ _, _ = io.Copy(io.Discard, clientConn)
+
+ return nil
+ })
+ defer testhelper.MustClose(t, waiter)
+
+ client := gitalypb.NewSSHServiceClient(conn)
+ response, err := client.SSHUploadPackWithSidechannel(ctx, &gitalypb.SSHUploadPackWithSidechannelRequest{
+ Repository: repo,
+ })
+ testhelper.RequireGrpcError(t, tc.expectedErr, err)
+ testhelper.ProtoEqual(t, tc.expectedResponse, response)
+ })
+ }
+}
+
func requireFailedSSHStream(t *testing.T, recv func() (int32, error)) {
done := make(chan struct{})
var code int32