diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-06-01 16:19:39 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-06-10 09:34:46 +0300 |
commit | 139821f05346b5165a8e86bbc83cbff02fded16d (patch) | |
tree | bfc20e5852604cc7470b84a3f672171cabc3932e | |
parent | aa0f408dcd190dbd958e26fd23869cb0fc37efe3 (diff) |
cmd/gitaly-lfs-smudge: Implement support for long-lived filter protocol
Originally, Git only ever supported smudging the contents of a single
blob by piping that blob to a filter process directly. This meant that
for every blob that needs converting, we have to spawn a full process.
Naturally, this doesn't scale well when repositories grow and when they
have many blobs that do need converting. Git has thus introduced a
long-running filter protocol: instead of spawning one process per blob
that needs converting, Git will only spawn a single server and then
communicate with that server via pktlines.
Implement support for this protocol in gitaly-lfs-smudge so that we can
start using it in our `GetArchive()` RPC.
-rw-r--r-- | cmd/gitaly-lfs-smudge/main.go | 6 | ||||
-rw-r--r-- | cmd/gitaly-lfs-smudge/smudge.go | 251 | ||||
-rw-r--r-- | cmd/gitaly-lfs-smudge/smudge_test.go | 367 | ||||
-rw-r--r-- | internal/git/smudge/config.go | 5 |
4 files changed, 627 insertions, 2 deletions
diff --git a/cmd/gitaly-lfs-smudge/main.go b/cmd/gitaly-lfs-smudge/main.go index c5ce5e4cb..51a63a6b4 100644 --- a/cmd/gitaly-lfs-smudge/main.go +++ b/cmd/gitaly-lfs-smudge/main.go @@ -80,7 +80,11 @@ func run(environment []string, out io.Writer, in io.Reader) error { return nil case smudge.DriverTypeProcess: - return fmt.Errorf("process driver type not yet supported") + if err := process(ctx, cfg, out, in); err != nil { + return fmt.Errorf("running smudge process: %w", err) + } + + return nil default: return fmt.Errorf("unknown driver type: %v", cfg.DriverType) } diff --git a/cmd/gitaly-lfs-smudge/smudge.go b/cmd/gitaly-lfs-smudge/smudge.go index b5b720a87..875a084c2 100644 --- a/cmd/gitaly-lfs-smudge/smudge.go +++ b/cmd/gitaly-lfs-smudge/smudge.go @@ -1,12 +1,16 @@ package main import ( + "bufio" + "bytes" "context" + "errors" "fmt" "io" "net/url" "github.com/git-lfs/git-lfs/v3/lfs" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v15/internal/git/smudge" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitlab" @@ -36,6 +40,253 @@ func filter(ctx context.Context, cfg smudge.Config, to io.Writer, from io.Reader return nil } +// processState encodes a state machine for handling long-running filter processes. +type processState int + +const ( + // processStateAnnounce is the initial state where we expect the client to announce its + // presence. + processStateAnnounce = processState(iota) + // processStateVersions is the state where the client announces all its known versions. + processStateVersions + // processStateCapabilities is the state where we have announced our own supported version + // to the client. The client now starts to send its supported capabilities. + processStateCapabilities + // processStateCommand is the state where the client sends the command it wants us to + // perform. + processStateCommand + // processStateSmudgeMetadata is the state where the client sends metadata of the file + // that should be smudged. + processStateSmudgeMetadata + // processStateSmudgeContent is the state where the client sends the contents of the file + // that should be smudged. + processStateSmudgeContent +) + +func process(ctx context.Context, cfg smudge.Config, to io.Writer, from io.Reader) error { + client, err := gitlab.NewHTTPClient(log.ContextLogger(ctx), cfg.Gitlab, cfg.TLS, prometheus.Config{}) + if err != nil { + return fmt.Errorf("creating HTTP client: %w", err) + } + + scanner := pktline.NewScanner(from) + + writer := bufio.NewWriter(to) + + buf := make([]byte, pktline.MaxPktSize-4) + var content bytes.Buffer + + clientSupportsVersion2 := false + clientSupportsSmudgeCapability := false + + state := processStateAnnounce + for scanner.Scan() { + line := scanner.Bytes() + + var data []byte + if !pktline.IsFlush(line) { + payload, err := pktline.Payload(line) + if err != nil { + return fmt.Errorf("getting payload: %w", err) + } + + data = payload + } + + switch state { + case processStateAnnounce: + if !bytes.Equal(data, []byte("git-filter-client\n")) { + return fmt.Errorf("invalid client %q", string(data)) + } + + state = processStateVersions + case processStateVersions: + // The client will announce one or more supported versions to us. We need to + // collect them all in order to determine whether we do in fact support one + // of the announced versions. + if !pktline.IsFlush(line) { + if !bytes.HasPrefix(data, []byte("version=")) { + return fmt.Errorf("expected version, got %q", string(data)) + } + + // We only support version two of this protocol, so we have to check + // whether that version is announced by the client. + if bytes.Equal(data, []byte("version=2\n")) { + clientSupportsVersion2 = true + } + + break + } + + // We have gotten a flush packet, so the client is done announcing its + // versions. If we haven't seen our version announced then it's time to + // quit. + if !clientSupportsVersion2 { + return fmt.Errorf("client does not support version 2") + } + + // Announce that we're a server and that we're talking version 2 of this + // protocol. + if _, err := pktline.WriteString(writer, "git-filter-server\n"); err != nil { + return fmt.Errorf("announcing server presence: %w", err) + } + + if _, err := pktline.WriteString(writer, "version=2\n"); err != nil { + return fmt.Errorf("announcing server version: %w", err) + } + + if err := pktline.WriteFlush(writer); err != nil { + return fmt.Errorf("flushing announcement: %w", err) + } + + state = processStateCapabilities + case processStateCapabilities: + // Similar as above, the client will now announce all the capabilities it + // supports. We only support the "smudging" capability. + if !pktline.IsFlush(line) { + if !bytes.HasPrefix(data, []byte("capability=")) { + return fmt.Errorf("expected capability, got: %q", string(data)) + } + + // We only support smudging contents. + if bytes.Equal(data, []byte("capability=smudge\n")) { + clientSupportsSmudgeCapability = true + } + + break + } + + // If the client doesn't support smudging then we're done. + if !clientSupportsSmudgeCapability { + return fmt.Errorf("client does not support smudge capability") + } + + // Announce that the only capability we support ourselves is smudging. + if _, err := pktline.WriteString(writer, "capability=smudge\n"); err != nil { + return fmt.Errorf("announcing smudge capability: %w", err) + } + + if err := pktline.WriteFlush(writer); err != nil { + return fmt.Errorf("flushing capability announcement: %w", err) + } + + state = processStateCommand + case processStateCommand: + // We're now in the processing loop where the client may announce one or + // more smudge commands. + if !bytes.Equal(data, []byte("command=smudge\n")) { + return fmt.Errorf("expected smudge command, got %q", string(data)) + } + + state = processStateSmudgeMetadata + case processStateSmudgeMetadata: + // The client sends us various information about the blob like the path + // name or treeish. We don't care about that information, so we just wait + // until we get the flush packet. + if !pktline.IsFlush(line) { + break + } + + content.Reset() + + state = processStateSmudgeContent + case processStateSmudgeContent: + // When we receive a flush packet we know that the client is done sending us + // the clean data. + if pktline.IsFlush(line) { + smudgedReader, err := smudgeOneObject(ctx, cfg, client, &content) + if err != nil { + log.ContextLogger(ctx).WithError(err).Error("failed smudging LFS pointer") + + if _, err := pktline.WriteString(writer, "status=error\n"); err != nil { + return fmt.Errorf("reporting failure: %w", err) + } + + if err := pktline.WriteFlush(writer); err != nil { + return fmt.Errorf("flushing error: %w", err) + } + + state = processStateCommand + break + } + defer smudgedReader.Close() + + if _, err := pktline.WriteString(writer, "status=success\n"); err != nil { + return fmt.Errorf("sending status: %w", err) + } + + if err := pktline.WriteFlush(writer); err != nil { + return fmt.Errorf("flushing status: %w", err) + } + + // Read the smudged results in batches and relay it to the client. + // Because pktlines are limited in size we only ever read at most + // that many bytes. + var isEOF bool + for !isEOF { + n, err := smudgedReader.Read(buf) + if err != nil { + if !errors.Is(err, io.EOF) { + return fmt.Errorf("reading smudged contents: %w", err) + } + + isEOF = true + } + + if n > 0 { + if _, err := pktline.WriteString(writer, string(buf[:n])); err != nil { + return fmt.Errorf("writing smudged contents: %w", err) + } + } + } + smudgedReader.Close() + + // We're done writing the smudged contents to the client, so we need + // to tell the client. + if err := pktline.WriteFlush(writer); err != nil { + return fmt.Errorf("flushing smudged contents: %w", err) + } + + // We now have the opportunity to correct the status in case an + // error happened. For now we don't bother though and just abort the + // whole process in case we failed to read an LFS object, and that's + // why we just flush a second time. + if err := pktline.WriteFlush(writer); err != nil { + return fmt.Errorf("flushing final status: %w", err) + } + + // We are now ready to accept another command. + state = processStateCommand + break + } + + // Write the pktline into our buffer. Ideally, we could avoid slurping the + // whole content into memory first. But unfortunately, this is impossible in + // the context of long-running processes: the server-side _must not_ answer + // to the client before it has received all contents. And in the case we got + // a non-LFS-pointer as input, this means we have to slurp in all of its + // contents so that we can echo it back to the caller. + if _, err := content.Write(data); err != nil { + return fmt.Errorf("could not write clean data: %w", err) + } + } + + if err := writer.Flush(); err != nil { + return fmt.Errorf("could not flush: %w", err) + } + } + + if err := scanner.Err(); err != nil && !errors.Is(err, io.EOF) { + return fmt.Errorf("error scanning input: %w", err) + } + + if state != processStateCommand { + return fmt.Errorf("unexpected termination in state %v", state) + } + + return nil +} + func smudgeOneObject(ctx context.Context, cfg smudge.Config, gitlabClient *gitlab.HTTPClient, from io.Reader) (io.ReadCloser, error) { logger := log.ContextLogger(ctx) diff --git a/cmd/gitaly-lfs-smudge/smudge_test.go b/cmd/gitaly-lfs-smudge/smudge_test.go index b4f8c751a..f3d77ba0c 100644 --- a/cmd/gitaly-lfs-smudge/smudge_test.go +++ b/cmd/gitaly-lfs-smudge/smudge_test.go @@ -2,11 +2,13 @@ package main import ( "bytes" + "fmt" "net/http" "strings" "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v15/internal/git/smudge" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitlab" @@ -191,3 +193,368 @@ func TestFilter_unsuccessful(t *testing.T) { }) } } + +func TestProcess(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + gitlabCfg, cleanup := runTestServer(t, defaultOptions) + defer cleanup() + + defaultSmudgeCfg := smudge.Config{ + GlRepository: "project-1", + Gitlab: gitlabCfg, + TLS: config.TLS{ + CertPath: certPath, + KeyPath: keyPath, + }, + DriverType: smudge.DriverTypeProcess, + } + + pkt := func(data string) string { + return fmt.Sprintf("%04x%s", len(data)+4, data) + } + flush := "0000" + + for _, tc := range []struct { + desc string + cfg smudge.Config + input []string + expectedErr error + expectedOutput string + }{ + { + desc: "unsupported client", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-foobar-client\n"), + pkt("version=2\n"), + flush, + }, + expectedErr: fmt.Errorf("invalid client %q", "git-foobar-client\n"), + }, + { + desc: "unsupported version", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=3\n"), + flush, + }, + expectedErr: fmt.Errorf("client does not support version 2"), + }, + { + desc: "unsupported capability", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=foobar\n"), + flush, + }, + expectedErr: fmt.Errorf("client does not support smudge capability"), + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + }, ""), + }, + { + desc: "unsupported command", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("command=clean\n"), + flush, + }, + expectedErr: fmt.Errorf("expected smudge command, got %q", "command=clean\n"), + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + }, ""), + }, + { + desc: "single non-LFS blob", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("command=smudge\n"), + pkt("some=metadata\n"), + pkt("more=metadata\n"), + flush, + pkt("something"), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=success\n"), + flush, + pkt("something"), + flush, + flush, + }, ""), + }, + { + desc: "single non-LFS blob with multiline contents", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("command=smudge\n"), + pkt("some=metadata\n"), + pkt("more=metadata\n"), + flush, + pkt("some"), + pkt("thing"), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=success\n"), + flush, + pkt("something"), + flush, + flush, + }, ""), + }, + { + desc: "multiple non-LFS blobs", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("command=smudge\n"), + flush, + pkt("first"), + flush, + pkt("command=smudge\n"), + flush, + pkt("second"), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=success\n"), + flush, + pkt("first"), + flush, + flush, + pkt("status=success\n"), + flush, + pkt("second"), + flush, + flush, + }, ""), + }, + { + desc: "single LFS blob", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("command=smudge\n"), + flush, + pkt(lfsPointer), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=success\n"), + flush, + pkt("hello world"), + flush, + flush, + }, ""), + }, + { + desc: "multiple LFS blobs", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("command=smudge\n"), + flush, + pkt(lfsPointer), + flush, + pkt("command=smudge\n"), + flush, + pkt(lfsPointer), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=success\n"), + flush, + pkt("hello world"), + flush, + flush, + pkt("status=success\n"), + flush, + pkt("hello world"), + flush, + flush, + }, ""), + }, + { + desc: "full-blown session", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=1\n"), + pkt("version=2\n"), + pkt("version=99\n"), + flush, + pkt("capability=frobnicate\n"), + pkt("capability=smudge\n"), + pkt("capability=clean\n"), + flush, + // First blob. + pkt("command=smudge\n"), + pkt("unused=metadata\n"), + flush, + pkt("something something"), + flush, + // Second blob. + pkt("command=smudge\n"), + pkt("more=unused=metadata\n"), + flush, + pkt(lfsPointer), + flush, + // Third blob. + pkt("command=smudge\n"), + pkt("this=is a huge binary blob\n"), + flush, + pkt(strings.Repeat("1", pktline.MaxPktSize-4)), + pkt(strings.Repeat("2", pktline.MaxPktSize-4)), + pkt(strings.Repeat("3", pktline.MaxPktSize-4)), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=success\n"), + flush, + pkt("something something"), + flush, + flush, + pkt("status=success\n"), + flush, + pkt("hello world"), + flush, + flush, + pkt("status=success\n"), + flush, + // This looks a bit funny, but can be attributed to the fact that we + // use an io.Multireader to read the first 1024 bytes when parsing + // the LFS pointer. + pkt(strings.Repeat("1", 1024)), + pkt(strings.Repeat("1", pktline.MaxPktSize-4-1024) + strings.Repeat("2", 1024)), + pkt(strings.Repeat("2", pktline.MaxPktSize-4-1024) + strings.Repeat("3", 1024)), + pkt(strings.Repeat("3", pktline.MaxPktSize-4-1024)), + flush, + flush, + }, ""), + }, + { + desc: "partial failure", + cfg: defaultSmudgeCfg, + input: []string{ + pkt("git-filter-client\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + // The first command sends an unknown LFS pointer that should cause + // an error. + pkt("command=smudge\n"), + flush, + pkt(strings.Join([]string{ + "version https://git-lfs.github.com/spec/v1", + "oid sha256:1111111111111111111111111111111111111111111111111111111111111111", + "size 177735", + }, "\n") + "\n"), + flush, + // And the second command sends a known LFS pointer that should + // still be processed as expected, regardless of the initial error. + pkt("command=smudge\n"), + flush, + pkt(lfsPointer), + flush, + }, + expectedOutput: strings.Join([]string{ + pkt("git-filter-server\n"), + pkt("version=2\n"), + flush, + pkt("capability=smudge\n"), + flush, + pkt("status=error\n"), + flush, + pkt("status=success\n"), + flush, + pkt("hello world"), + flush, + flush, + }, ""), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var inputBuffer bytes.Buffer + for _, input := range tc.input { + _, err := inputBuffer.WriteString(input) + require.NoError(t, err) + } + + var outputBuffer bytes.Buffer + require.Equal(t, tc.expectedErr, process(ctx, tc.cfg, &outputBuffer, &inputBuffer)) + require.Equal(t, tc.expectedOutput, outputBuffer.String()) + }) + } +} diff --git a/internal/git/smudge/config.go b/internal/git/smudge/config.go index d7f5a48d0..c64c4f643 100644 --- a/internal/git/smudge/config.go +++ b/internal/git/smudge/config.go @@ -103,7 +103,10 @@ func (c Config) GitConfiguration(cfg config.Cfg) (git.ConfigPair, error) { Value: filepath.Join(cfg.BinDir, "gitaly-lfs-smudge"), }, nil case DriverTypeProcess: - return git.ConfigPair{}, fmt.Errorf("processor driver type not yet supported") + return git.ConfigPair{ + Key: "filter.lfs.process", + Value: filepath.Join(cfg.BinDir, "gitaly-lfs-smudge"), + }, nil default: return git.ConfigPair{}, fmt.Errorf("unknown driver type: %v", c.DriverType) } |