Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Patrick Steinhardt <psteinhardt@gitlab.com> 2022-06-01 16:19:39 +0300
committer Patrick Steinhardt <psteinhardt@gitlab.com> 2022-06-10 09:34:46 +0300
commit 139821f05346b5165a8e86bbc83cbff02fded16d (patch)
tree bfc20e5852604cc7470b84a3f672171cabc3932e
parent aa0f408dcd190dbd958e26fd23869cb0fc37efe3 (diff)
cmd/gitaly-lfs-smudge: Implement support for long-lived filter protocol
Originally, Git only ever supported smudging the contents of a single blob by piping that blob to a filter process directly. This meant that for every blob that needs converting, we have to spawn a full process. Naturally, this doesn't scale well when repositories grow and when they have many blobs that do need converting. Git has thus introduced a long-running filter protocol: instead of spawning one process per blob that needs converting, Git will only spawn a single server and then communicate with that server via pktlines. Implement support for this protocol in gitaly-lfs-smudge so that we can start using it in our `GetArchive()` RPC.
-rw-r--r-- cmd/gitaly-lfs-smudge/main.go 6
-rw-r--r-- cmd/gitaly-lfs-smudge/smudge.go 251
-rw-r--r-- cmd/gitaly-lfs-smudge/smudge_test.go 367
-rw-r--r-- internal/git/smudge/config.go 5
4 files changed, 627 insertions, 2 deletions
diff --git a/cmd/gitaly-lfs-smudge/main.go b/cmd/gitaly-lfs-smudge/main.go
index c5ce5e4cb..51a63a6b4 100644
--- a/cmd/gitaly-lfs-smudge/main.go
+++ b/cmd/gitaly-lfs-smudge/main.go
@@ -80,7 +80,11 @@ func run(environment []string, out io.Writer, in io.Reader) error {
return nil
case smudge.DriverTypeProcess:
- return fmt.Errorf("process driver type not yet supported")
+ if err := process(ctx, cfg, out, in); err != nil {
+ return fmt.Errorf("running smudge process: %w", err)
+ }
+
+ return nil
default:
return fmt.Errorf("unknown driver type: %v", cfg.DriverType)
}
diff --git a/cmd/gitaly-lfs-smudge/smudge.go b/cmd/gitaly-lfs-smudge/smudge.go
index b5b720a87..875a084c2 100644
--- a/cmd/gitaly-lfs-smudge/smudge.go
+++ b/cmd/gitaly-lfs-smudge/smudge.go
@@ -1,12 +1,16 @@
package main
import (
+ "bufio"
+ "bytes"
"context"
+ "errors"
"fmt"
"io"
"net/url"
"github.com/git-lfs/git-lfs/v3/lfs"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/smudge"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitlab"
@@ -36,6 +40,253 @@ func filter(ctx context.Context, cfg smudge.Config, to io.Writer, from io.Reader
return nil
}
+// processState encodes a state machine for handling long-running filter processes.
+type processState int
+
+const (
+ // processStateAnnounce is the initial state where we expect the client to announce its
+ // presence.
+ processStateAnnounce = processState(iota)
+ // processStateVersions is the state where the client announces all its known versions.
+ processStateVersions
+ // processStateCapabilities is the state where we have announced our own supported version
+ // to the client. The client now starts to send its supported capabilities.
+ processStateCapabilities
+ // processStateCommand is the state where the client sends the command it wants us to
+ // perform.
+ processStateCommand
+ // processStateSmudgeMetadata is the state where the client sends metadata of the file
+ // that should be smudged.
+ processStateSmudgeMetadata
+ // processStateSmudgeContent is the state where the client sends the contents of the file
+ // that should be smudged.
+ processStateSmudgeContent
+)
+
+func process(ctx context.Context, cfg smudge.Config, to io.Writer, from io.Reader) error {
+ client, err := gitlab.NewHTTPClient(log.ContextLogger(ctx), cfg.Gitlab, cfg.TLS, prometheus.Config{})
+ if err != nil {
+ return fmt.Errorf("creating HTTP client: %w", err)
+ }
+
+ scanner := pktline.NewScanner(from)
+
+ writer := bufio.NewWriter(to)
+
+ buf := make([]byte, pktline.MaxPktSize-4)
+ var content bytes.Buffer
+
+ clientSupportsVersion2 := false
+ clientSupportsSmudgeCapability := false
+
+ state := processStateAnnounce
+ for scanner.Scan() {
+ line := scanner.Bytes()
+
+ var data []byte
+ if !pktline.IsFlush(line) {
+ payload, err := pktline.Payload(line)
+ if err != nil {
+ return fmt.Errorf("getting payload: %w", err)
+ }
+
+ data = payload
+ }
+
+ switch state {
+ case processStateAnnounce:
+ if !bytes.Equal(data, []byte("git-filter-client\n")) {
+ return fmt.Errorf("invalid client %q", string(data))
+ }
+
+ state = processStateVersions
+ case processStateVersions:
+ // The client will announce one or more supported versions to us. We need to
+ // collect them all in order to determine whether we do in fact support one
+ // of the announced versions.
+ if !pktline.IsFlush(line) {
+ if !bytes.HasPrefix(data, []byte("version=")) {
+ return fmt.Errorf("expected version, got %q", string(data))
+ }
+
+ // We only support version two of this protocol, so we have to check
+ // whether that version is announced by the client.
+ if bytes.Equal(data, []byte("version=2\n")) {
+ clientSupportsVersion2 = true
+ }
+
+ break
+ }
+
+ // We have gotten a flush packet, so the client is done announcing its
+ // versions. If we haven't seen our version announced then it's time to
+ // quit.
+ if !clientSupportsVersion2 {
+ return fmt.Errorf("client does not support version 2")
+ }
+
+ // Announce that we're a server and that we're talking version 2 of this
+ // protocol.
+ if _, err := pktline.WriteString(writer, "git-filter-server\n"); err != nil {
+ return fmt.Errorf("announcing server presence: %w", err)
+ }
+
+ if _, err := pktline.WriteString(writer, "version=2\n"); err != nil {
+ return fmt.Errorf("announcing server version: %w", err)
+ }
+
+ if err := pktline.WriteFlush(writer); err != nil {
+ return fmt.Errorf("flushing announcement: %w", err)
+ }
+
+ state = processStateCapabilities
+ case processStateCapabilities:
+ // Similar to the above, the client will now announce all the capabilities it
+ // supports. We only support the "smudging" capability.
+ if !pktline.IsFlush(line) {
+ if !bytes.HasPrefix(data, []byte("capability=")) {
+ return fmt.Errorf("expected capability, got: %q", string(data))
+ }
+
+ // We only support smudging contents.
+ if bytes.Equal(data, []byte("capability=smudge\n")) {
+ clientSupportsSmudgeCapability = true
+ }
+
+ break
+ }
+
+ // If the client doesn't support smudging then we're done.
+ if !clientSupportsSmudgeCapability {
+ return fmt.Errorf("client does not support smudge capability")
+ }
+
+ // Announce that the only capability we support ourselves is smudging.
+ if _, err := pktline.WriteString(writer, "capability=smudge\n"); err != nil {
+ return fmt.Errorf("announcing smudge capability: %w", err)
+ }
+
+ if err := pktline.WriteFlush(writer); err != nil {
+ return fmt.Errorf("flushing capability announcement: %w", err)
+ }
+
+ state = processStateCommand
+ case processStateCommand:
+ // We're now in the processing loop where the client may announce one or
+ // more smudge commands.
+ if !bytes.Equal(data, []byte("command=smudge\n")) {
+ return fmt.Errorf("expected smudge command, got %q", string(data))
+ }
+
+ state = processStateSmudgeMetadata
+ case processStateSmudgeMetadata:
+ // The client sends us various information about the blob like the path
+ // name or treeish. We don't care about that information, so we just wait
+ // until we get the flush packet.
+ if !pktline.IsFlush(line) {
+ break
+ }
+
+ content.Reset()
+
+ state = processStateSmudgeContent
+ case processStateSmudgeContent:
+ // When we receive a flush packet we know that the client is done sending us
+ // the clean data.
+ if pktline.IsFlush(line) {
+ smudgedReader, err := smudgeOneObject(ctx, cfg, client, &content)
+ if err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("failed smudging LFS pointer")
+
+ if _, err := pktline.WriteString(writer, "status=error\n"); err != nil {
+ return fmt.Errorf("reporting failure: %w", err)
+ }
+
+ if err := pktline.WriteFlush(writer); err != nil {
+ return fmt.Errorf("flushing error: %w", err)
+ }
+
+ state = processStateCommand
+ break
+ }
+ defer smudgedReader.Close()
+
+ if _, err := pktline.WriteString(writer, "status=success\n"); err != nil {
+ return fmt.Errorf("sending status: %w", err)
+ }
+
+ if err := pktline.WriteFlush(writer); err != nil {
+ return fmt.Errorf("flushing status: %w", err)
+ }
+
+ // Read the smudged results in batches and relay it to the client.
+ // Because pktlines are limited in size we only ever read at most
+ // that many bytes.
+ var isEOF bool
+ for !isEOF {
+ n, err := smudgedReader.Read(buf)
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ return fmt.Errorf("reading smudged contents: %w", err)
+ }
+
+ isEOF = true
+ }
+
+ if n > 0 {
+ if _, err := pktline.WriteString(writer, string(buf[:n])); err != nil {
+ return fmt.Errorf("writing smudged contents: %w", err)
+ }
+ }
+ }
+ smudgedReader.Close()
+
+ // We're done writing the smudged contents to the client, so we need
+ // to tell the client.
+ if err := pktline.WriteFlush(writer); err != nil {
+ return fmt.Errorf("flushing smudged contents: %w", err)
+ }
+
+ // We now have the opportunity to correct the status in case an
+ // error happened. For now we don't bother though and just abort the
+ // whole process in case we failed to read an LFS object, and that's
+ // why we just flush a second time.
+ if err := pktline.WriteFlush(writer); err != nil {
+ return fmt.Errorf("flushing final status: %w", err)
+ }
+
+ // We are now ready to accept another command.
+ state = processStateCommand
+ break
+ }
+
+ // Write the pktline into our buffer. Ideally, we could avoid slurping the
+ // whole content into memory first. But unfortunately, this is impossible in
+ // the context of long-running processes: the server-side _must not_ answer
+ // to the client before it has received all contents. And in the case we got
+ // a non-LFS-pointer as input, this means we have to slurp in all of its
+ // contents so that we can echo it back to the caller.
+ if _, err := content.Write(data); err != nil {
+ return fmt.Errorf("could not write clean data: %w", err)
+ }
+ }
+
+ if err := writer.Flush(); err != nil {
+ return fmt.Errorf("could not flush: %w", err)
+ }
+ }
+
+ if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) {
+ return fmt.Errorf("error scanning input: %w", err)
+ }
+
+ if state != processStateCommand {
+ return fmt.Errorf("unexpected termination in state %v", state)
+ }
+
+ return nil
+}
+
func smudgeOneObject(ctx context.Context, cfg smudge.Config, gitlabClient *gitlab.HTTPClient, from io.Reader) (io.ReadCloser, error) {
logger := log.ContextLogger(ctx)
diff --git a/cmd/gitaly-lfs-smudge/smudge_test.go b/cmd/gitaly-lfs-smudge/smudge_test.go
index b4f8c751a..f3d77ba0c 100644
--- a/cmd/gitaly-lfs-smudge/smudge_test.go
+++ b/cmd/gitaly-lfs-smudge/smudge_test.go
@@ -2,11 +2,13 @@ package main
import (
"bytes"
+ "fmt"
"net/http"
"strings"
"testing"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/smudge"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitlab"
@@ -191,3 +193,368 @@ func TestFilter_unsuccessful(t *testing.T) {
})
}
}
+
+func TestProcess(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+
+ gitlabCfg, cleanup := runTestServer(t, defaultOptions)
+ defer cleanup()
+
+ defaultSmudgeCfg := smudge.Config{
+ GlRepository: "project-1",
+ Gitlab: gitlabCfg,
+ TLS: config.TLS{
+ CertPath: certPath,
+ KeyPath: keyPath,
+ },
+ DriverType: smudge.DriverTypeProcess,
+ }
+
+ pkt := func(data string) string {
+ return fmt.Sprintf("%04x%s", len(data)+4, data)
+ }
+ flush := "0000"
+
+ for _, tc := range []struct {
+ desc string
+ cfg smudge.Config
+ input []string
+ expectedErr error
+ expectedOutput string
+ }{
+ {
+ desc: "unsupported client",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-foobar-client\n"),
+ pkt("version=2\n"),
+ flush,
+ },
+ expectedErr: fmt.Errorf("invalid client %q", "git-foobar-client\n"),
+ },
+ {
+ desc: "unsupported version",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=3\n"),
+ flush,
+ },
+ expectedErr: fmt.Errorf("client does not support version 2"),
+ },
+ {
+ desc: "unsupported capability",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=foobar\n"),
+ flush,
+ },
+ expectedErr: fmt.Errorf("client does not support smudge capability"),
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ }, ""),
+ },
+ {
+ desc: "unsupported command",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("command=clean\n"),
+ flush,
+ },
+ expectedErr: fmt.Errorf("expected smudge command, got %q", "command=clean\n"),
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ }, ""),
+ },
+ {
+ desc: "single non-LFS blob",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("command=smudge\n"),
+ pkt("some=metadata\n"),
+ pkt("more=metadata\n"),
+ flush,
+ pkt("something"),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("something"),
+ flush,
+ flush,
+ }, ""),
+ },
+ {
+ desc: "single non-LFS blob with multiline contents",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("command=smudge\n"),
+ pkt("some=metadata\n"),
+ pkt("more=metadata\n"),
+ flush,
+ pkt("some"),
+ pkt("thing"),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("something"),
+ flush,
+ flush,
+ }, ""),
+ },
+ {
+ desc: "multiple non-LFS blobs",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("command=smudge\n"),
+ flush,
+ pkt("first"),
+ flush,
+ pkt("command=smudge\n"),
+ flush,
+ pkt("second"),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("first"),
+ flush,
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("second"),
+ flush,
+ flush,
+ }, ""),
+ },
+ {
+ desc: "single LFS blob",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("command=smudge\n"),
+ flush,
+ pkt(lfsPointer),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("hello world"),
+ flush,
+ flush,
+ }, ""),
+ },
+ {
+ desc: "multiple LFS blobs",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("command=smudge\n"),
+ flush,
+ pkt(lfsPointer),
+ flush,
+ pkt("command=smudge\n"),
+ flush,
+ pkt(lfsPointer),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("hello world"),
+ flush,
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("hello world"),
+ flush,
+ flush,
+ }, ""),
+ },
+ {
+ desc: "full-blown session",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=1\n"),
+ pkt("version=2\n"),
+ pkt("version=99\n"),
+ flush,
+ pkt("capability=frobnicate\n"),
+ pkt("capability=smudge\n"),
+ pkt("capability=clean\n"),
+ flush,
+ // First blob.
+ pkt("command=smudge\n"),
+ pkt("unused=metadata\n"),
+ flush,
+ pkt("something something"),
+ flush,
+ // Second blob.
+ pkt("command=smudge\n"),
+ pkt("more=unused=metadata\n"),
+ flush,
+ pkt(lfsPointer),
+ flush,
+ // Third blob.
+ pkt("command=smudge\n"),
+ pkt("this=is a huge binary blob\n"),
+ flush,
+ pkt(strings.Repeat("1", pktline.MaxPktSize-4)),
+ pkt(strings.Repeat("2", pktline.MaxPktSize-4)),
+ pkt(strings.Repeat("3", pktline.MaxPktSize-4)),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("something something"),
+ flush,
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("hello world"),
+ flush,
+ flush,
+ pkt("status=success\n"),
+ flush,
+ // This looks a bit funny, but can be attributed to the fact that we
+ // use an io.MultiReader to read the first 1024 bytes when parsing
+ // the LFS pointer.
+ pkt(strings.Repeat("1", 1024)),
+ pkt(strings.Repeat("1", pktline.MaxPktSize-4-1024) + strings.Repeat("2", 1024)),
+ pkt(strings.Repeat("2", pktline.MaxPktSize-4-1024) + strings.Repeat("3", 1024)),
+ pkt(strings.Repeat("3", pktline.MaxPktSize-4-1024)),
+ flush,
+ flush,
+ }, ""),
+ },
+ {
+ desc: "partial failure",
+ cfg: defaultSmudgeCfg,
+ input: []string{
+ pkt("git-filter-client\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ // The first command sends an unknown LFS pointer that should cause
+ // an error.
+ pkt("command=smudge\n"),
+ flush,
+ pkt(strings.Join([]string{
+ "version https://git-lfs.github.com/spec/v1",
+ "oid sha256:1111111111111111111111111111111111111111111111111111111111111111",
+ "size 177735",
+ }, "\n") + "\n"),
+ flush,
+ // And the second command sends a known LFS pointer that should
+ // still be processed as expected, regardless of the initial error.
+ pkt("command=smudge\n"),
+ flush,
+ pkt(lfsPointer),
+ flush,
+ },
+ expectedOutput: strings.Join([]string{
+ pkt("git-filter-server\n"),
+ pkt("version=2\n"),
+ flush,
+ pkt("capability=smudge\n"),
+ flush,
+ pkt("status=error\n"),
+ flush,
+ pkt("status=success\n"),
+ flush,
+ pkt("hello world"),
+ flush,
+ flush,
+ }, ""),
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ var inputBuffer bytes.Buffer
+ for _, input := range tc.input {
+ _, err := inputBuffer.WriteString(input)
+ require.NoError(t, err)
+ }
+
+ var outputBuffer bytes.Buffer
+ require.Equal(t, tc.expectedErr, process(ctx, tc.cfg, &outputBuffer, &inputBuffer))
+ require.Equal(t, tc.expectedOutput, outputBuffer.String())
+ })
+ }
+}
diff --git a/internal/git/smudge/config.go b/internal/git/smudge/config.go
index d7f5a48d0..c64c4f643 100644
--- a/internal/git/smudge/config.go
+++ b/internal/git/smudge/config.go
@@ -103,7 +103,10 @@ func (c Config) GitConfiguration(cfg config.Cfg) (git.ConfigPair, error) {
Value: filepath.Join(cfg.BinDir, "gitaly-lfs-smudge"),
}, nil
case DriverTypeProcess:
- return git.ConfigPair{}, fmt.Errorf("processor driver type not yet supported")
+ return git.ConfigPair{
+ Key: "filter.lfs.process",
+ Value: filepath.Join(cfg.BinDir, "gitaly-lfs-smudge"),
+ }, nil
default:
return git.ConfigPair{}, fmt.Errorf("unknown driver type: %v", c.DriverType)
}