Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-01-18 12:20:36 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-01-18 12:20:36 +0300
commitb5722938b3e2a92a46f050b79864d745abcc4ecc (patch)
treeab053ba3523dc5b9e97c8036c8cb3b0801d93313
parent870d04c22168540019a21904d529721db4d1c153 (diff)
parent3a9a94065b85801e5a1a9b4d0293c5ae90644a63 (diff)
Merge branch 'jv-ssh-sidechannel-2' into 'master'
Add SSHUploadPackWithSidechannel Closes gitlab-com/gl-infra/scalability#1513 See merge request gitlab-org/gitaly!4251
-rw-r--r--client/upload_pack.go23
-rw-r--r--cmd/gitaly-hooks/hooks.go30
-rw-r--r--cmd/gitaly-ssh/main.go25
-rw-r--r--cmd/gitaly-ssh/main_test.go13
-rw-r--r--cmd/gitaly-ssh/receive_pack.go2
-rw-r--r--cmd/gitaly-ssh/upload_archive.go2
-rw-r--r--cmd/gitaly-ssh/upload_pack.go22
-rw-r--r--internal/gitaly/service/hook/pack_objects.go10
-rw-r--r--internal/gitaly/service/ssh/upload_pack.go106
-rw-r--r--internal/gitaly/service/ssh/upload_pack_test.go43
-rw-r--r--internal/sidechannel/proxy.go51
-rw-r--r--internal/sidechannel/proxy_test.go66
-rw-r--r--internal/stream/pktline.go74
-rw-r--r--proto/go/gitalypb/ssh.pb.go361
-rw-r--r--proto/go/gitalypb/ssh_grpc.pb.go41
-rw-r--r--proto/ssh.proto19
-rw-r--r--ruby/proto/gitaly/ssh_pb.rb9
-rw-r--r--ruby/proto/gitaly/ssh_services_pb.rb2
-rw-r--r--streamio/stream.go2
-rw-r--r--streamio/stream_test.go20
20 files changed, 674 insertions, 247 deletions
diff --git a/client/upload_pack.go b/client/upload_pack.go
index d0f4ea60b..8d971804b 100644
--- a/client/upload_pack.go
+++ b/client/upload_pack.go
@@ -40,3 +40,26 @@ func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, std
}
}, stdout, stderr)
}
+
+// UploadPackWithSidechannel proxies an SSH git-upload-pack (git fetch)
+// session to Gitaly using a sidechannel for the raw data transfer.
+func UploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, reg *SidechannelRegistry, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadPackWithSidechannelRequest) (int32, error) {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ ctx, wt := reg.Register(ctx, func(c SidechannelConn) error {
+ return stream.ProxyPktLine(c, stdin, stdout, stderr)
+ })
+ defer wt.Close()
+
+ sshClient := gitalypb.NewSSHServiceClient(conn)
+ if _, err := sshClient.SSHUploadPackWithSidechannel(ctx, req); err != nil {
+ return 0, err
+ }
+
+ if err := wt.Close(); err != nil {
+ return 0, err
+ }
+
+ return 0, nil
+}
diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go
index afa9af135..259b2c666 100644
--- a/cmd/gitaly-hooks/hooks.go
+++ b/cmd/gitaly-hooks/hooks.go
@@ -12,7 +12,6 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth"
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/git"
- "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/prometheus"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook"
@@ -369,34 +368,7 @@ func packObjectsHook(ctx context.Context, payload git.HooksPayload, hookClient g
func handlePackObjectsWithSidechannel(ctx context.Context, hookClient gitalypb.HookServiceClient, repo *gitalypb.Repository, args []string) error {
ctx, wt, err := hook.SetupSidechannel(ctx, func(c *net.UnixConn) error {
- // We don't have to worry about concurrent reads and writes and
- // deadlocks, because we're connected to git-upload-pack which follows
- // the sequence: (1) write to stdin of pack-objects, (2) close stdin of
- // pack-objects, (3) concurrently read from stdout and stderr of
- // pack-objects.
- if _, err := io.Copy(c, os.Stdin); err != nil {
- return fmt.Errorf("copy stdin: %w", err)
- }
- if err := c.CloseWrite(); err != nil {
- return fmt.Errorf("close write: %w", err)
- }
-
- if err := pktline.EachSidebandPacket(c, func(band byte, data []byte) error {
- var err error
- switch band {
- case 1:
- _, err = os.Stdout.Write(data)
- case 2:
- _, err = os.Stderr.Write(data)
- default:
- err = fmt.Errorf("unexpected side band: %d", band)
- }
- return err
- }); err != nil {
- return fmt.Errorf("demux response: %w", err)
- }
-
- return nil
+ return stream.ProxyPktLine(c, os.Stdin, os.Stdout, os.Stderr)
})
if err != nil {
return fmt.Errorf("SetupSidechannel: %w", err)
diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go
index b7a4f51dd..137ec690b 100644
--- a/cmd/gitaly-ssh/main.go
+++ b/cmd/gitaly-ssh/main.go
@@ -8,6 +8,7 @@ import (
"strconv"
"strings"
+ "github.com/sirupsen/logrus"
gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth"
"gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
@@ -15,7 +16,7 @@ import (
"google.golang.org/grpc"
)
-type packFn func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error)
+type packFn func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error)
type gitalySSHCommand struct {
// The git packer that shall be executed. One of receivePack,
@@ -37,6 +38,7 @@ type gitalySSHCommand struct {
// GITALY_PAYLOAD="{repo...}"
// GITALY_WD="/path/to/working-directory"
// GITALY_FEATUREFLAGS="upload_pack_filter:false,hooks_rpc:true"
+// GITALY_USE_SIDECHANNEL=1 if desired
// gitaly-ssh upload-pack <git-garbage-x2>
func main() {
// < 4 since git throws on 2x garbage here
@@ -49,7 +51,11 @@ func main() {
var packer packFn
switch command {
case "upload-pack":
- packer = uploadPack
+ if useSidechannel() {
+ packer = uploadPackWithSidechannel
+ } else {
+ packer = uploadPack
+ }
case "receive-pack":
packer = receivePack
case "upload-archive":
@@ -104,13 +110,14 @@ func (cmd gitalySSHCommand) run() (int, error) {
}
}
- conn, err := getConnection(cmd.address)
+ registry := client.NewSidechannelRegistry(logrus.NewEntry(logrus.StandardLogger()))
+ conn, err := getConnection(ctx, cmd.address, registry)
if err != nil {
return 1, err
}
defer conn.Close()
- code, err := cmd.packer(ctx, conn, cmd.payload)
+ code, err := cmd.packer(ctx, conn, registry, cmd.payload)
if err != nil {
return 1, err
}
@@ -118,12 +125,16 @@ func (cmd gitalySSHCommand) run() (int, error) {
return int(code), nil
}
-func getConnection(url string) (*grpc.ClientConn, error) {
+func getConnection(ctx context.Context, url string, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) {
if url == "" {
return nil, fmt.Errorf("gitaly address can not be empty")
}
- return client.Dial(url, dialOpts())
+ if useSidechannel() {
+ return client.DialSidechannel(ctx, url, registry, dialOpts())
+ }
+
+ return client.DialContext(ctx, url, dialOpts())
}
func dialOpts() []grpc.DialOption {
@@ -134,3 +145,5 @@ func dialOpts() []grpc.DialOption {
return connOpts
}
+
+func useSidechannel() bool { return os.Getenv("GITALY_USE_SIDECHANNEL") == "1" }
diff --git a/cmd/gitaly-ssh/main_test.go b/cmd/gitaly-ssh/main_test.go
index 2ec16c736..76327740a 100644
--- a/cmd/gitaly-ssh/main_test.go
+++ b/cmd/gitaly-ssh/main_test.go
@@ -7,14 +7,21 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"google.golang.org/grpc"
)
func TestRun(t *testing.T) {
- var successPacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 0, nil }
- var exitCodePacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 123, nil }
- var errorPacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 1, fmt.Errorf("fail") }
+ var successPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) {
+ return 0, nil
+ }
+ var exitCodePacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) {
+ return 123, nil
+ }
+ var errorPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) {
+ return 1, fmt.Errorf("fail")
+ }
gitalyTCPAddress := "tcp://localhost:9999"
gitalyUnixAddress := fmt.Sprintf("unix://%s", testhelper.GetTemporaryGitalySocketFileName(t))
diff --git a/cmd/gitaly-ssh/receive_pack.go b/cmd/gitaly-ssh/receive_pack.go
index 57711db5c..62cc84e65 100644
--- a/cmd/gitaly-ssh/receive_pack.go
+++ b/cmd/gitaly-ssh/receive_pack.go
@@ -11,7 +11,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)
-func receivePack(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) {
+func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
var request gitalypb.SSHReceivePackRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
diff --git a/cmd/gitaly-ssh/upload_archive.go b/cmd/gitaly-ssh/upload_archive.go
index 051ceb311..618bb92d6 100644
--- a/cmd/gitaly-ssh/upload_archive.go
+++ b/cmd/gitaly-ssh/upload_archive.go
@@ -11,7 +11,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
)
-func uploadArchive(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) {
+func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
var request gitalypb.SSHUploadArchiveRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
diff --git a/cmd/gitaly-ssh/upload_pack.go b/cmd/gitaly-ssh/upload_pack.go
index 64e0f580a..97ba6b7c7 100644
--- a/cmd/gitaly-ssh/upload_pack.go
+++ b/cmd/gitaly-ssh/upload_pack.go
@@ -18,16 +18,34 @@ const (
GitConfigShowAllRefs = "transfer.hideRefs=!refs"
)
-func uploadPack(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) {
+func uploadPackConfig(config []string) []string {
+ return append([]string{GitConfigShowAllRefs}, config...)
+}
+
+func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
var request gitalypb.SSHUploadPackRequest
if err := protojson.Unmarshal([]byte(req), &request); err != nil {
return 0, fmt.Errorf("json unmarshal: %w", err)
}
- request.GitConfigOptions = append([]string{GitConfigShowAllRefs}, request.GitConfigOptions...)
+ request.GitConfigOptions = uploadPackConfig(request.GitConfigOptions)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
return client.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request)
}
+
+func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) {
+ var request gitalypb.SSHUploadPackWithSidechannelRequest
+ if err := protojson.Unmarshal([]byte(req), &request); err != nil {
+ return 0, fmt.Errorf("json unmarshal: %w", err)
+ }
+
+ request.GitConfigOptions = uploadPackConfig(request.GitConfigOptions)
+
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ return client.UploadPackWithSidechannel(ctx, conn, registry, os.Stdin, os.Stdout, os.Stderr, &request)
+}
diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go
index f75204cea..45cff4d8b 100644
--- a/internal/gitaly/service/hook/pack_objects.go
+++ b/internal/gitaly/service/hook/pack_objects.go
@@ -22,6 +22,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/stream"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
@@ -42,11 +43,6 @@ var (
})
)
-const (
- bandStdout = 1
- bandStderr = 2
-)
-
func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, reqHash proto.Message, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error {
data, err := protojson.Marshal(reqHash)
if err != nil {
@@ -144,9 +140,9 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb
counter := &helper.CountingWriter{W: w}
sw := pktline.NewSidebandWriter(counter)
- stdout := bufio.NewWriterSize(sw.Writer(bandStdout), pktline.MaxSidebandData)
+ stdout := bufio.NewWriterSize(sw.Writer(stream.BandStdout), pktline.MaxSidebandData)
stderrBuf := &bytes.Buffer{}
- stderr := io.MultiWriter(sw.Writer(bandStderr), stderrBuf)
+ stderr := io.MultiWriter(sw.Writer(stream.BandStderr), stderrBuf)
defer func() {
packObjectsGeneratedBytes.Add(float64(counter.N))
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go
index 21206a233..b217b9218 100644
--- a/internal/gitaly/service/ssh/upload_pack.go
+++ b/internal/gitaly/service/ssh/upload_pack.go
@@ -13,11 +13,15 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/stats"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/stream"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/v14/streamio"
)
func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) error {
+ ctx := stream.Context()
+
req, err := stream.Recv() // First request contains Repository only
if err != nil {
return helper.ErrInternal(err)
@@ -28,7 +32,7 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e
repository = req.Repository.GlRepository
}
- ctxlogrus.Extract(stream.Context()).WithFields(log.Fields{
+ ctxlogrus.Extract(ctx).WithFields(log.Fields{
"GlRepository": repository,
"GitConfigOptions": req.GitConfigOptions,
"GitProtocol": req.GitProtocol,
@@ -38,17 +42,6 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e
return helper.ErrInvalidArgument(err)
}
- if err = s.sshUploadPack(stream, req); err != nil {
- return helper.ErrInternal(err)
- }
-
- return nil
-}
-
-func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, req *gitalypb.SSHUploadPackRequest) error {
- ctx, cancelCtx := context.WithCancel(stream.Context())
- defer cancelCtx()
-
stdin := streamio.NewReader(func() ([]byte, error) {
request, err := stream.Recv()
return request.GetStdin(), err
@@ -57,29 +50,51 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
// gRPC doesn't allow concurrent writes to a stream, so we need to
// synchronize writing stdout and stderrr.
var m sync.Mutex
-
- stdoutCounter := &helper.CountingWriter{
- W: streamio.NewSyncWriter(&m, func(p []byte) error {
- return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p})
- }),
- }
- // Use large copy buffer to reduce the number of system calls
- stdout := &largeBufferReaderFrom{Writer: stdoutCounter}
-
+ stdout := streamio.NewSyncWriter(&m, func(p []byte) error {
+ return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p})
+ })
stderr := streamio.NewSyncWriter(&m, func(p []byte) error {
return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p})
})
- repoPath, err := s.locator.GetRepoPath(req.Repository)
+ if status, err := s.sshUploadPack(ctx, req, stdin, stdout, stderr); err != nil {
+ if errSend := stream.Send(&gitalypb.SSHUploadPackResponse{
+ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
+ }); errSend != nil {
+ ctxlogrus.Extract(ctx).WithError(errSend).Error("send final status code")
+ }
+
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+}
+
+type sshUploadPackRequest interface {
+ GetRepository() *gitalypb.Repository
+ GetGitConfigOptions() []string
+ GetGitProtocol() string
+}
+
+func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
+ ctx, cancelCtx := context.WithCancel(ctx)
+ defer cancelCtx()
+
+ stdoutCounter := &helper.CountingWriter{W: stdout}
+ // Use large copy buffer to reduce the number of system calls
+ stdout = &largeBufferReaderFrom{Writer: stdoutCounter}
+
+ repo := req.GetRepository()
+ repoPath, err := s.locator.GetRepoPath(repo)
if err != nil {
- return err
+ return 0, err
}
- git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().StorageName, repoPath)
+ git.WarnIfTooManyBitmaps(ctx, s.locator, repo.StorageName, repoPath)
- config, err := git.ConvertConfigOptions(req.GitConfigOptions)
+ config, err := git.ConvertConfigOptions(req.GetGitConfigOptions())
if err != nil {
- return err
+ return 0, err
}
pr, pw := io.Pipe()
@@ -96,7 +111,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
stats, err := stats.ParsePackfileNegotiation(pr)
if err != nil {
- ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation")
+ ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation")
return
}
stats.UpdateMetrics(s.packfileNegotiationMetrics)
@@ -105,7 +120,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
commandOpts := []git.CmdOpt{
git.WithGitProtocol(req),
git.WithConfig(config...),
- git.WithPackObjectsHookEnv(req.Repository),
+ git.WithPackObjectsHookEnv(repo),
}
cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, stdin, stdout, stderr, git.SubCmd{
@@ -113,7 +128,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
Args: []string{repoPath},
}, commandOpts...)
if err != nil {
- return err
+ return 0, err
}
timeoutTicker := helper.NewTimerTicker(s.uploadPackRequestTimeout)
@@ -130,15 +145,8 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
pw.Close()
wg.Wait()
- if status, ok := command.ExitStatus(err); ok {
- if sendErr := stream.Send(&gitalypb.SSHUploadPackResponse{
- ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
- }); sendErr != nil {
- return sendErr
- }
- return fmt.Errorf("SSHUploadPack: %v", err)
- }
- return fmt.Errorf("cmd wait: %v", err)
+ status, _ := command.ExitStatus(err)
+ return status, fmt.Errorf("cmd wait: %w", err)
}
pw.Close()
@@ -146,7 +154,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
ctxlogrus.Extract(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details")
- return nil
+ return 0, nil
}
func validateFirstUploadPackRequest(req *gitalypb.SSHUploadPackRequest) error {
@@ -164,3 +172,23 @@ type largeBufferReaderFrom struct {
func (rf *largeBufferReaderFrom) ReadFrom(r io.Reader) (int64, error) {
return io.CopyBuffer(rf.Writer, r, make([]byte, 64*1024))
}
+
+func (s *server) SSHUploadPackWithSidechannel(ctx context.Context, req *gitalypb.SSHUploadPackWithSidechannelRequest) (*gitalypb.SSHUploadPackWithSidechannelResponse, error) {
+ conn, err := sidechannel.OpenSidechannel(ctx)
+ if err != nil {
+ return nil, helper.ErrUnavailable(err)
+ }
+ defer conn.Close()
+
+ sidebandWriter := pktline.NewSidebandWriter(conn)
+ stdout := sidebandWriter.Writer(stream.BandStdout)
+ stderr := sidebandWriter.Writer(stream.BandStderr)
+ if _, err := s.sshUploadPack(ctx, req, conn, stdout, stderr); err != nil {
+ return nil, helper.ErrInternal(err)
+ }
+ if err := conn.Close(); err != nil {
+ return nil, helper.ErrInternalf("close sidechannel: %w", err)
+ }
+
+ return &gitalypb.SSHUploadPackWithSidechannelResponse{}, nil
+}
diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go
index 5afa391da..97fd079d1 100644
--- a/internal/gitaly/service/ssh/upload_pack_test.go
+++ b/internal/gitaly/service/ssh/upload_pack_test.go
@@ -33,6 +33,7 @@ type cloneCommand struct {
gitConfig string
gitProtocol string
cfg config.Cfg
+ sidechannel bool
}
func runTestWithAndWithoutConfigOptions(t *testing.T, tf func(t *testing.T, opts ...testcfg.Option), opts ...testcfg.Option) {
@@ -64,15 +65,20 @@ func (cmd cloneCommand) execute(t *testing.T) error {
ctx, cancel := testhelper.Context()
defer cancel()
+ env := []string{
+ fmt.Sprintf("GITALY_ADDRESS=%s", cmd.server),
+ fmt.Sprintf("GITALY_PAYLOAD=%s", payload),
+ fmt.Sprintf("GITALY_FEATUREFLAGS=%s", strings.Join(flagPairs, ",")),
+ fmt.Sprintf("PATH=.:%s", os.Getenv("PATH")),
+ fmt.Sprintf(`GIT_SSH_COMMAND=%s upload-pack`, filepath.Join(cmd.cfg.BinDir, "gitaly-ssh")),
+ }
+ if cmd.sidechannel {
+ env = append(env, "GITALY_USE_SIDECHANNEL=1")
+ }
+
var output bytes.Buffer
gitCommand, err := gittest.NewCommandFactory(t, cmd.cfg).NewWithoutRepo(ctx,
- cmd.command, git.WithStdout(&output), git.WithStderr(&output), git.WithEnv(
- fmt.Sprintf("GITALY_ADDRESS=%s", cmd.server),
- fmt.Sprintf("GITALY_PAYLOAD=%s", payload),
- fmt.Sprintf("GITALY_FEATUREFLAGS=%s", strings.Join(flagPairs, ",")),
- fmt.Sprintf("PATH=.:%s", os.Getenv("PATH")),
- fmt.Sprintf(`GIT_SSH_COMMAND=%s upload-pack`, filepath.Join(cmd.cfg.BinDir, "gitaly-ssh")),
- ), git.WithDisabledHooks(),
+ cmd.command, git.WithStdout(&output), git.WithStderr(&output), git.WithEnv(env...), git.WithDisabledHooks(),
)
require.NoError(t, err)
@@ -221,6 +227,20 @@ func TestUploadPackCloneSuccess(t *testing.T) {
}
func testUploadPackCloneSuccess(t *testing.T, opts ...testcfg.Option) {
+ testUploadPackCloneSuccess2(t, false, opts...)
+}
+
+func TestUploadPackWithSidechannelCloneSuccess(t *testing.T) {
+ t.Parallel()
+
+ runTestWithAndWithoutConfigOptions(t, testUploadPackWithSidechannelCloneSuccess, testcfg.WithPackObjectsCacheEnabled())
+}
+
+func testUploadPackWithSidechannelCloneSuccess(t *testing.T, opts ...testcfg.Option) {
+ testUploadPackCloneSuccess2(t, true, opts...)
+}
+
+func testUploadPackCloneSuccess2(t *testing.T, sidechannel bool, opts ...testcfg.Option) {
cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...)
testcfg.BuildGitalyHooks(t, cfg)
@@ -263,10 +283,11 @@ func testUploadPackCloneSuccess(t *testing.T, opts ...testcfg.Option) {
negotiationMetrics.Reset()
cmd := cloneCommand{
- repository: repo,
- command: tc.cmd,
- server: serverSocketPath,
- cfg: cfg,
+ repository: repo,
+ command: tc.cmd,
+ server: serverSocketPath,
+ cfg: cfg,
+ sidechannel: sidechannel,
}
lHead, rHead, _, _ := cmd.test(t, cfg, repoPath, localRepoPath)
require.Equal(t, lHead, rHead, "local and remote head not equal")
diff --git a/internal/sidechannel/proxy.go b/internal/sidechannel/proxy.go
index 2302d44eb..db184ff88 100644
--- a/internal/sidechannel/proxy.go
+++ b/internal/sidechannel/proxy.go
@@ -84,43 +84,54 @@ func proxy(ctx context.Context) func(*ClientConn) error {
}
defer downstream.Close()
- const nStreams = 2
- errC := make(chan error, nStreams)
-
+ fromDownstream := make(chan error, 1)
go func() {
- errC <- func() error {
+ fromDownstream <- func() error {
if _, err := io.Copy(upstream, downstream); err != nil {
- return err
+ return fmt.Errorf("copy to upstream: %w", err)
+ }
+
+ if err := upstream.CloseWrite(); err != nil {
+ return fmt.Errorf("closewrite upstream: %w", err)
}
- // Downstream.Read() has returned EOF. That means we are done proxying
- // the request body from downstream to upstream. Propagate this EOF to
- // upstream by calling CloseWrite(). Use CloseWrite(), not Close(),
- // because we still want to read the response body from upstream in the
- // other goroutine.
- return upstream.CloseWrite()
+ return nil
}()
}()
+ fromUpstream := make(chan error, 1)
go func() {
- errC <- func() error {
+ fromUpstream <- func() error {
if _, err := io.Copy(downstream, upstream); err != nil {
- return err
+ return fmt.Errorf("copy to downstream: %w", err)
}
- // Upstream is now closed for both reads and writes. Propagate this state
- // to downstream. This also happens via defer, but this way we can log
- // the Close error if there is one.
- return downstream.Close()
+ return nil
}()
}()
- for i := 0; i < nStreams; i++ {
- if err := <-errC; err != nil {
- return err
+ waitForUpstream:
+ for {
+ select {
+ case err := <-fromUpstream:
+ if err != nil {
+ return err
+ }
+
+ break waitForUpstream
+ case err := <-fromDownstream:
+ if err != nil {
+ return err
+ }
+ case <-ctx.Done():
+ return ctx.Err()
}
}
+ if err := downstream.Close(); err != nil {
+ return fmt.Errorf("close downstream: %w", err)
+ }
+
return nil
}
}
diff --git a/internal/sidechannel/proxy_test.go b/internal/sidechannel/proxy_test.go
index 1501a02d3..103ecfd63 100644
--- a/internal/sidechannel/proxy_test.go
+++ b/internal/sidechannel/proxy_test.go
@@ -19,17 +19,25 @@ import (
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
-func testProxyServer(ctx context.Context) (err error) {
+func testProxyServer(ctx context.Context, expectEOF bool) (err error) {
conn, err := OpenSidechannel(ctx)
if err != nil {
return err
}
defer conn.Close()
- buf, err := io.ReadAll(conn)
+ var buf []byte
+ if expectEOF {
+ buf, err = io.ReadAll(conn)
+ } else {
+ buf = make([]byte, 5)
+ _, err = conn.Read(buf)
+ }
+
if err != nil {
return fmt.Errorf("server read: %w", err)
}
+
if string(buf) != "hello" {
return fmt.Errorf("server: unexpected request: %q", buf)
}
@@ -44,30 +52,38 @@ func testProxyServer(ctx context.Context) (err error) {
return nil
}
-func testProxyClient(conn *ClientConn) (err error) {
- if _, err := io.WriteString(conn, "hello"); err != nil {
- return fmt.Errorf("client write: %w", err)
- }
- if err := conn.CloseWrite(); err != nil {
- return err
- }
+func testProxyClient(closeWrite bool) func(*ClientConn) error {
+ return func(conn *ClientConn) (err error) {
+ if _, err := io.WriteString(conn, "hello"); err != nil {
+ return fmt.Errorf("client write: %w", err)
+ }
+ if closeWrite {
+ if err := conn.CloseWrite(); err != nil {
+ return err
+ }
+ }
- buf, err := io.ReadAll(conn)
- if err != nil {
- return fmt.Errorf("client read: %w", err)
- }
- if string(buf) != "world" {
- return fmt.Errorf("client: unexpected response: %q", buf)
- }
+ buf, err := io.ReadAll(conn)
+ if err != nil {
+ return fmt.Errorf("client read: %w", err)
+ }
+ if string(buf) != "world" {
+ return fmt.Errorf("client: unexpected response: %q", buf)
+ }
- return nil
+ return nil
+ }
}
-func TestUnaryProxy(t *testing.T) {
+func TestUnaryProxy(t *testing.T) { testUnaryProxy(t, true) }
+
+func TestUnaryProxy_withoutCloseWrite(t *testing.T) { testUnaryProxy(t, false) }
+
+func testUnaryProxy(t *testing.T, closeWrite bool) {
upstreamAddr := startServer(
t,
func(ctx context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
- if err := testProxyServer(ctx); err != nil {
+ if err := testProxyServer(ctx, closeWrite); err != nil {
return nil, err
}
return &healthpb.HealthCheckResponse{}, nil
@@ -92,7 +108,7 @@ func TestUnaryProxy(t *testing.T) {
defer cancel()
conn, registry := dial(t, proxyAddr)
- require.NoError(t, call(ctx, conn, registry, testProxyClient))
+ require.NoError(t, call(ctx, conn, registry, testProxyClient(closeWrite)))
}
func newLogger() *logrus.Entry { return logrus.NewEntry(logrus.New()) }
@@ -115,11 +131,15 @@ func dialProxy(upstreamAddr string) (*grpc.ClientConn, error) {
return grpc.Dial(upstreamAddr, dialOpts...)
}
-func TestStreamProxy(t *testing.T) {
+func TestStreamProxy(t *testing.T) { testStreamProxy(t, true) }
+
+func TestStreamProxy_noCloseWrite(t *testing.T) { testStreamProxy(t, false) }
+
+func testStreamProxy(t *testing.T, closeWrite bool) {
upstreamAddr := startStreamServer(
t,
func(stream gitalypb.SSHService_SSHUploadPackServer) error {
- return testProxyServer(stream.Context())
+ return testProxyServer(stream.Context(), closeWrite)
},
)
@@ -150,7 +170,7 @@ func TestStreamProxy(t *testing.T) {
defer cancel()
conn, registry := dial(t, proxyAddr)
- ctx, waiter := RegisterSidechannel(ctx, registry, testProxyClient)
+ ctx, waiter := RegisterSidechannel(ctx, registry, testProxyClient(closeWrite))
defer waiter.Close()
client, err := gitalypb.NewSSHServiceClient(conn).SSHUploadPack(ctx)
diff --git a/internal/stream/pktline.go b/internal/stream/pktline.go
new file mode 100644
index 000000000..d83eb3a45
--- /dev/null
+++ b/internal/stream/pktline.go
@@ -0,0 +1,74 @@
+package stream
+
+import (
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline"
+)
+
+// Conn represents a bi-directional client side connection.
+type Conn interface {
+ io.Reader
+ io.Writer
+ CloseWrite() error
+}
+
+// BandStdout and BandStderr are the pktline band ID's for stdout and
+// stderr, respectively.
+const (
+ BandStdout = 1
+ BandStderr = 2
+)
+
+// ProxyPktLine lets a client de-multiplex and proxy stdin, stdout and
+// stderr streams. Stdout and stderr are interleaved with Git's pktline
+// format.
+func ProxyPktLine(c Conn, stdin io.Reader, stdout, stderr io.Writer) error {
+ copyRequest := func(c Conn) error {
+ if _, err := io.Copy(c, stdin); err != nil {
+ return fmt.Errorf("copy request: %w", err)
+ }
+ if err := c.CloseWrite(); err != nil {
+ return fmt.Errorf("close request: %w", err)
+ }
+ return nil
+ }
+
+ copyResponse := func(c Conn) error {
+ if err := pktline.EachSidebandPacket(c, func(band byte, data []byte) (err error) {
+ switch band {
+ case BandStdout:
+ _, err = stdout.Write(data)
+ case BandStderr:
+ _, err = stderr.Write(data)
+ default:
+ err = fmt.Errorf("unexpected band: %d", band)
+ }
+ return err
+ }); err != nil {
+ return fmt.Errorf("copy response: %w", err)
+ }
+
+ return nil
+ }
+
+ request := make(chan error, 1)
+ go func() { request <- copyRequest(c) }()
+
+ response := make(chan error, 1)
+ go func() { response <- copyResponse(c) }()
+
+ for {
+ select {
+ case err := <-response:
+ // Server is done. No point in waiting for client.
+ return err
+ case err := <-request:
+ if err != nil {
+ return err
+ }
+ // Client is now done. Wait for server to finish too.
+ }
+ }
+}
diff --git a/proto/go/gitalypb/ssh.pb.go b/proto/go/gitalypb/ssh.pb.go
index c524ea2eb..32a62d738 100644
--- a/proto/go/gitalypb/ssh.pb.go
+++ b/proto/go/gitalypb/ssh.pb.go
@@ -162,6 +162,110 @@ func (x *SSHUploadPackResponse) GetExitStatus() *ExitStatus {
return nil
}
+type SSHUploadPackWithSidechannelRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // 'repository' must be present in the first message.
+ Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"`
+ // Parameters to use with git -c (key=value pairs)
+ GitConfigOptions []string `protobuf:"bytes,2,rep,name=git_config_options,json=gitConfigOptions,proto3" json:"git_config_options,omitempty"`
+ // Git protocol version
+ GitProtocol string `protobuf:"bytes,3,opt,name=git_protocol,json=gitProtocol,proto3" json:"git_protocol,omitempty"`
+}
+
+func (x *SSHUploadPackWithSidechannelRequest) Reset() {
+ *x = SSHUploadPackWithSidechannelRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_ssh_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SSHUploadPackWithSidechannelRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SSHUploadPackWithSidechannelRequest) ProtoMessage() {}
+
+func (x *SSHUploadPackWithSidechannelRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_ssh_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SSHUploadPackWithSidechannelRequest.ProtoReflect.Descriptor instead.
+func (*SSHUploadPackWithSidechannelRequest) Descriptor() ([]byte, []int) {
+ return file_ssh_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *SSHUploadPackWithSidechannelRequest) GetRepository() *Repository {
+ if x != nil {
+ return x.Repository
+ }
+ return nil
+}
+
+func (x *SSHUploadPackWithSidechannelRequest) GetGitConfigOptions() []string {
+ if x != nil {
+ return x.GitConfigOptions
+ }
+ return nil
+}
+
+func (x *SSHUploadPackWithSidechannelRequest) GetGitProtocol() string {
+ if x != nil {
+ return x.GitProtocol
+ }
+ return ""
+}
+
+type SSHUploadPackWithSidechannelResponse struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+}
+
+func (x *SSHUploadPackWithSidechannelResponse) Reset() {
+ *x = SSHUploadPackWithSidechannelResponse{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_ssh_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SSHUploadPackWithSidechannelResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SSHUploadPackWithSidechannelResponse) ProtoMessage() {}
+
+func (x *SSHUploadPackWithSidechannelResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_ssh_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SSHUploadPackWithSidechannelResponse.ProtoReflect.Descriptor instead.
+func (*SSHUploadPackWithSidechannelResponse) Descriptor() ([]byte, []int) {
+ return file_ssh_proto_rawDescGZIP(), []int{3}
+}
+
type SSHReceivePackRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -185,7 +289,7 @@ type SSHReceivePackRequest struct {
func (x *SSHReceivePackRequest) Reset() {
*x = SSHReceivePackRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_ssh_proto_msgTypes[2]
+ mi := &file_ssh_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -198,7 +302,7 @@ func (x *SSHReceivePackRequest) String() string {
func (*SSHReceivePackRequest) ProtoMessage() {}
func (x *SSHReceivePackRequest) ProtoReflect() protoreflect.Message {
- mi := &file_ssh_proto_msgTypes[2]
+ mi := &file_ssh_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -211,7 +315,7 @@ func (x *SSHReceivePackRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use SSHReceivePackRequest.ProtoReflect.Descriptor instead.
func (*SSHReceivePackRequest) Descriptor() ([]byte, []int) {
- return file_ssh_proto_rawDescGZIP(), []int{2}
+ return file_ssh_proto_rawDescGZIP(), []int{4}
}
func (x *SSHReceivePackRequest) GetRepository() *Repository {
@@ -280,7 +384,7 @@ type SSHReceivePackResponse struct {
func (x *SSHReceivePackResponse) Reset() {
*x = SSHReceivePackResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_ssh_proto_msgTypes[3]
+ mi := &file_ssh_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -293,7 +397,7 @@ func (x *SSHReceivePackResponse) String() string {
func (*SSHReceivePackResponse) ProtoMessage() {}
func (x *SSHReceivePackResponse) ProtoReflect() protoreflect.Message {
- mi := &file_ssh_proto_msgTypes[3]
+ mi := &file_ssh_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -306,7 +410,7 @@ func (x *SSHReceivePackResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use SSHReceivePackResponse.ProtoReflect.Descriptor instead.
func (*SSHReceivePackResponse) Descriptor() ([]byte, []int) {
- return file_ssh_proto_rawDescGZIP(), []int{3}
+ return file_ssh_proto_rawDescGZIP(), []int{5}
}
func (x *SSHReceivePackResponse) GetStdout() []byte {
@@ -344,7 +448,7 @@ type SSHUploadArchiveRequest struct {
func (x *SSHUploadArchiveRequest) Reset() {
*x = SSHUploadArchiveRequest{}
if protoimpl.UnsafeEnabled {
- mi := &file_ssh_proto_msgTypes[4]
+ mi := &file_ssh_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -357,7 +461,7 @@ func (x *SSHUploadArchiveRequest) String() string {
func (*SSHUploadArchiveRequest) ProtoMessage() {}
func (x *SSHUploadArchiveRequest) ProtoReflect() protoreflect.Message {
- mi := &file_ssh_proto_msgTypes[4]
+ mi := &file_ssh_proto_msgTypes[6]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -370,7 +474,7 @@ func (x *SSHUploadArchiveRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use SSHUploadArchiveRequest.ProtoReflect.Descriptor instead.
func (*SSHUploadArchiveRequest) Descriptor() ([]byte, []int) {
- return file_ssh_proto_rawDescGZIP(), []int{4}
+ return file_ssh_proto_rawDescGZIP(), []int{6}
}
func (x *SSHUploadArchiveRequest) GetRepository() *Repository {
@@ -403,7 +507,7 @@ type SSHUploadArchiveResponse struct {
func (x *SSHUploadArchiveResponse) Reset() {
*x = SSHUploadArchiveResponse{}
if protoimpl.UnsafeEnabled {
- mi := &file_ssh_proto_msgTypes[5]
+ mi := &file_ssh_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -416,7 +520,7 @@ func (x *SSHUploadArchiveResponse) String() string {
func (*SSHUploadArchiveResponse) ProtoMessage() {}
func (x *SSHUploadArchiveResponse) ProtoReflect() protoreflect.Message {
- mi := &file_ssh_proto_msgTypes[5]
+ mi := &file_ssh_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -429,7 +533,7 @@ func (x *SSHUploadArchiveResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use SSHUploadArchiveResponse.ProtoReflect.Descriptor instead.
func (*SSHUploadArchiveResponse) Descriptor() ([]byte, []int) {
- return file_ssh_proto_rawDescGZIP(), []int{5}
+ return file_ssh_proto_rawDescGZIP(), []int{7}
}
func (x *SSHUploadArchiveResponse) GetStdout() []byte {
@@ -480,69 +584,91 @@ var file_ssh_proto_rawDesc = []byte{
0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74,
0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74,
- 0x75, 0x73, 0x22, 0x93, 0x02, 0x0a, 0x15, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76,
- 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a,
- 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69,
- 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f,
- 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x13, 0x0a, 0x05,
- 0x67, 0x6c, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x6c, 0x49,
- 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f,
- 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x67, 0x6c, 0x52, 0x65, 0x70, 0x6f,
- 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x6c, 0x5f, 0x75, 0x73, 0x65,
- 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x6c, 0x55,
- 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x69, 0x74, 0x5f, 0x70,
- 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x67,
- 0x69, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x69,
- 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
- 0x18, 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69,
- 0x67, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x53, 0x53, 0x48, 0x52,
- 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
- 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01,
- 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74,
- 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65,
- 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75,
- 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
- 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69,
- 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x69, 0x0a, 0x17, 0x53, 0x53, 0x48, 0x55, 0x70,
- 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
- 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79,
- 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e,
- 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01,
- 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05,
- 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64,
- 0x69, 0x6e, 0x22, 0x7f, 0x0a, 0x18, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41,
- 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16,
- 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06,
- 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72,
- 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33,
- 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20,
- 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69,
- 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61,
- 0x74, 0x75, 0x73, 0x32, 0xa6, 0x02, 0x0a, 0x0a, 0x53, 0x53, 0x48, 0x53, 0x65, 0x72, 0x76, 0x69,
- 0x63, 0x65, 0x12, 0x58, 0x0a, 0x0d, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50,
- 0x61, 0x63, 0x6b, 0x12, 0x1c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48,
- 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
- 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70,
- 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x0e,
- 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x12, 0x1d,
- 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69,
- 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e,
+ 0x75, 0x73, 0x22, 0xb0, 0x01, 0x0a, 0x23, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64,
+ 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, 0x61, 0x6e,
+ 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65,
+ 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12,
+ 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f,
+ 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69,
+ 0x74, 0x6f, 0x72, 0x79, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x69, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66,
+ 0x69, 0x67, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09,
+ 0x52, 0x10, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4f, 0x70, 0x74, 0x69, 0x6f,
+ 0x6e, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
+ 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x67, 0x69, 0x74, 0x50, 0x72, 0x6f,
+ 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x26, 0x0a, 0x24, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f,
+ 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, 0x65, 0x63, 0x68,
+ 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x93, 0x02,
+ 0x0a, 0x15, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b,
+ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73,
+ 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69,
+ 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42,
+ 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72,
+ 0x79, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
+ 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x13, 0x0a, 0x05, 0x67, 0x6c, 0x5f, 0x69, 0x64,
+ 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x6c, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d,
+ 0x67, 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x04, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x0c, 0x67, 0x6c, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72,
+ 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x6c, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65,
+ 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x6c, 0x55, 0x73, 0x65, 0x72, 0x6e, 0x61,
+ 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63,
+ 0x6f, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x67, 0x69, 0x74, 0x50, 0x72, 0x6f,
+ 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x69, 0x74, 0x5f, 0x63, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28,
+ 0x09, 0x52, 0x10, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4f, 0x70, 0x74, 0x69,
+ 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76,
+ 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a,
+ 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73,
+ 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a,
+ 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74,
+ 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74,
+ 0x75, 0x73, 0x22, 0x69, 0x0a, 0x17, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41,
+ 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a,
+ 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73,
+ 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70,
+ 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x22, 0x7f, 0x0a,
+ 0x18, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76,
+ 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64,
+ 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75,
+ 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69,
+ 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12,
+ 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74,
+ 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0xaa,
+ 0x03, 0x0a, 0x0a, 0x53, 0x53, 0x48, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x58, 0x0a,
+ 0x0d, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x12, 0x1c,
+ 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61,
+ 0x64, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x67,
+ 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50,
+ 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28,
+ 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x81, 0x01, 0x0a, 0x1c, 0x53, 0x53, 0x48, 0x55,
+ 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, 0x53, 0x69, 0x64,
+ 0x65, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c,
+ 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57,
+ 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65,
+ 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53,
+ 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68,
+ 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f,
+ 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x12, 0x5b, 0x0a, 0x0e, 0x53,
+ 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x12, 0x1d, 0x2e,
0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76,
- 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa,
- 0x97, 0x28, 0x02, 0x08, 0x01, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x10, 0x53, 0x53, 0x48,
- 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x12, 0x1f, 0x2e,
+ 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67,
+ 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65,
+ 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97,
+ 0x28, 0x02, 0x08, 0x01, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x10, 0x53, 0x53, 0x48, 0x55,
+ 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x12, 0x1f, 0x2e, 0x67,
+ 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41,
+ 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e,
0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64,
- 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20,
- 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61,
- 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
- 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32,
- 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61,
- 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34,
- 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79,
- 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
+ 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67,
+ 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62,
+ 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70,
+ 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -557,35 +683,40 @@ func file_ssh_proto_rawDescGZIP() []byte {
return file_ssh_proto_rawDescData
}
-var file_ssh_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_ssh_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_ssh_proto_goTypes = []interface{}{
- (*SSHUploadPackRequest)(nil), // 0: gitaly.SSHUploadPackRequest
- (*SSHUploadPackResponse)(nil), // 1: gitaly.SSHUploadPackResponse
- (*SSHReceivePackRequest)(nil), // 2: gitaly.SSHReceivePackRequest
- (*SSHReceivePackResponse)(nil), // 3: gitaly.SSHReceivePackResponse
- (*SSHUploadArchiveRequest)(nil), // 4: gitaly.SSHUploadArchiveRequest
- (*SSHUploadArchiveResponse)(nil), // 5: gitaly.SSHUploadArchiveResponse
- (*Repository)(nil), // 6: gitaly.Repository
- (*ExitStatus)(nil), // 7: gitaly.ExitStatus
+ (*SSHUploadPackRequest)(nil), // 0: gitaly.SSHUploadPackRequest
+ (*SSHUploadPackResponse)(nil), // 1: gitaly.SSHUploadPackResponse
+ (*SSHUploadPackWithSidechannelRequest)(nil), // 2: gitaly.SSHUploadPackWithSidechannelRequest
+ (*SSHUploadPackWithSidechannelResponse)(nil), // 3: gitaly.SSHUploadPackWithSidechannelResponse
+ (*SSHReceivePackRequest)(nil), // 4: gitaly.SSHReceivePackRequest
+ (*SSHReceivePackResponse)(nil), // 5: gitaly.SSHReceivePackResponse
+ (*SSHUploadArchiveRequest)(nil), // 6: gitaly.SSHUploadArchiveRequest
+ (*SSHUploadArchiveResponse)(nil), // 7: gitaly.SSHUploadArchiveResponse
+ (*Repository)(nil), // 8: gitaly.Repository
+ (*ExitStatus)(nil), // 9: gitaly.ExitStatus
}
var file_ssh_proto_depIdxs = []int32{
- 6, // 0: gitaly.SSHUploadPackRequest.repository:type_name -> gitaly.Repository
- 7, // 1: gitaly.SSHUploadPackResponse.exit_status:type_name -> gitaly.ExitStatus
- 6, // 2: gitaly.SSHReceivePackRequest.repository:type_name -> gitaly.Repository
- 7, // 3: gitaly.SSHReceivePackResponse.exit_status:type_name -> gitaly.ExitStatus
- 6, // 4: gitaly.SSHUploadArchiveRequest.repository:type_name -> gitaly.Repository
- 7, // 5: gitaly.SSHUploadArchiveResponse.exit_status:type_name -> gitaly.ExitStatus
- 0, // 6: gitaly.SSHService.SSHUploadPack:input_type -> gitaly.SSHUploadPackRequest
- 2, // 7: gitaly.SSHService.SSHReceivePack:input_type -> gitaly.SSHReceivePackRequest
- 4, // 8: gitaly.SSHService.SSHUploadArchive:input_type -> gitaly.SSHUploadArchiveRequest
- 1, // 9: gitaly.SSHService.SSHUploadPack:output_type -> gitaly.SSHUploadPackResponse
- 3, // 10: gitaly.SSHService.SSHReceivePack:output_type -> gitaly.SSHReceivePackResponse
- 5, // 11: gitaly.SSHService.SSHUploadArchive:output_type -> gitaly.SSHUploadArchiveResponse
- 9, // [9:12] is the sub-list for method output_type
- 6, // [6:9] is the sub-list for method input_type
- 6, // [6:6] is the sub-list for extension type_name
- 6, // [6:6] is the sub-list for extension extendee
- 0, // [0:6] is the sub-list for field type_name
+ 8, // 0: gitaly.SSHUploadPackRequest.repository:type_name -> gitaly.Repository
+ 9, // 1: gitaly.SSHUploadPackResponse.exit_status:type_name -> gitaly.ExitStatus
+ 8, // 2: gitaly.SSHUploadPackWithSidechannelRequest.repository:type_name -> gitaly.Repository
+ 8, // 3: gitaly.SSHReceivePackRequest.repository:type_name -> gitaly.Repository
+ 9, // 4: gitaly.SSHReceivePackResponse.exit_status:type_name -> gitaly.ExitStatus
+ 8, // 5: gitaly.SSHUploadArchiveRequest.repository:type_name -> gitaly.Repository
+ 9, // 6: gitaly.SSHUploadArchiveResponse.exit_status:type_name -> gitaly.ExitStatus
+ 0, // 7: gitaly.SSHService.SSHUploadPack:input_type -> gitaly.SSHUploadPackRequest
+ 2, // 8: gitaly.SSHService.SSHUploadPackWithSidechannel:input_type -> gitaly.SSHUploadPackWithSidechannelRequest
+ 4, // 9: gitaly.SSHService.SSHReceivePack:input_type -> gitaly.SSHReceivePackRequest
+ 6, // 10: gitaly.SSHService.SSHUploadArchive:input_type -> gitaly.SSHUploadArchiveRequest
+ 1, // 11: gitaly.SSHService.SSHUploadPack:output_type -> gitaly.SSHUploadPackResponse
+ 3, // 12: gitaly.SSHService.SSHUploadPackWithSidechannel:output_type -> gitaly.SSHUploadPackWithSidechannelResponse
+ 5, // 13: gitaly.SSHService.SSHReceivePack:output_type -> gitaly.SSHReceivePackResponse
+ 7, // 14: gitaly.SSHService.SSHUploadArchive:output_type -> gitaly.SSHUploadArchiveResponse
+ 11, // [11:15] is the sub-list for method output_type
+ 7, // [7:11] is the sub-list for method input_type
+ 7, // [7:7] is the sub-list for extension type_name
+ 7, // [7:7] is the sub-list for extension extendee
+ 0, // [0:7] is the sub-list for field type_name
}
func init() { file_ssh_proto_init() }
@@ -621,7 +752,7 @@ func file_ssh_proto_init() {
}
}
file_ssh_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SSHReceivePackRequest); i {
+ switch v := v.(*SSHUploadPackWithSidechannelRequest); i {
case 0:
return &v.state
case 1:
@@ -633,7 +764,7 @@ func file_ssh_proto_init() {
}
}
file_ssh_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SSHReceivePackResponse); i {
+ switch v := v.(*SSHUploadPackWithSidechannelResponse); i {
case 0:
return &v.state
case 1:
@@ -645,7 +776,7 @@ func file_ssh_proto_init() {
}
}
file_ssh_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*SSHUploadArchiveRequest); i {
+ switch v := v.(*SSHReceivePackRequest); i {
case 0:
return &v.state
case 1:
@@ -657,6 +788,30 @@ func file_ssh_proto_init() {
}
}
file_ssh_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SSHReceivePackResponse); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_ssh_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SSHUploadArchiveRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_ssh_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SSHUploadArchiveResponse); i {
case 0:
return &v.state
@@ -675,7 +830,7 @@ func file_ssh_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_ssh_proto_rawDesc,
NumEnums: 0,
- NumMessages: 6,
+ NumMessages: 8,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/proto/go/gitalypb/ssh_grpc.pb.go b/proto/go/gitalypb/ssh_grpc.pb.go
index 5d13f9896..21f761b8e 100644
--- a/proto/go/gitalypb/ssh_grpc.pb.go
+++ b/proto/go/gitalypb/ssh_grpc.pb.go
@@ -20,6 +20,8 @@ const _ = grpc.SupportPackageIsVersion7
type SSHServiceClient interface {
// To forward 'git upload-pack' to Gitaly for SSH sessions
SSHUploadPack(ctx context.Context, opts ...grpc.CallOption) (SSHService_SSHUploadPackClient, error)
+ // To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels
+ SSHUploadPackWithSidechannel(ctx context.Context, in *SSHUploadPackWithSidechannelRequest, opts ...grpc.CallOption) (*SSHUploadPackWithSidechannelResponse, error)
// To forward 'git receive-pack' to Gitaly for SSH sessions
SSHReceivePack(ctx context.Context, opts ...grpc.CallOption) (SSHService_SSHReceivePackClient, error)
// To forward 'git upload-archive' to Gitaly for SSH sessions
@@ -65,6 +67,15 @@ func (x *sSHServiceSSHUploadPackClient) Recv() (*SSHUploadPackResponse, error) {
return m, nil
}
+func (c *sSHServiceClient) SSHUploadPackWithSidechannel(ctx context.Context, in *SSHUploadPackWithSidechannelRequest, opts ...grpc.CallOption) (*SSHUploadPackWithSidechannelResponse, error) {
+ out := new(SSHUploadPackWithSidechannelResponse)
+ err := c.cc.Invoke(ctx, "/gitaly.SSHService/SSHUploadPackWithSidechannel", in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
func (c *sSHServiceClient) SSHReceivePack(ctx context.Context, opts ...grpc.CallOption) (SSHService_SSHReceivePackClient, error) {
stream, err := c.cc.NewStream(ctx, &SSHService_ServiceDesc.Streams[1], "/gitaly.SSHService/SSHReceivePack", opts...)
if err != nil {
@@ -133,6 +144,8 @@ func (x *sSHServiceSSHUploadArchiveClient) Recv() (*SSHUploadArchiveResponse, er
type SSHServiceServer interface {
// To forward 'git upload-pack' to Gitaly for SSH sessions
SSHUploadPack(SSHService_SSHUploadPackServer) error
+ // To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels
+ SSHUploadPackWithSidechannel(context.Context, *SSHUploadPackWithSidechannelRequest) (*SSHUploadPackWithSidechannelResponse, error)
// To forward 'git receive-pack' to Gitaly for SSH sessions
SSHReceivePack(SSHService_SSHReceivePackServer) error
// To forward 'git upload-archive' to Gitaly for SSH sessions
@@ -147,6 +160,9 @@ type UnimplementedSSHServiceServer struct {
func (UnimplementedSSHServiceServer) SSHUploadPack(SSHService_SSHUploadPackServer) error {
return status.Errorf(codes.Unimplemented, "method SSHUploadPack not implemented")
}
+func (UnimplementedSSHServiceServer) SSHUploadPackWithSidechannel(context.Context, *SSHUploadPackWithSidechannelRequest) (*SSHUploadPackWithSidechannelResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SSHUploadPackWithSidechannel not implemented")
+}
func (UnimplementedSSHServiceServer) SSHReceivePack(SSHService_SSHReceivePackServer) error {
return status.Errorf(codes.Unimplemented, "method SSHReceivePack not implemented")
}
@@ -192,6 +208,24 @@ func (x *sSHServiceSSHUploadPackServer) Recv() (*SSHUploadPackRequest, error) {
return m, nil
}
+func _SSHService_SSHUploadPackWithSidechannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(SSHUploadPackWithSidechannelRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SSHServiceServer).SSHUploadPackWithSidechannel(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/gitaly.SSHService/SSHUploadPackWithSidechannel",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SSHServiceServer).SSHUploadPackWithSidechannel(ctx, req.(*SSHUploadPackWithSidechannelRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
func _SSHService_SSHReceivePack_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SSHServiceServer).SSHReceivePack(&sSHServiceSSHReceivePackServer{stream})
}
@@ -250,7 +284,12 @@ func (x *sSHServiceSSHUploadArchiveServer) Recv() (*SSHUploadArchiveRequest, err
var SSHService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "gitaly.SSHService",
HandlerType: (*SSHServiceServer)(nil),
- Methods: []grpc.MethodDesc{},
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "SSHUploadPackWithSidechannel",
+ Handler: _SSHService_SSHUploadPackWithSidechannel_Handler,
+ },
+ },
Streams: []grpc.StreamDesc{
{
StreamName: "SSHUploadPack",
diff --git a/proto/ssh.proto b/proto/ssh.proto
index 4545effbf..732d26a2c 100644
--- a/proto/ssh.proto
+++ b/proto/ssh.proto
@@ -15,6 +15,13 @@ service SSHService {
};
}
+ // To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels
+ rpc SSHUploadPackWithSidechannel(SSHUploadPackWithSidechannelRequest) returns (SSHUploadPackWithSidechannelResponse) {
+ option (op_type) = {
+ op: ACCESSOR
+ };
+ }
+
// To forward 'git receive-pack' to Gitaly for SSH sessions
rpc SSHReceivePack(stream SSHReceivePackRequest) returns (stream SSHReceivePackResponse) {
option (op_type) = {
@@ -55,6 +62,18 @@ message SSHUploadPackResponse {
ExitStatus exit_status = 3;
}
+message SSHUploadPackWithSidechannelRequest {
+ // 'repository' must be present in the first message.
+ Repository repository = 1 [(target_repository)=true];
+ // Parameters to use with git -c (key=value pairs)
+ repeated string git_config_options = 2;
+
+ // Git protocol version
+ string git_protocol = 3;
+}
+
+message SSHUploadPackWithSidechannelResponse {}
+
message SSHReceivePackRequest {
// 'repository' must be present in the first message.
Repository repository = 1 [(target_repository)=true];
diff --git a/ruby/proto/gitaly/ssh_pb.rb b/ruby/proto/gitaly/ssh_pb.rb
index fdde47b80..49e65cc14 100644
--- a/ruby/proto/gitaly/ssh_pb.rb
+++ b/ruby/proto/gitaly/ssh_pb.rb
@@ -18,6 +18,13 @@ Google::Protobuf::DescriptorPool.generated_pool.build do
optional :stderr, :bytes, 2
optional :exit_status, :message, 3, "gitaly.ExitStatus"
end
+ add_message "gitaly.SSHUploadPackWithSidechannelRequest" do
+ optional :repository, :message, 1, "gitaly.Repository"
+ repeated :git_config_options, :string, 2
+ optional :git_protocol, :string, 3
+ end
+ add_message "gitaly.SSHUploadPackWithSidechannelResponse" do
+ end
add_message "gitaly.SSHReceivePackRequest" do
optional :repository, :message, 1, "gitaly.Repository"
optional :stdin, :bytes, 2
@@ -47,6 +54,8 @@ end
module Gitaly
SSHUploadPackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackRequest").msgclass
SSHUploadPackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackResponse").msgclass
+ SSHUploadPackWithSidechannelRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackWithSidechannelRequest").msgclass
+ SSHUploadPackWithSidechannelResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackWithSidechannelResponse").msgclass
SSHReceivePackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHReceivePackRequest").msgclass
SSHReceivePackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHReceivePackResponse").msgclass
SSHUploadArchiveRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadArchiveRequest").msgclass
diff --git a/ruby/proto/gitaly/ssh_services_pb.rb b/ruby/proto/gitaly/ssh_services_pb.rb
index ed4527087..a50b0fa5a 100644
--- a/ruby/proto/gitaly/ssh_services_pb.rb
+++ b/ruby/proto/gitaly/ssh_services_pb.rb
@@ -16,6 +16,8 @@ module Gitaly
# To forward 'git upload-pack' to Gitaly for SSH sessions
rpc :SSHUploadPack, stream(::Gitaly::SSHUploadPackRequest), stream(::Gitaly::SSHUploadPackResponse)
+ # To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels
+ rpc :SSHUploadPackWithSidechannel, ::Gitaly::SSHUploadPackWithSidechannelRequest, ::Gitaly::SSHUploadPackWithSidechannelResponse
# To forward 'git receive-pack' to Gitaly for SSH sessions
rpc :SSHReceivePack, stream(::Gitaly::SSHReceivePackRequest), stream(::Gitaly::SSHReceivePackResponse)
# To forward 'git upload-archive' to Gitaly for SSH sessions
diff --git a/streamio/stream.go b/streamio/stream.go
index 0aab25315..68e6bd687 100644
--- a/streamio/stream.go
+++ b/streamio/stream.go
@@ -23,7 +23,7 @@ type receiveReader struct {
}
func (rr *receiveReader) Read(p []byte) (int, error) {
- if len(rr.data) == 0 {
+ if len(rr.data) == 0 && rr.err == nil {
rr.data, rr.err = rr.receiver()
}
diff --git a/streamio/stream_test.go b/streamio/stream_test.go
index c91d5a58f..8f7c841fd 100644
--- a/streamio/stream_test.go
+++ b/streamio/stream_test.go
@@ -2,6 +2,7 @@ package streamio
import (
"bytes"
+ "errors"
"fmt"
"io"
"strings"
@@ -60,6 +61,25 @@ func TestReadSizes(t *testing.T) {
})
}
+func TestRead_rememberError(t *testing.T) {
+ firstRead := true
+ myError := errors.New("hello world")
+ r := NewReader(func() ([]byte, error) {
+ if firstRead {
+ firstRead = false
+ return nil, myError
+ }
+ panic("should never be reached")
+ })
+
+ // Intentionally call Read more than once. We want the error to be
+ // sticky.
+ for i := 0; i < 10; i++ {
+ _, err := r.Read(nil)
+ require.Equal(t, err, myError)
+ }
+}
+
func receiverFromReader(r io.Reader) func() ([]byte, error) {
return func() ([]byte, error) {
data := make([]byte, 10)