Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2022-01-11 20:11:05 +0300
committerJacob Vosmaer <jacob@gitlab.com>2022-01-17 16:12:05 +0300
commitabdd6cffc874e6578072b20fb0c7991b0b725c56 (patch)
treea0526cb22c110a864bc4197c4125ab8644dbd8a7
parent7ce0d18ad44686865aa0dbf5f1b47d9cc05988be (diff)
SSHUploadPack: decouple implementation from gRPC
This separates SSHUploadPack into an outer layer that works with the gRPC stream, and a core function that works with io.Reader, io.Writer etc. This is preparation for SSHUploadPackWithSidechannel.
-rw-r--r--internal/gitaly/service/ssh/upload_pack.go84
1 files changed, 45 insertions, 39 deletions
diff --git a/internal/gitaly/service/ssh/upload_pack.go b/internal/gitaly/service/ssh/upload_pack.go
index 21206a233..a3bed9e9b 100644
--- a/internal/gitaly/service/ssh/upload_pack.go
+++ b/internal/gitaly/service/ssh/upload_pack.go
@@ -18,6 +18,8 @@ import (
)
func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) error {
+ ctx := stream.Context()
+
req, err := stream.Recv() // First request contains Repository only
if err != nil {
return helper.ErrInternal(err)
@@ -28,7 +30,7 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e
repository = req.Repository.GlRepository
}
- ctxlogrus.Extract(stream.Context()).WithFields(log.Fields{
+ ctxlogrus.Extract(ctx).WithFields(log.Fields{
"GlRepository": repository,
"GitConfigOptions": req.GitConfigOptions,
"GitProtocol": req.GitProtocol,
@@ -38,17 +40,6 @@ func (s *server) SSHUploadPack(stream gitalypb.SSHService_SSHUploadPackServer) e
return helper.ErrInvalidArgument(err)
}
- if err = s.sshUploadPack(stream, req); err != nil {
- return helper.ErrInternal(err)
- }
-
- return nil
-}
-
-func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, req *gitalypb.SSHUploadPackRequest) error {
- ctx, cancelCtx := context.WithCancel(stream.Context())
- defer cancelCtx()
-
stdin := streamio.NewReader(func() ([]byte, error) {
request, err := stream.Recv()
return request.GetStdin(), err
@@ -57,29 +48,51 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
// gRPC doesn't allow concurrent writes to a stream, so we need to
// synchronize writing stdout and stderrr.
var m sync.Mutex
-
- stdoutCounter := &helper.CountingWriter{
- W: streamio.NewSyncWriter(&m, func(p []byte) error {
- return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p})
- }),
- }
- // Use large copy buffer to reduce the number of system calls
- stdout := &largeBufferReaderFrom{Writer: stdoutCounter}
-
+ stdout := streamio.NewSyncWriter(&m, func(p []byte) error {
+ return stream.Send(&gitalypb.SSHUploadPackResponse{Stdout: p})
+ })
stderr := streamio.NewSyncWriter(&m, func(p []byte) error {
return stream.Send(&gitalypb.SSHUploadPackResponse{Stderr: p})
})
- repoPath, err := s.locator.GetRepoPath(req.Repository)
+ if status, err := s.sshUploadPack(ctx, req, stdin, stdout, stderr); err != nil {
+ if errSend := stream.Send(&gitalypb.SSHUploadPackResponse{
+ ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
+ }); errSend != nil {
+ ctxlogrus.Extract(ctx).WithError(errSend).Error("send final status code")
+ }
+
+ return helper.ErrInternal(err)
+ }
+
+ return nil
+}
+
+type sshUploadPackRequest interface {
+ GetRepository() *gitalypb.Repository
+ GetGitConfigOptions() []string
+ GetGitProtocol() string
+}
+
+func (s *server) sshUploadPack(ctx context.Context, req sshUploadPackRequest, stdin io.Reader, stdout, stderr io.Writer) (int, error) {
+ ctx, cancelCtx := context.WithCancel(ctx)
+ defer cancelCtx()
+
+ stdoutCounter := &helper.CountingWriter{W: stdout}
+ // Use large copy buffer to reduce the number of system calls
+ stdout = &largeBufferReaderFrom{Writer: stdoutCounter}
+
+ repo := req.GetRepository()
+ repoPath, err := s.locator.GetRepoPath(repo)
if err != nil {
- return err
+ return 0, err
}
- git.WarnIfTooManyBitmaps(ctx, s.locator, req.GetRepository().StorageName, repoPath)
+ git.WarnIfTooManyBitmaps(ctx, s.locator, repo.StorageName, repoPath)
- config, err := git.ConvertConfigOptions(req.GitConfigOptions)
+ config, err := git.ConvertConfigOptions(req.GetGitConfigOptions())
if err != nil {
- return err
+ return 0, err
}
pr, pw := io.Pipe()
@@ -96,7 +109,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
stats, err := stats.ParsePackfileNegotiation(pr)
if err != nil {
- ctxlogrus.Extract(stream.Context()).WithError(err).Debug("failed parsing packfile negotiation")
+ ctxlogrus.Extract(ctx).WithError(err).Debug("failed parsing packfile negotiation")
return
}
stats.UpdateMetrics(s.packfileNegotiationMetrics)
@@ -105,7 +118,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
commandOpts := []git.CmdOpt{
git.WithGitProtocol(req),
git.WithConfig(config...),
- git.WithPackObjectsHookEnv(req.Repository),
+ git.WithPackObjectsHookEnv(repo),
}
cmd, monitor, err := monitorStdinCommand(ctx, s.gitCmdFactory, stdin, stdout, stderr, git.SubCmd{
@@ -113,7 +126,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
Args: []string{repoPath},
}, commandOpts...)
if err != nil {
- return err
+ return 0, err
}
timeoutTicker := helper.NewTimerTicker(s.uploadPackRequestTimeout)
@@ -130,15 +143,8 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
pw.Close()
wg.Wait()
- if status, ok := command.ExitStatus(err); ok {
- if sendErr := stream.Send(&gitalypb.SSHUploadPackResponse{
- ExitStatus: &gitalypb.ExitStatus{Value: int32(status)},
- }); sendErr != nil {
- return sendErr
- }
- return fmt.Errorf("SSHUploadPack: %v", err)
- }
- return fmt.Errorf("cmd wait: %v", err)
+ status, _ := command.ExitStatus(err)
+ return status, fmt.Errorf("cmd wait: %w", err)
}
pw.Close()
@@ -146,7 +152,7 @@ func (s *server) sshUploadPack(stream gitalypb.SSHService_SSHUploadPackServer, r
ctxlogrus.Extract(ctx).WithField("response_bytes", stdoutCounter.N).Info("request details")
- return nil
+ return 0, nil
}
func validateFirstUploadPackRequest(req *gitalypb.SSHUploadPackRequest) error {