diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2022-01-11 20:11:05 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2022-01-17 16:12:05 +0300 |
commit | abdd6cffc874e6578072b20fb0c7991b0b725c56 (patch) | |
tree | a0526cb22c110a864bc4197c4125ab8644dbd8a7 | |
parent | 7ce0d18ad44686865aa0dbf5f1b47d9cc05988be (diff) |
SSHUploadPack: decouple implementation from gRPC
This separates SSHUploadPack into an outer layer that works with the
gRPC stream, and a core function that works with io.Reader, io.Writer
etc. This is preparation for SSHUploadPackWithSidechannel.
-rw-r--r-- | internal/gitaly/service/ssh/upload_pack.go | 84 |
1 files changed, 45 insertions, 39 deletions
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go index 21206a233..a3bed9e9b 100644 --- a/internal/gitaly/service/ssh/upload_pack.go +++ b/internal/gitaly/service/ssh/upload_pack.go @@ -18,6 +18,8 @@ import ( ) func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) error { + ctx := stream.Context() + req, err := stream.Recv() // First request contains Repository only if err != nil { return helper.ErrInternal(err) @@ -28,7 +30,7 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e repository = req.Repository.GlRepository } - ctxlogrus.Extract(stream.Context()).WithFields(log.Fields{ + ctxlogrus.Extract(ctx).WithFields(log.Fields{ "GlRepository": repository, "GitConfigOptions": req.GitConfigOptions, "GitProtocol": req.GitProtocol, @@ -38,17 +40,6 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e return helper.ErrInvalidArgument(err) } - if err = s.sshUploadPack(stream, req); err != nil { - return helper.ErrInternal(err) - } - - return nil -} - -func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, req *gitalypb.SSHUploadPackRequest) error { - ctx, cancelCtx := context.WithCancel(stream.Context()) - defer cancelCtx() - stdin := streamio.NewReader(func() ([]byte, error) { request, err := stream.Recv() return request.GetStdin(), err @@ -57,29 +48,51 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r // gRPC doesn't allow concurrent writes to a stream, so we need to // synchronize writing stdout and stderrr. var m sync.Mutex - - stdoutCounter := &helper.CountingWriter{ - W: streamio.NewSyncWriter(&m, func(p []byte) error { - return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p}) - }), - } - // Use large copy buffer to reduce the number of system calls - stdout := &largeBufferReaderFrom{Writer: stdoutCounter} - + stdout := streamio.NewSyncWriter(&m, func(p []byte) error { + return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p}) + }) stderr := streamio.NewSyncWriter(&m, func(p []byte) error { return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p}) }) - repoPath, err := s.locator.GetRepoPath(req.Repository) + if status, err := s.sshUploadPack(ctx, req, stdin, stdout, stderr); err != nil { + if errSend := stream.Send(&gitalypb.SSHUploadPackResponse{ + ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, + }); errSend != nil { + ctxlogrus.Extract(ctx).WithError(errSend).Error("send final status code") + } + + return helper.ErrInternal(err) + } + + return nil +} + +type sshUploadPackRequest interface { + GetRepository() *gitalypb.Repository + GetGitConfigOptions() []string + GetGitProtocol() string +} + +func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) { + ctx, cancelCtx := context.WithCancel(ctx) + defer cancelCtx() + + stdoutCounter := &helper.CountingWriter{W: stdout} + // Use large copy buffer to reduce the number of system calls + stdout = &largeBufferReaderFrom{Writer: stdoutCounter} + + repo := req.GetRepository() + repoPath, err := s.locator.GetRepoPath(repo) if err != nil { - return err + return 0, err } - git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().StorageName, repoPath) + git.WarnIfTooManyBitmaps(ctx, s.locator, repo.StorageName, repoPath) - config, err := git.ConvertConfigOptions(req.GitConfigOptions) + config, err := git.ConvertConfigOptions(req.GetGitConfigOptions()) if err != nil { - return err + return 0, err } pr, pw := io.Pipe() @@ -96,7 +109,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r stats, err := stats.ParsePackfileNegotiation(pr) if err != nil { - ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation") + ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation") return } stats.UpdateMetrics(s.packfileNegotiationMetrics) @@ -105,7 +118,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r commandOpts := []git.CmdOpt{ git.WithGitProtocol(req), git.WithConfig(config...), - git.WithPackObjectsHookEnv(req.Repository), + git.WithPackObjectsHookEnv(repo), } cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, stdin, stdout, stderr, git.SubCmd{ @@ -113,7 +126,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r Args: []string{repoPath}, }, commandOpts...) if err != nil { - return err + return 0, err } timeoutTicker := helper.NewTimerTicker(s.uploadPackRequestTimeout) @@ -130,15 +143,8 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r pw.Close() wg.Wait() - if status, ok := command.ExitStatus(err); ok { - if sendErr := stream.Send(&gitalypb.SSHUploadPackResponse{ - ExitStatus: &gitalypb.ExitStatus{Value: int32(status)}, - }); sendErr != nil { - return sendErr - } - return fmt.Errorf("SSHUploadPack: %v", err) - } - return fmt.Errorf("cmd wait: %v", err) + status, _ := command.ExitStatus(err) + return status, fmt.Errorf("cmd wait: %w", err) } pw.Close() @@ -146,7 +152,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r ctxlogrus.Extract(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details") - return nil + return 0, nil } func validateFirstUploadPackRequest(req *gitalypb.SSHUploadPackRequest) error { |