diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-01-18 12:20:36 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-01-18 12:20:36 +0300 |
commit | b5722938b3e2a92a46f050b79864d745abcc4ecc (patch) | |
tree | ab053ba3523dc5b9e97c8036c8cb3b0801d93313 | |
parent | 870d04c22168540019a21904d529721db4d1c153 (diff) | |
parent | 3a9a94065b85801e5a1a9b4d0293c5ae90644a63 (diff) |
Merge branch 'jv-ssh-sidechannel-2' into 'master'
Add SSHUploadPackWithSidechannel
Closes gitlab-com/gl-infra/scalability#1513
See merge request gitlab-org/gitaly!4251
-rw-r--r-- | client/upload_pack.go | 23 | ||||
-rw-r--r-- | cmd/gitaly-hooks/hooks.go | 30 | ||||
-rw-r--r-- | cmd/gitaly-ssh/main.go | 25 | ||||
-rw-r--r-- | cmd/gitaly-ssh/main_test.go | 13 | ||||
-rw-r--r-- | cmd/gitaly-ssh/receive_pack.go | 2 | ||||
-rw-r--r-- | cmd/gitaly-ssh/upload_archive.go | 2 | ||||
-rw-r--r-- | cmd/gitaly-ssh/upload_pack.go | 22 | ||||
-rw-r--r-- | internal/gitaly/service/hook/pack_objects.go | 10 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack.go | 106 | ||||
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack_test.go | 43 | ||||
-rw-r--r-- | internal/sidechannel/proxy.go | 51 | ||||
-rw-r--r-- | internal/sidechannel/proxy_test.go | 66 | ||||
-rw-r--r-- | internal/stream/pktline.go | 74 | ||||
-rw-r--r-- | proto/go/gitalypb/ssh.pb.go | 361 | ||||
-rw-r--r-- | proto/go/gitalypb/ssh_grpc.pb.go | 41 | ||||
-rw-r--r-- | proto/ssh.proto | 19 | ||||
-rw-r--r-- | ruby/proto/gitaly/ssh_pb.rb | 9 | ||||
-rw-r--r-- | ruby/proto/gitaly/ssh_services_pb.rb | 2 | ||||
-rw-r--r-- | streamio/stream.go | 2 | ||||
-rw-r--r-- | streamio/stream_test.go | 20 |
20 files changed, 674 insertions, 247 deletions
diff --git a/client/upload_pack.go b/client/upload_pack.go index d0f4ea60b..8d971804b 100644 --- a/client/upload_pack.go +++ b/client/upload_pack.go @@ -40,3 +40,26 @@ func UploadPack(ctx context.Context, conn *grpc.ClientConn, stdin io.Reader, std } }, stdout, stderr) } + +// UploadPackWithSidechannel proxies an SSH git-upload-pack (git fetch) +// session to Gitaly using a sidechannel for the raw data transfer. +func UploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, reg *SidechannelRegistry, stdin io.Reader, stdout, stderr io.Writer, req *gitalypb.SSHUploadPackWithSidechannelRequest) (int32, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + ctx, wt := reg.Register(ctx, func(c SidechannelConn) error { + return stream.ProxyPktLine(c, stdin, stdout, stderr) + }) + defer wt.Close() + + sshClient := gitalypb.NewSSHServiceClient(conn) + if _, err := sshClient.SSHUploadPackWithSidechannel(ctx, req); err != nil { + return 0, err + } + + if err := wt.Close(); err != nil { + return 0, err + } + + return 0, nil +} diff --git a/cmd/gitaly-hooks/hooks.go b/cmd/gitaly-hooks/hooks.go index afa9af135..259b2c666 100644 --- a/cmd/gitaly-hooks/hooks.go +++ b/cmd/gitaly-hooks/hooks.go @@ -12,7 +12,6 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/git" - "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" @@ -369,34 +368,7 @@ func packObjectsHook(ctx context.Context, payload git.HooksPayload, hookClient g func handlePackObjectsWithSidechannel(ctx context.Context, hookClient gitalypb.HookServiceClient, repo *gitalypb.Repository, args []string) error { ctx, wt, err := hook.SetupSidechannel(ctx, func(c *net.UnixConn) error { - // We don't have to worry about concurrent reads and writes and - // deadlocks, because we're connected to git-upload-pack which follows - // the sequence: (1) write to stdin of pack-objects, (2) close stdin of - // pack-objects, (3) concurrently read from stdout and stderr of - // pack-objects. - if _, err := io.Copy(c, os.Stdin); err != nil { - return fmt.Errorf("copy stdin: %w", err) - } - if err := c.CloseWrite(); err != nil { - return fmt.Errorf("close write: %w", err) - } - - if err := pktline.EachSidebandPacket(c, func(band byte, data []byte) error { - var err error - switch band { - case 1: - _, err = os.Stdout.Write(data) - case 2: - _, err = os.Stderr.Write(data) - default: - err = fmt.Errorf("unexpected side band: %d", band) - } - return err - }); err != nil { - return fmt.Errorf("demux response: %w", err) - } - - return nil + return stream.ProxyPktLine(c, os.Stdin, os.Stdout, os.Stderr) }) if err != nil { return fmt.Errorf("SetupSidechannel: %w", err) diff --git a/cmd/gitaly-ssh/main.go b/cmd/gitaly-ssh/main.go index b7a4f51dd..137ec690b 100644 --- a/cmd/gitaly-ssh/main.go +++ b/cmd/gitaly-ssh/main.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" + "github.com/sirupsen/logrus" gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" @@ -15,7 +16,7 @@ import ( "google.golang.org/grpc" ) -type packFn func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) +type packFn func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) type gitalySSHCommand struct { // The git packer that shall be executed. One of receivePack, @@ -37,6 +38,7 @@ type gitalySSHCommand struct { // GITALY_PAYLOAD="{repo...}" // GITALY_WD="/path/to/working-directory" // GITALY_FEATUREFLAGS="upload_pack_filter:false,hooks_rpc:true" +// GITALY_USE_SIDECHANNEL=1 if desired // gitaly-ssh upload-pack <git-garbage-x2> func main() { // < 4 since git throws on 2x garbage here @@ -49,7 +51,11 @@ func main() { var packer packFn switch command { case "upload-pack": - packer = uploadPack + if useSidechannel() { + packer = uploadPackWithSidechannel + } else { + packer = uploadPack + } case "receive-pack": packer = receivePack case "upload-archive": @@ -104,13 +110,14 @@ func (cmd gitalySSHCommand) run() (int, error) { } } - conn, err := getConnection(cmd.address) + registry := client.NewSidechannelRegistry(logrus.NewEntry(logrus.StandardLogger())) + conn, err := getConnection(ctx, cmd.address, registry) if err != nil { return 1, err } defer conn.Close() - code, err := cmd.packer(ctx, conn, cmd.payload) + code, err := cmd.packer(ctx, conn, registry, cmd.payload) if err != nil { return 1, err } @@ -118,12 +125,16 @@ func (cmd gitalySSHCommand) run() (int, error) { return int(code), nil } -func getConnection(url string) (*grpc.ClientConn, error) { +func getConnection(ctx context.Context, url string, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) { if url == "" { return nil, fmt.Errorf("gitaly address can not be empty") } - return client.Dial(url, dialOpts()) + if useSidechannel() { + return client.DialSidechannel(ctx, url, registry, dialOpts()) + } + + return client.DialContext(ctx, url, dialOpts()) } func dialOpts() []grpc.DialOption { @@ -134,3 +145,5 @@ func dialOpts() []grpc.DialOption { return connOpts } + +func useSidechannel() bool { return os.Getenv("GITALY_USE_SIDECHANNEL") == "1" } diff --git a/cmd/gitaly-ssh/main_test.go b/cmd/gitaly-ssh/main_test.go index 2ec16c736..76327740a 100644 --- a/cmd/gitaly-ssh/main_test.go +++ b/cmd/gitaly-ssh/main_test.go @@ -7,14 +7,21 @@ import ( "testing" "github.com/stretchr/testify/assert" + "gitlab.com/gitlab-org/gitaly/v14/client" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "google.golang.org/grpc" ) func TestRun(t *testing.T) { - var successPacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 0, nil } - var exitCodePacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 123, nil } - var errorPacker packFn = func(_ context.Context, _ *grpc.ClientConn, _ string) (int32, error) { return 1, fmt.Errorf("fail") } + var successPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) { + return 0, nil + } + var exitCodePacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) { + return 123, nil + } + var errorPacker packFn = func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry, string) (int32, error) { + return 1, fmt.Errorf("fail") + } gitalyTCPAddress := "tcp://localhost:9999" gitalyUnixAddress := fmt.Sprintf("unix://%s", testhelper.GetTemporaryGitalySocketFileName(t)) diff --git a/cmd/gitaly-ssh/receive_pack.go b/cmd/gitaly-ssh/receive_pack.go index 57711db5c..62cc84e65 100644 --- a/cmd/gitaly-ssh/receive_pack.go +++ b/cmd/gitaly-ssh/receive_pack.go @@ -11,7 +11,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -func receivePack(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) { +func receivePack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { var request gitalypb.SSHReceivePackRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { diff --git a/cmd/gitaly-ssh/upload_archive.go b/cmd/gitaly-ssh/upload_archive.go index 051ceb311..618bb92d6 100644 --- a/cmd/gitaly-ssh/upload_archive.go +++ b/cmd/gitaly-ssh/upload_archive.go @@ -11,7 +11,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -func uploadArchive(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) { +func uploadArchive(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { var request gitalypb.SSHUploadArchiveRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { return 0, fmt.Errorf("json unmarshal: %w", err) diff --git a/cmd/gitaly-ssh/upload_pack.go b/cmd/gitaly-ssh/upload_pack.go index 64e0f580a..97ba6b7c7 100644 --- a/cmd/gitaly-ssh/upload_pack.go +++ b/cmd/gitaly-ssh/upload_pack.go @@ -18,16 +18,34 @@ const ( GitConfigShowAllRefs = "transfer.hideRefs=!refs" ) -func uploadPack(ctx context.Context, conn *grpc.ClientConn, req string) (int32, error) { +func uploadPackConfig(config []string) []string { + return append([]string{GitConfigShowAllRefs}, config...) +} + +func uploadPack(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { var request gitalypb.SSHUploadPackRequest if err := protojson.Unmarshal([]byte(req), &request); err != nil { return 0, fmt.Errorf("json unmarshal: %w", err) } - request.GitConfigOptions = append([]string{GitConfigShowAllRefs}, request.GitConfigOptions...) + request.GitConfigOptions = uploadPackConfig(request.GitConfigOptions) ctx, cancel := context.WithCancel(ctx) defer cancel() return client.UploadPack(ctx, conn, os.Stdin, os.Stdout, os.Stderr, &request) } + +func uploadPackWithSidechannel(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry, req string) (int32, error) { + var request gitalypb.SSHUploadPackWithSidechannelRequest + if err := protojson.Unmarshal([]byte(req), &request); err != nil { + return 0, fmt.Errorf("json unmarshal: %w", err) + } + + request.GitConfigOptions = uploadPackConfig(request.GitConfigOptions) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + return client.UploadPackWithSidechannel(ctx, conn, registry, os.Stdin, os.Stdout, os.Stderr, &request) +} diff --git a/internal/gitaly/service/hook/pack_objects.go b/internal/gitaly/service/hook/pack_objects.go index f75204cea..45cff4d8b 100644 --- a/internal/gitaly/service/hook/pack_objects.go +++ b/internal/gitaly/service/hook/pack_objects.go @@ -22,6 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/stream" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -42,11 +43,6 @@ var ( }) ) -const ( - bandStdout = 1 - bandStderr = 2 -) - func (s *server) packObjectsHook(ctx context.Context, repo *gitalypb.Repository, reqHash proto.Message, args *packObjectsArgs, stdinReader io.Reader, output io.Writer) error { data, err := protojson.Marshal(reqHash) if err != nil { @@ -144,9 +140,9 @@ func (s *server) runPackObjects(ctx context.Context, w io.Writer, repo *gitalypb counter := &helper.CountingWriter{W: w} sw := pktline.NewSidebandWriter(counter) - stdout := bufio.NewWriterSize(sw.Writer(bandStdout), pktline.MaxSidebandData) + stdout := bufio.NewWriterSize(sw.Writer(stream.BandStdout), pktline.MaxSidebandData) stderrBuf := &bytes.Buffer{} - stderr := io.MultiWriter(sw.Writer(bandStderr), stderrBuf) + stderr := io.MultiWriter(sw.Writer(stream.BandStderr), stderrBuf) defer func() { packObjectsGeneratedBytes.Add(float64(counter.N)) diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index 21206a233..b217b9218 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -13,11 +13,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v14/internal/git/stats" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel" + "gitlab.com/gitlab-org/gitaly/v14/internal/stream" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v14/streamio" ) func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) error { + ctx := stream.Context() + req, err := stream.Recv() // First request contains Repository only if err != nil { return helper.ErrInternal(err) @@ -28,7 +32,7 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e repository = req.Repository.GlRepository } - ctxlogrus.Extract(stream.Context()).WithFields(log.Fields{ + ctxlogrus.Extract(ctx).WithFields(log.Fields{ "GlRepository": repository, "GitConfigOptions": req.GitConfigOptions, "GitProtocol": req.GitProtocol, @@ -38,17 +42,6 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e return helper.ErrInvalidArgument(err) } - if err = s.sshUploadPack(stream, req); err != nil { - return helper.ErrInternal(err) - } - - return nil -} - -func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, req *gitalypb.SSHUploadPackRequest) error { - ctx, cancelCtx := context.WithCancel(stream.Context()) - defer cancelCtx() - stdin := streamio.NewReader(func() ([]byte, error) { request, err := stream.Recv() return request.GetStdin(), err @@ -57,29 +50,51 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r // gRPC doesn't allow concurrent writes to a stream, so we need to // synchronize writing stdout and stderrr. var m sync.Mutex - - stdoutCounter := &helper.CountingWriter{ - W: streamio.NewSyncWriter(&m, func(p []byte) error { - return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p}) - }), - } - // Use large copy buffer to reduce the number of system calls - stdout := &largeBufferReaderFrom{Writer: stdoutCounter} - + stdout := streamio.NewSyncWriter(&m, func(p []byte) error { + return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p}) + }) stderr := streamio.NewSyncWriter(&m, func(p []byte) error { return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p}) }) - repoPath, err := s.locator.GetRepoPath(req.Repository) + if status, err := s.sshUploadPack(ctx, req, stdin, stdout, stderr); err != nil { + if errSend := stream.Send(&gitalypb.SSHUploadPackResponse{ + ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, + }); errSend != nil { + ctxlogrus.Extract(ctx).WithError(errSend).Error("send final status code") + } + + return helper.ErrInternal(err) + } + + return nil +} + +type sshUploadPackRequest interface { + GetRepository() *gitalypb.Repository + GetGitConfigOptions() []string + GetGitProtocol() string +} + +func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) { + ctx, cancelCtx := context.WithCancel(ctx) + defer cancelCtx() + + stdoutCounter := &helper.CountingWriter{W: stdout} + // Use large copy buffer to reduce the number of system calls + stdout = &largeBufferReaderFrom{Writer: stdoutCounter} + + repo := req.GetRepository() + repoPath, err := s.locator.GetRepoPath(repo) if err != nil { - return err + return 0, err } - git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().StorageName, repoPath) + git.WarnIfTooManyBitmaps(ctx, s.locator, repo.StorageName, repoPath) - config, err := git.ConvertConfigOptions(req.GitConfigOptions) + config, err := git.ConvertConfigOptions(req.GetGitConfigOptions()) if err != nil { - return err + return 0, err } pr, pw := io.Pipe() @@ -96,7 +111,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r stats, err := stats.ParsePackfileNegotiation(pr) if err != nil { - ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation") + ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation") return } stats.UpdateMetrics(s.packfileNegotiationMetrics) @@ -105,7 +120,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r commandOpts := []git.CmdOpt{ git.WithGitProtocol(req), git.WithConfig(config...), - git.WithPackObjectsHookEnv(req.Repository), + git.WithPackObjectsHookEnv(repo), } cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, stdin, stdout, stderr, git.SubCmd{ @@ -113,7 +128,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r Args: []string{repoPath}, }, commandOpts...) if err != nil { - return err + return 0, err } timeoutTicker := helper.NewTimerTicker(s.uploadPackRequestTimeout) @@ -130,15 +145,8 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r pw.Close() wg.Wait() - if status, ok := command.ExitStatus(err); ok { - if sendErr := stream.Send(&gitalypb.SSHUploadPackResponse{ - ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, - }); sendErr != nil { - return sendErr - } - return fmt.Errorf("SSHUploadPack: %v", err) - } - return fmt.Errorf("cmd wait: %v", err) + status, _ := command.ExitStatus(err) + return status, fmt.Errorf("cmd wait: %w", err) } pw.Close() @@ -146,7 +154,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r ctxlogrus.Extract(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details") - return nil + return 0, nil } func validateFirstUploadPackRequest(req *gitalypb.SSHUploadPackRequest) error { @@ -164,3 +172,23 @@ type largeBufferReaderFrom struct { func (rf *largeBufferReaderFrom) ReadFrom(r io.Reader) (int64, error) { return io.CopyBuffer(rf.Writer, r, make([]byte, 64*1024)) } + +func (s *server) SSHUploadPackWithSidechannel(ctx context.Context, req *gitalypb.SSHUploadPackWithSidechannelRequest) (*gitalypb.SSHUploadPackWithSidechannelResponse, error) { + conn, err := sidechannel.OpenSidechannel(ctx) + if err != nil { + return nil, helper.ErrUnavailable(err) + } + defer conn.Close() + + sidebandWriter := pktline.NewSidebandWriter(conn) + stdout := sidebandWriter.Writer(stream.BandStdout) + stderr := sidebandWriter.Writer(stream.BandStderr) + if _, err := s.sshUploadPack(ctx, req, conn, stdout, stderr); err != nil { + return nil, helper.ErrInternal(err) + } + if err := conn.Close(); err != nil { + return nil, helper.ErrInternalf("close sidechannel: %w", err) + } + + return &gitalypb.SSHUploadPackWithSidechannelResponse{}, nil +} diff --git a/internal/gitaly/service/ssh/upload_pack_test.go b/internal/gitaly/service/ssh/upload_pack_test.go index 5afa391da..97fd079d1 100644 --- a/internal/gitaly/service/ssh/upload_pack_test.go +++ b/internal/gitaly/service/ssh/upload_pack_test.go @@ -33,6 +33,7 @@ type cloneCommand struct { gitConfig string gitProtocol string cfg config.Cfg + sidechannel bool } func runTestWithAndWithoutConfigOptions(t *testing.T, tf func(t *testing.T, opts ...testcfg.Option), opts ...testcfg.Option) { @@ -64,15 +65,20 @@ func (cmd cloneCommand) execute(t *testing.T) error { ctx, cancel := testhelper.Context() defer cancel() + env := []string{ + fmt.Sprintf("GITALY_ADDRESS=%s", cmd.server), + fmt.Sprintf("GITALY_PAYLOAD=%s", payload), + fmt.Sprintf("GITALY_FEATUREFLAGS=%s", strings.Join(flagPairs, ",")), + fmt.Sprintf("PATH=.:%s", os.Getenv("PATH")), + fmt.Sprintf(`GIT_SSH_COMMAND=%s upload-pack`, filepath.Join(cmd.cfg.BinDir, "gitaly-ssh")), + } + if cmd.sidechannel { + env = append(env, "GITALY_USE_SIDECHANNEL=1") + } + var output bytes.Buffer gitCommand, err := gittest.NewCommandFactory(t, cmd.cfg).NewWithoutRepo(ctx, - cmd.command, git.WithStdout(&output), git.WithStderr(&output), git.WithEnv( - fmt.Sprintf("GITALY_ADDRESS=%s", cmd.server), - fmt.Sprintf("GITALY_PAYLOAD=%s", payload), - fmt.Sprintf("GITALY_FEATUREFLAGS=%s", strings.Join(flagPairs, ",")), - fmt.Sprintf("PATH=.:%s", os.Getenv("PATH")), - fmt.Sprintf(`GIT_SSH_COMMAND=%s upload-pack`, filepath.Join(cmd.cfg.BinDir, "gitaly-ssh")), - ), git.WithDisabledHooks(), + cmd.command, git.WithStdout(&output), git.WithStderr(&output), git.WithEnv(env...), git.WithDisabledHooks(), ) require.NoError(t, err) @@ -221,6 +227,20 @@ func TestUploadPackCloneSuccess(t *testing.T) { } func testUploadPackCloneSuccess(t *testing.T, opts ...testcfg.Option) { + testUploadPackCloneSuccess2(t, false, opts...) +} + +func TestUploadPackWithSidechannelCloneSuccess(t *testing.T) { + t.Parallel() + + runTestWithAndWithoutConfigOptions(t, testUploadPackWithSidechannelCloneSuccess, testcfg.WithPackObjectsCacheEnabled()) +} + +func testUploadPackWithSidechannelCloneSuccess(t *testing.T, opts ...testcfg.Option) { + testUploadPackCloneSuccess2(t, true, opts...) +} + +func testUploadPackCloneSuccess2(t *testing.T, sidechannel bool, opts ...testcfg.Option) { cfg, repo, repoPath := testcfg.BuildWithRepo(t, opts...) testcfg.BuildGitalyHooks(t, cfg) @@ -263,10 +283,11 @@ func testUploadPackCloneSuccess(t *testing.T, opts ...testcfg.Option) { negotiationMetrics.Reset() cmd := cloneCommand{ - repository: repo, - command: tc.cmd, - server: serverSocketPath, - cfg: cfg, + repository: repo, + command: tc.cmd, + server: serverSocketPath, + cfg: cfg, + sidechannel: sidechannel, } lHead, rHead, _, _ := cmd.test(t, cfg, repoPath, localRepoPath) require.Equal(t, lHead, rHead, "local and remote head not equal") diff --git a/internal/sidechannel/proxy.go b/internal/sidechannel/proxy.go index 2302d44eb..db184ff88 100644 --- a/internal/sidechannel/proxy.go +++ b/internal/sidechannel/proxy.go @@ -84,43 +84,54 @@ func proxy(ctx context.Context) func(*ClientConn) error { } defer downstream.Close() - const nStreams = 2 - errC := make(chan error, nStreams) - + fromDownstream := make(chan error, 1) go func() { - errC <- func() error { + fromDownstream <- func() error { if _, err := io.Copy(upstream, downstream); err != nil { - return err + return fmt.Errorf("copy to upstream: %w", err) + } + + if err := upstream.CloseWrite(); err != nil { + return fmt.Errorf("closewrite upstream: %w", err) } - // Downstream.Read() has returned EOF. That means we are done proxying - // the request body from downstream to upstream. Propagate this EOF to - // upstream by calling CloseWrite(). Use CloseWrite(), not Close(), - // because we still want to read the response body from upstream in the - // other goroutine. - return upstream.CloseWrite() + return nil }() }() + fromUpstream := make(chan error, 1) go func() { - errC <- func() error { + fromUpstream <- func() error { if _, err := io.Copy(downstream, upstream); err != nil { - return err + return fmt.Errorf("copy to downstream: %w", err) } - // Upstream is now closed for both reads and writes. Propagate this state - // to downstream. This also happens via defer, but this way we can log - // the Close error if there is one. - return downstream.Close() + return nil }() }() - for i := 0; i < nStreams; i++ { - if err := <-errC; err != nil { - return err + waitForUpstream: + for { + select { + case err := <-fromUpstream: + if err != nil { + return err + } + + break waitForUpstream + case err := <-fromDownstream: + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() } } + if err := downstream.Close(); err != nil { + return fmt.Errorf("close downstream: %w", err) + } + return nil } } diff --git a/internal/sidechannel/proxy_test.go b/internal/sidechannel/proxy_test.go index 1501a02d3..103ecfd63 100644 --- a/internal/sidechannel/proxy_test.go +++ b/internal/sidechannel/proxy_test.go @@ -19,17 +19,25 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -func testProxyServer(ctx context.Context) (err error) { +func testProxyServer(ctx context.Context, expectEOF bool) (err error) { conn, err := OpenSidechannel(ctx) if err != nil { return err } defer conn.Close() - buf, err := io.ReadAll(conn) + var buf []byte + if expectEOF { + buf, err = io.ReadAll(conn) + } else { + buf = make([]byte, 5) + _, err = conn.Read(buf) + } + if err != nil { return fmt.Errorf("server read: %w", err) } + if string(buf) != "hello" { return fmt.Errorf("server: unexpected request: %q", buf) } @@ -44,30 +52,38 @@ func testProxyServer(ctx context.Context) (err error) { return nil } -func testProxyClient(conn *ClientConn) (err error) { - if _, err := io.WriteString(conn, "hello"); err != nil { - return fmt.Errorf("client write: %w", err) - } - if err := conn.CloseWrite(); err != nil { - return err - } +func testProxyClient(closeWrite bool) func(*ClientConn) error { + return func(conn *ClientConn) (err error) { + if _, err := io.WriteString(conn, "hello"); err != nil { + return fmt.Errorf("client write: %w", err) + } + if closeWrite { + if err := conn.CloseWrite(); err != nil { + return err + } + } - buf, err := io.ReadAll(conn) - if err != nil { - return fmt.Errorf("client read: %w", err) - } - if string(buf) != "world" { - return fmt.Errorf("client: unexpected response: %q", buf) - } + buf, err := io.ReadAll(conn) + if err != nil { + return fmt.Errorf("client read: %w", err) + } + if string(buf) != "world" { + return fmt.Errorf("client: unexpected response: %q", buf) + } - return nil + return nil + } } -func TestUnaryProxy(t *testing.T) { +func TestUnaryProxy(t *testing.T) { testUnaryProxy(t, true) } + +func TestUnaryProxy_withoutCloseWrite(t *testing.T) { testUnaryProxy(t, false) } + +func testUnaryProxy(t *testing.T, closeWrite bool) { upstreamAddr := startServer( t, func(ctx context.Context, request *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { - if err := testProxyServer(ctx); err != nil { + if err := testProxyServer(ctx, closeWrite); err != nil { return nil, err } return &healthpb.HealthCheckResponse{}, nil @@ -92,7 +108,7 @@ func TestUnaryProxy(t *testing.T) { defer cancel() conn, registry := dial(t, proxyAddr) - require.NoError(t, call(ctx, conn, registry, testProxyClient)) + require.NoError(t, call(ctx, conn, registry, testProxyClient(closeWrite))) } func newLogger() *logrus.Entry { return logrus.NewEntry(logrus.New()) } @@ -115,11 +131,15 @@ func dialProxy(upstreamAddr string) (*grpc.ClientConn, error) { return grpc.Dial(upstreamAddr, dialOpts...) } -func TestStreamProxy(t *testing.T) { +func TestStreamProxy(t *testing.T) { testStreamProxy(t, true) } + +func TestStreamProxy_noCloseWrite(t *testing.T) { testStreamProxy(t, false) } + +func testStreamProxy(t *testing.T, closeWrite bool) { upstreamAddr := startStreamServer( t, func(stream gitalypb.SSHService_SSHUploadPackServer) error { - return testProxyServer(stream.Context()) + return testProxyServer(stream.Context(), closeWrite) }, ) @@ -150,7 +170,7 @@ func TestStreamProxy(t *testing.T) { defer cancel() conn, registry := dial(t, proxyAddr) - ctx, waiter := RegisterSidechannel(ctx, registry, testProxyClient) + ctx, waiter := RegisterSidechannel(ctx, registry, testProxyClient(closeWrite)) defer waiter.Close() client, err := gitalypb.NewSSHServiceClient(conn).SSHUploadPack(ctx) diff --git a/internal/stream/pktline.go b/internal/stream/pktline.go new file mode 100644 index 000000000..d83eb3a45 --- /dev/null +++ b/internal/stream/pktline.go @@ -0,0 +1,74 @@ +package stream + +import ( + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git/pktline" +) + +// Conn represents a bi-directional client side connection. +type Conn interface { + io.Reader + io.Writer + CloseWrite() error +} + +// BandStdout and BandStderr are the pktline band ID's for stdout and +// stderr, respectively. +const ( + BandStdout = 1 + BandStderr = 2 +) + +// ProxyPktLine lets a client de-multiplex and proxy stdin, stdout and +// stderr streams. Stdout and stderr are interleaved with Git's pktline +// format. +func ProxyPktLine(c Conn, stdin io.Reader, stdout, stderr io.Writer) error { + copyRequest := func(c Conn) error { + if _, err := io.Copy(c, stdin); err != nil { + return fmt.Errorf("copy request: %w", err) + } + if err := c.CloseWrite(); err != nil { + return fmt.Errorf("close request: %w", err) + } + return nil + } + + copyResponse := func(c Conn) error { + if err := pktline.EachSidebandPacket(c, func(band byte, data []byte) (err error) { + switch band { + case BandStdout: + _, err = stdout.Write(data) + case BandStderr: + _, err = stderr.Write(data) + default: + err = fmt.Errorf("unexpected band: %d", band) + } + return err + }); err != nil { + return fmt.Errorf("copy response: %w", err) + } + + return nil + } + + request := make(chan error, 1) + go func() { request <- copyRequest(c) }() + + response := make(chan error, 1) + go func() { response <- copyResponse(c) }() + + for { + select { + case err := <-response: + // Server is done. No point in waiting for client. + return err + case err := <-request: + if err != nil { + return err + } + // Client is now done. Wait for server to finish too. + } + } +} diff --git a/proto/go/gitalypb/ssh.pb.go b/proto/go/gitalypb/ssh.pb.go index c524ea2eb..32a62d738 100644 --- a/proto/go/gitalypb/ssh.pb.go +++ b/proto/go/gitalypb/ssh.pb.go @@ -162,6 +162,110 @@ func (x *SSHUploadPackResponse) GetExitStatus() *ExitStatus { return nil } +type SSHUploadPackWithSidechannelRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // 'repository' must be present in the first message. + Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` + // Parameters to use with git -c (key=value pairs) + GitConfigOptions []string `protobuf:"bytes,2,rep,name=git_config_options,json=gitConfigOptions,proto3" json:"git_config_options,omitempty"` + // Git protocol version + GitProtocol string `protobuf:"bytes,3,opt,name=git_protocol,json=gitProtocol,proto3" json:"git_protocol,omitempty"` +} + +func (x *SSHUploadPackWithSidechannelRequest) Reset() { + *x = SSHUploadPackWithSidechannelRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_ssh_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SSHUploadPackWithSidechannelRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SSHUploadPackWithSidechannelRequest) ProtoMessage() {} + +func (x *SSHUploadPackWithSidechannelRequest) ProtoReflect() protoreflect.Message { + mi := &file_ssh_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SSHUploadPackWithSidechannelRequest.ProtoReflect.Descriptor instead. +func (*SSHUploadPackWithSidechannelRequest) Descriptor() ([]byte, []int) { + return file_ssh_proto_rawDescGZIP(), []int{2} +} + +func (x *SSHUploadPackWithSidechannelRequest) GetRepository() *Repository { + if x != nil { + return x.Repository + } + return nil +} + +func (x *SSHUploadPackWithSidechannelRequest) GetGitConfigOptions() []string { + if x != nil { + return x.GitConfigOptions + } + return nil +} + +func (x *SSHUploadPackWithSidechannelRequest) GetGitProtocol() string { + if x != nil { + return x.GitProtocol + } + return "" +} + +type SSHUploadPackWithSidechannelResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SSHUploadPackWithSidechannelResponse) Reset() { + *x = SSHUploadPackWithSidechannelResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_ssh_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SSHUploadPackWithSidechannelResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SSHUploadPackWithSidechannelResponse) ProtoMessage() {} + +func (x *SSHUploadPackWithSidechannelResponse) ProtoReflect() protoreflect.Message { + mi := &file_ssh_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SSHUploadPackWithSidechannelResponse.ProtoReflect.Descriptor instead. +func (*SSHUploadPackWithSidechannelResponse) Descriptor() ([]byte, []int) { + return file_ssh_proto_rawDescGZIP(), []int{3} +} + type SSHReceivePackRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -185,7 +289,7 @@ type SSHReceivePackRequest struct { func (x *SSHReceivePackRequest) Reset() { *x = SSHReceivePackRequest{} if protoimpl.UnsafeEnabled { - mi := &file_ssh_proto_msgTypes[2] + mi := &file_ssh_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -198,7 +302,7 @@ func (x *SSHReceivePackRequest) String() string { func (*SSHReceivePackRequest) ProtoMessage() {} func (x *SSHReceivePackRequest) ProtoReflect() protoreflect.Message { - mi := &file_ssh_proto_msgTypes[2] + mi := &file_ssh_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -211,7 +315,7 @@ func (x *SSHReceivePackRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SSHReceivePackRequest.ProtoReflect.Descriptor instead. func (*SSHReceivePackRequest) Descriptor() ([]byte, []int) { - return file_ssh_proto_rawDescGZIP(), []int{2} + return file_ssh_proto_rawDescGZIP(), []int{4} } func (x *SSHReceivePackRequest) GetRepository() *Repository { @@ -280,7 +384,7 @@ type SSHReceivePackResponse struct { func (x *SSHReceivePackResponse) Reset() { *x = SSHReceivePackResponse{} if protoimpl.UnsafeEnabled { - mi := &file_ssh_proto_msgTypes[3] + mi := &file_ssh_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -293,7 +397,7 @@ func (x *SSHReceivePackResponse) String() string { func (*SSHReceivePackResponse) ProtoMessage() {} func (x *SSHReceivePackResponse) ProtoReflect() protoreflect.Message { - mi := &file_ssh_proto_msgTypes[3] + mi := &file_ssh_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -306,7 +410,7 @@ func (x *SSHReceivePackResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SSHReceivePackResponse.ProtoReflect.Descriptor instead. func (*SSHReceivePackResponse) Descriptor() ([]byte, []int) { - return file_ssh_proto_rawDescGZIP(), []int{3} + return file_ssh_proto_rawDescGZIP(), []int{5} } func (x *SSHReceivePackResponse) GetStdout() []byte { @@ -344,7 +448,7 @@ type SSHUploadArchiveRequest struct { func (x *SSHUploadArchiveRequest) Reset() { *x = SSHUploadArchiveRequest{} if protoimpl.UnsafeEnabled { - mi := &file_ssh_proto_msgTypes[4] + mi := &file_ssh_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -357,7 +461,7 @@ func (x *SSHUploadArchiveRequest) String() string { func (*SSHUploadArchiveRequest) ProtoMessage() {} func (x *SSHUploadArchiveRequest) ProtoReflect() protoreflect.Message { - mi := &file_ssh_proto_msgTypes[4] + mi := &file_ssh_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -370,7 +474,7 @@ func (x *SSHUploadArchiveRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use SSHUploadArchiveRequest.ProtoReflect.Descriptor instead. func (*SSHUploadArchiveRequest) Descriptor() ([]byte, []int) { - return file_ssh_proto_rawDescGZIP(), []int{4} + return file_ssh_proto_rawDescGZIP(), []int{6} } func (x *SSHUploadArchiveRequest) GetRepository() *Repository { @@ -403,7 +507,7 @@ type SSHUploadArchiveResponse struct { func (x *SSHUploadArchiveResponse) Reset() { *x = SSHUploadArchiveResponse{} if protoimpl.UnsafeEnabled { - mi := &file_ssh_proto_msgTypes[5] + mi := &file_ssh_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -416,7 +520,7 @@ func (x *SSHUploadArchiveResponse) String() string { func (*SSHUploadArchiveResponse) ProtoMessage() {} func (x *SSHUploadArchiveResponse) ProtoReflect() protoreflect.Message { - mi := &file_ssh_proto_msgTypes[5] + mi := &file_ssh_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -429,7 +533,7 @@ func (x *SSHUploadArchiveResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SSHUploadArchiveResponse.ProtoReflect.Descriptor instead. func (*SSHUploadArchiveResponse) Descriptor() ([]byte, []int) { - return file_ssh_proto_rawDescGZIP(), []int{5} + return file_ssh_proto_rawDescGZIP(), []int{7} } func (x *SSHUploadArchiveResponse) GetStdout() []byte { @@ -480,69 +584,91 @@ var file_ssh_proto_rawDesc = []byte{ 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, - 0x75, 0x73, 0x22, 0x93, 0x02, 0x0a, 0x15, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, - 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, - 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, - 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x13, 0x0a, 0x05, - 0x67, 0x6c, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x6c, 0x49, - 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x67, 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, - 0x72, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x67, 0x6c, 0x52, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x6c, 0x5f, 0x75, 0x73, 0x65, - 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x6c, 0x55, - 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x69, 0x74, 0x5f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x67, - 0x69, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x69, - 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x18, 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x53, 0x53, 0x48, 0x52, - 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, - 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, - 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x69, 0x0a, 0x17, 0x53, 0x53, 0x48, 0x55, 0x70, - 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, - 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, - 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, - 0x69, 0x6e, 0x22, 0x7f, 0x0a, 0x18, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, - 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, - 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, - 0x0a, 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, - 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x32, 0xa6, 0x02, 0x0a, 0x0a, 0x53, 0x53, 0x48, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x58, 0x0a, 0x0d, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, - 0x61, 0x63, 0x6b, 0x12, 0x1c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, - 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, - 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x0e, - 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x12, 0x1d, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, - 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, + 0x75, 0x73, 0x22, 0xb0, 0x01, 0x0a, 0x23, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, + 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, + 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, + 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x69, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x10, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x67, 0x69, 0x74, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x22, 0x26, 0x0a, 0x24, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, + 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, + 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x93, 0x02, + 0x0a, 0x15, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, + 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, + 0x79, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x12, 0x13, 0x0a, 0x05, 0x67, 0x6c, 0x5f, 0x69, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x67, 0x6c, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, + 0x67, 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0c, 0x67, 0x6c, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, + 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x67, 0x6c, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x67, 0x6c, 0x55, 0x73, 0x65, 0x72, 0x6e, 0x61, + 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x67, 0x69, 0x74, 0x5f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x67, 0x69, 0x74, 0x50, 0x72, 0x6f, + 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x2c, 0x0a, 0x12, 0x67, 0x69, 0x74, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x10, 0x67, 0x69, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x4f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x22, 0x7d, 0x0a, 0x16, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, + 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, + 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, + 0x0b, 0x65, 0x78, 0x69, 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x22, 0x69, 0x0a, 0x17, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, + 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, + 0x0a, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, + 0x69, 0x74, 0x6f, 0x72, 0x79, 0x42, 0x04, 0x98, 0xc6, 0x2c, 0x01, 0x52, 0x0a, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, 0x64, 0x69, 0x6e, 0x22, 0x7f, 0x0a, + 0x18, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, + 0x6f, 0x75, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, + 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x65, 0x78, 0x69, + 0x74, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x45, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x32, 0xaa, + 0x03, 0x0a, 0x0a, 0x53, 0x53, 0x48, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x58, 0x0a, + 0x0d, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x12, 0x1c, + 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, + 0x64, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, + 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, + 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x12, 0x81, 0x01, 0x0a, 0x1c, 0x53, 0x53, 0x48, 0x55, + 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, + 0x65, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x2b, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, + 0x69, 0x74, 0x68, 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, + 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x50, 0x61, 0x63, 0x6b, 0x57, 0x69, 0x74, 0x68, + 0x53, 0x69, 0x64, 0x65, 0x63, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x12, 0x5b, 0x0a, 0x0e, 0x53, + 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, 0x50, 0x61, 0x63, 0x6b, 0x12, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, - 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, - 0x97, 0x28, 0x02, 0x08, 0x01, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x10, 0x53, 0x53, 0x48, - 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x12, 0x1f, 0x2e, + 0x65, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x52, 0x65, 0x63, 0x65, 0x69, 0x76, 0x65, + 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x01, 0x28, 0x01, 0x30, 0x01, 0x12, 0x61, 0x0a, 0x10, 0x53, 0x53, 0x48, 0x55, + 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x12, 0x1f, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x41, + 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, - 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x53, 0x48, 0x55, 0x70, 0x6c, 0x6f, 0x61, - 0x64, 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32, - 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, - 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x41, 0x72, 0x63, 0x68, 0x69, 0x76, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x28, 0x01, 0x30, 0x01, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x34, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -557,35 +683,40 @@ func file_ssh_proto_rawDescGZIP() []byte { return file_ssh_proto_rawDescData } -var file_ssh_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_ssh_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_ssh_proto_goTypes = []interface{}{ - (*SSHUploadPackRequest)(nil), // 0: gitaly.SSHUploadPackRequest - (*SSHUploadPackResponse)(nil), // 1: gitaly.SSHUploadPackResponse - (*SSHReceivePackRequest)(nil), // 2: gitaly.SSHReceivePackRequest - (*SSHReceivePackResponse)(nil), // 3: gitaly.SSHReceivePackResponse - (*SSHUploadArchiveRequest)(nil), // 4: gitaly.SSHUploadArchiveRequest - (*SSHUploadArchiveResponse)(nil), // 5: gitaly.SSHUploadArchiveResponse - (*Repository)(nil), // 6: gitaly.Repository - (*ExitStatus)(nil), // 7: gitaly.ExitStatus + (*SSHUploadPackRequest)(nil), // 0: gitaly.SSHUploadPackRequest + (*SSHUploadPackResponse)(nil), // 1: gitaly.SSHUploadPackResponse + (*SSHUploadPackWithSidechannelRequest)(nil), // 2: gitaly.SSHUploadPackWithSidechannelRequest + (*SSHUploadPackWithSidechannelResponse)(nil), // 3: gitaly.SSHUploadPackWithSidechannelResponse + (*SSHReceivePackRequest)(nil), // 4: gitaly.SSHReceivePackRequest + (*SSHReceivePackResponse)(nil), // 5: gitaly.SSHReceivePackResponse + (*SSHUploadArchiveRequest)(nil), // 6: gitaly.SSHUploadArchiveRequest + (*SSHUploadArchiveResponse)(nil), // 7: gitaly.SSHUploadArchiveResponse + (*Repository)(nil), // 8: gitaly.Repository + (*ExitStatus)(nil), // 9: gitaly.ExitStatus } var file_ssh_proto_depIdxs = []int32{ - 6, // 0: gitaly.SSHUploadPackRequest.repository:type_name -> gitaly.Repository - 7, // 1: gitaly.SSHUploadPackResponse.exit_status:type_name -> gitaly.ExitStatus - 6, // 2: gitaly.SSHReceivePackRequest.repository:type_name -> gitaly.Repository - 7, // 3: gitaly.SSHReceivePackResponse.exit_status:type_name -> gitaly.ExitStatus - 6, // 4: gitaly.SSHUploadArchiveRequest.repository:type_name -> gitaly.Repository - 7, // 5: gitaly.SSHUploadArchiveResponse.exit_status:type_name -> gitaly.ExitStatus - 0, // 6: gitaly.SSHService.SSHUploadPack:input_type -> gitaly.SSHUploadPackRequest - 2, // 7: gitaly.SSHService.SSHReceivePack:input_type -> gitaly.SSHReceivePackRequest - 4, // 8: gitaly.SSHService.SSHUploadArchive:input_type -> gitaly.SSHUploadArchiveRequest - 1, // 9: gitaly.SSHService.SSHUploadPack:output_type -> gitaly.SSHUploadPackResponse - 3, // 10: gitaly.SSHService.SSHReceivePack:output_type -> gitaly.SSHReceivePackResponse - 5, // 11: gitaly.SSHService.SSHUploadArchive:output_type -> gitaly.SSHUploadArchiveResponse - 9, // [9:12] is the sub-list for method output_type - 6, // [6:9] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 8, // 0: gitaly.SSHUploadPackRequest.repository:type_name -> gitaly.Repository + 9, // 1: gitaly.SSHUploadPackResponse.exit_status:type_name -> gitaly.ExitStatus + 8, // 2: gitaly.SSHUploadPackWithSidechannelRequest.repository:type_name -> gitaly.Repository + 8, // 3: gitaly.SSHReceivePackRequest.repository:type_name -> gitaly.Repository + 9, // 4: gitaly.SSHReceivePackResponse.exit_status:type_name -> gitaly.ExitStatus + 8, // 5: gitaly.SSHUploadArchiveRequest.repository:type_name -> gitaly.Repository + 9, // 6: gitaly.SSHUploadArchiveResponse.exit_status:type_name -> gitaly.ExitStatus + 0, // 7: gitaly.SSHService.SSHUploadPack:input_type -> gitaly.SSHUploadPackRequest + 2, // 8: gitaly.SSHService.SSHUploadPackWithSidechannel:input_type -> gitaly.SSHUploadPackWithSidechannelRequest + 4, // 9: gitaly.SSHService.SSHReceivePack:input_type -> gitaly.SSHReceivePackRequest + 6, // 10: gitaly.SSHService.SSHUploadArchive:input_type -> gitaly.SSHUploadArchiveRequest + 1, // 11: gitaly.SSHService.SSHUploadPack:output_type -> gitaly.SSHUploadPackResponse + 3, // 12: gitaly.SSHService.SSHUploadPackWithSidechannel:output_type -> gitaly.SSHUploadPackWithSidechannelResponse + 5, // 13: gitaly.SSHService.SSHReceivePack:output_type -> gitaly.SSHReceivePackResponse + 7, // 14: gitaly.SSHService.SSHUploadArchive:output_type -> gitaly.SSHUploadArchiveResponse + 11, // [11:15] is the sub-list for method output_type + 7, // [7:11] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_ssh_proto_init() } @@ -621,7 +752,7 @@ func file_ssh_proto_init() { } } file_ssh_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SSHReceivePackRequest); i { + switch v := v.(*SSHUploadPackWithSidechannelRequest); i { case 0: return &v.state case 1: @@ -633,7 +764,7 @@ func file_ssh_proto_init() { } } file_ssh_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SSHReceivePackResponse); i { + switch v := v.(*SSHUploadPackWithSidechannelResponse); i { case 0: return &v.state case 1: @@ -645,7 +776,7 @@ func file_ssh_proto_init() { } } file_ssh_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*SSHUploadArchiveRequest); i { + switch v := v.(*SSHReceivePackRequest); i { case 0: return &v.state case 1: @@ -657,6 +788,30 @@ func file_ssh_proto_init() { } } file_ssh_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SSHReceivePackResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ssh_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SSHUploadArchiveRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_ssh_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*SSHUploadArchiveResponse); i { case 0: return &v.state @@ -675,7 +830,7 @@ func file_ssh_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_ssh_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/proto/go/gitalypb/ssh_grpc.pb.go b/proto/go/gitalypb/ssh_grpc.pb.go index 5d13f9896..21f761b8e 100644 --- a/proto/go/gitalypb/ssh_grpc.pb.go +++ b/proto/go/gitalypb/ssh_grpc.pb.go @@ -20,6 +20,8 @@ const _ = grpc.SupportPackageIsVersion7 type SSHServiceClient interface { // To forward 'git upload-pack' to Gitaly for SSH sessions SSHUploadPack(ctx context.Context, opts ...grpc.CallOption) (SSHService_SSHUploadPackClient, error) + // To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels + SSHUploadPackWithSidechannel(ctx context.Context, in *SSHUploadPackWithSidechannelRequest, opts ...grpc.CallOption) (*SSHUploadPackWithSidechannelResponse, error) // To forward 'git receive-pack' to Gitaly for SSH sessions SSHReceivePack(ctx context.Context, opts ...grpc.CallOption) (SSHService_SSHReceivePackClient, error) // To forward 'git upload-archive' to Gitaly for SSH sessions @@ -65,6 +67,15 @@ func (x *sSHServiceSSHUploadPackClient) Recv() (*SSHUploadPackResponse, error) { return m, nil } +func (c *sSHServiceClient) SSHUploadPackWithSidechannel(ctx context.Context, in *SSHUploadPackWithSidechannelRequest, opts ...grpc.CallOption) (*SSHUploadPackWithSidechannelResponse, error) { + out := new(SSHUploadPackWithSidechannelResponse) + err := c.cc.Invoke(ctx, "/gitaly.SSHService/SSHUploadPackWithSidechannel", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *sSHServiceClient) SSHReceivePack(ctx context.Context, opts ...grpc.CallOption) (SSHService_SSHReceivePackClient, error) { stream, err := c.cc.NewStream(ctx, &SSHService_ServiceDesc.Streams[1], "/gitaly.SSHService/SSHReceivePack", opts...) if err != nil { @@ -133,6 +144,8 @@ func (x *sSHServiceSSHUploadArchiveClient) Recv() (*SSHUploadArchiveResponse, er type SSHServiceServer interface { // To forward 'git upload-pack' to Gitaly for SSH sessions SSHUploadPack(SSHService_SSHUploadPackServer) error + // To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels + SSHUploadPackWithSidechannel(context.Context, *SSHUploadPackWithSidechannelRequest) (*SSHUploadPackWithSidechannelResponse, error) // To forward 'git receive-pack' to Gitaly for SSH sessions SSHReceivePack(SSHService_SSHReceivePackServer) error // To forward 'git upload-archive' to Gitaly for SSH sessions @@ -147,6 +160,9 @@ type UnimplementedSSHServiceServer struct { func (UnimplementedSSHServiceServer) SSHUploadPack(SSHService_SSHUploadPackServer) error { return status.Errorf(codes.Unimplemented, "method SSHUploadPack not implemented") } +func (UnimplementedSSHServiceServer) SSHUploadPackWithSidechannel(context.Context, *SSHUploadPackWithSidechannelRequest) (*SSHUploadPackWithSidechannelResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SSHUploadPackWithSidechannel not implemented") +} func (UnimplementedSSHServiceServer) SSHReceivePack(SSHService_SSHReceivePackServer) error { return status.Errorf(codes.Unimplemented, "method SSHReceivePack not implemented") } @@ -192,6 +208,24 @@ func (x *sSHServiceSSHUploadPackServer) Recv() (*SSHUploadPackRequest, error) { return m, nil } +func _SSHService_SSHUploadPackWithSidechannel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SSHUploadPackWithSidechannelRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SSHServiceServer).SSHUploadPackWithSidechannel(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/gitaly.SSHService/SSHUploadPackWithSidechannel", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SSHServiceServer).SSHUploadPackWithSidechannel(ctx, req.(*SSHUploadPackWithSidechannelRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _SSHService_SSHReceivePack_Handler(srv interface{}, stream grpc.ServerStream) error { return srv.(SSHServiceServer).SSHReceivePack(&sSHServiceSSHReceivePackServer{stream}) } @@ -250,7 +284,12 @@ func (x *sSHServiceSSHUploadArchiveServer) Recv() (*SSHUploadArchiveRequest, err var SSHService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.SSHService", HandlerType: (*SSHServiceServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "SSHUploadPackWithSidechannel", + Handler: _SSHService_SSHUploadPackWithSidechannel_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "SSHUploadPack", diff --git a/proto/ssh.proto b/proto/ssh.proto index 4545effbf..732d26a2c 100644 --- a/proto/ssh.proto +++ b/proto/ssh.proto @@ -15,6 +15,13 @@ service SSHService { }; } + // To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels + rpc SSHUploadPackWithSidechannel(SSHUploadPackWithSidechannelRequest) returns (SSHUploadPackWithSidechannelResponse) { + option (op_type) = { + op: ACCESSOR + }; + } + // To forward 'git receive-pack' to Gitaly for SSH sessions rpc SSHReceivePack(stream SSHReceivePackRequest) returns (stream SSHReceivePackResponse) { option (op_type) = { @@ -55,6 +62,18 @@ message SSHUploadPackResponse { ExitStatus exit_status = 3; } +message SSHUploadPackWithSidechannelRequest { + // 'repository' must be present in the first message. + Repository repository = 1 [(target_repository)=true]; + // Parameters to use with git -c (key=value pairs) + repeated string git_config_options = 2; + + // Git protocol version + string git_protocol = 3; +} + +message SSHUploadPackWithSidechannelResponse {} + message SSHReceivePackRequest { // 'repository' must be present in the first message. Repository repository = 1 [(target_repository)=true]; diff --git a/ruby/proto/gitaly/ssh_pb.rb b/ruby/proto/gitaly/ssh_pb.rb index fdde47b80..49e65cc14 100644 --- a/ruby/proto/gitaly/ssh_pb.rb +++ b/ruby/proto/gitaly/ssh_pb.rb @@ -18,6 +18,13 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :stderr, :bytes, 2 optional :exit_status, :message, 3, "gitaly.ExitStatus" end + add_message "gitaly.SSHUploadPackWithSidechannelRequest" do + optional :repository, :message, 1, "gitaly.Repository" + repeated :git_config_options, :string, 2 + optional :git_protocol, :string, 3 + end + add_message "gitaly.SSHUploadPackWithSidechannelResponse" do + end add_message "gitaly.SSHReceivePackRequest" do optional :repository, :message, 1, "gitaly.Repository" optional :stdin, :bytes, 2 @@ -47,6 +54,8 @@ end module Gitaly SSHUploadPackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackRequest").msgclass SSHUploadPackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackResponse").msgclass + SSHUploadPackWithSidechannelRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackWithSidechannelRequest").msgclass + SSHUploadPackWithSidechannelResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadPackWithSidechannelResponse").msgclass SSHReceivePackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHReceivePackRequest").msgclass SSHReceivePackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHReceivePackResponse").msgclass SSHUploadArchiveRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.SSHUploadArchiveRequest").msgclass diff --git a/ruby/proto/gitaly/ssh_services_pb.rb b/ruby/proto/gitaly/ssh_services_pb.rb index ed4527087..a50b0fa5a 100644 --- a/ruby/proto/gitaly/ssh_services_pb.rb +++ b/ruby/proto/gitaly/ssh_services_pb.rb @@ -16,6 +16,8 @@ module Gitaly # To forward 'git upload-pack' to Gitaly for SSH sessions rpc :SSHUploadPack, stream(::Gitaly::SSHUploadPackRequest), stream(::Gitaly::SSHUploadPackResponse) + # To forward 'git upload-pack' to Gitaly for SSH sessions, via sidechannels + rpc :SSHUploadPackWithSidechannel, ::Gitaly::SSHUploadPackWithSidechannelRequest, ::Gitaly::SSHUploadPackWithSidechannelResponse # To forward 'git receive-pack' to Gitaly for SSH sessions rpc :SSHReceivePack, stream(::Gitaly::SSHReceivePackRequest), stream(::Gitaly::SSHReceivePackResponse) # To forward 'git upload-archive' to Gitaly for SSH sessions diff --git a/streamio/stream.go b/streamio/stream.go index 0aab25315..68e6bd687 100644 --- a/streamio/stream.go +++ b/streamio/stream.go @@ -23,7 +23,7 @@ type receiveReader struct { } func (rr *receiveReader) Read(p []byte) (int, error) { - if len(rr.data) == 0 { + if len(rr.data) == 0 && rr.err == nil { rr.data, rr.err = rr.receiver() } diff --git a/streamio/stream_test.go b/streamio/stream_test.go index c91d5a58f..8f7c841fd 100644 --- a/streamio/stream_test.go +++ b/streamio/stream_test.go @@ -2,6 +2,7 @@ package streamio import ( "bytes" + "errors" "fmt" "io" "strings" @@ -60,6 +61,25 @@ func TestReadSizes(t *testing.T) { }) } +func TestRead_rememberError(t *testing.T) { + firstRead := true + myError := errors.New("hello world") + r := NewReader(func() ([]byte, error) { + if firstRead { + firstRead = false + return nil, myError + } + panic("should never be reached") + }) + + // Intentionally call Read more than once. We want the error to be + // sticky. + for i := 0; i < 10; i++ { + _, err := r.Read(nil) + require.Equal(t, err, myError) + } +} + func receiverFromReader(r io.Reader) func() ([]byte, error) { return func() ([]byte, error) { data := make([]byte, 10) |