diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2020-12-17 14:59:07 +0300 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2020-12-17 14:59:07 +0300 |
commit | 8b573c94895dc0ac0e1d9d59cf3e8745e8b539ca (patch) | |
tree | 544930fb309b30317ae9797a9683768705d664c4 /workhorse/gitaly_test.go | |
parent | 4b1de649d0168371549608993deac953eb692019 (diff) |
Add latest changes from gitlab-org/gitlab@13-7-stable-eev13.7.0-rc42
Diffstat (limited to 'workhorse/gitaly_test.go')
-rw-r--r-- | workhorse/gitaly_test.go | 696 |
1 files changed, 696 insertions, 0 deletions
diff --git a/workhorse/gitaly_test.go b/workhorse/gitaly_test.go new file mode 100644 index 00000000000..95d6907ac6a --- /dev/null +++ b/workhorse/gitaly_test.go @@ -0,0 +1,696 @@ +package main + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "io/ioutil" + "math/rand" + "net" + "net/http" + "os" + "os/exec" + "path" + "strings" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274 + "github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab-workhorse/-/issues/274 + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + + "gitlab.com/gitlab-org/gitlab-workhorse/internal/api" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/git" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/gitaly" + "gitlab.com/gitlab-org/gitlab-workhorse/internal/testhelper" +) + +func TestFailedCloneNoGitaly(t *testing.T) { + // Prepare clone directory + require.NoError(t, os.RemoveAll(scratchDir)) + + authBody := &api.Response{ + GL_ID: "user-123", + GL_USERNAME: "username", + // This will create a failure to connect to Gitaly + GitalyServer: gitaly.Server{Address: "unix:/nonexistent"}, + } + + // Prepare test server and backend + ts := testAuthServer(t, nil, nil, 200, authBody) + defer ts.Close() + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + // Do the git clone + cloneCmd := exec.Command("git", "clone", fmt.Sprintf("%s/%s", ws.URL, testRepo), checkoutDir) + out, err := cloneCmd.CombinedOutput() + t.Log(string(out)) + require.Error(t, err, "git clone should have failed") +} + +func TestGetInfoRefsProxiedToGitalySuccessfully(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + + apiResponse := gitOkBody(t) + apiResponse.GitalyServer.Address = gitalyAddress + + goodMetadata := map[string]string{ + "gitaly-feature-foobar": "true", + "gitaly-feature-bazqux": "false", + } + badMetadata := map[string]string{ + "bad-metadata": "is blocked", + } + + features := make(map[string]string) + for k, v := range goodMetadata { + features[k] = v + } + for k, v := range badMetadata { + features[k] = v + } + apiResponse.GitalyServer.Features = features + + testCases := []struct { + showAllRefs bool + gitRpc string + }{ + {showAllRefs: false, gitRpc: "git-upload-pack"}, + {showAllRefs: true, gitRpc: "git-upload-pack"}, + {showAllRefs: false, gitRpc: "git-receive-pack"}, + {showAllRefs: true, gitRpc: "git-receive-pack"}, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("ShowAllRefs=%v,gitRpc=%v", tc.showAllRefs, tc.gitRpc), func(t *testing.T) { + apiResponse.ShowAllRefs = tc.showAllRefs + + ts := testAuthServer(t, nil, nil, 200, apiResponse) + defer ts.Close() + + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + gitProtocol := "fake git protocol" + resource := "/gitlab-org/gitlab-test.git/info/refs?service=" + tc.gitRpc + resp, body := httpGet(t, ws.URL+resource, map[string]string{"Git-Protocol": gitProtocol}) + + require.Equal(t, 200, resp.StatusCode) + + bodySplit := strings.SplitN(body, "\000", 3) + require.Len(t, bodySplit, 3) + + gitalyRequest := &gitalypb.InfoRefsRequest{} + require.NoError(t, jsonpb.UnmarshalString(bodySplit[0], gitalyRequest)) + + require.Equal(t, gitProtocol, gitalyRequest.GitProtocol) + if tc.showAllRefs { + require.Equal(t, []string{git.GitConfigShowAllRefs}, gitalyRequest.GitConfigOptions) + } else { + require.Empty(t, gitalyRequest.GitConfigOptions) + } + + require.Equal(t, tc.gitRpc, bodySplit[1]) + + require.Equal(t, string(testhelper.GitalyInfoRefsResponseMock), bodySplit[2], "GET %q: response body", resource) + + md := gitalyServer.LastIncomingMetadata + for k, v := range goodMetadata { + actual := md[k] + require.Len(t, actual, 1, "number of metadata values for %v", k) + require.Equal(t, v, actual[0], "value for %v", k) + } + + for k := range badMetadata { + actual := md[k] + require.Empty(t, actual, "metadata for bad key %v", k) + } + }) + } +} + +func TestGetInfoRefsProxiedToGitalyInterruptedStream(t *testing.T) { + apiResponse := gitOkBody(t) + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + apiResponse.GitalyServer.Address = gitalyAddress + + ts := testAuthServer(t, nil, nil, 200, apiResponse) + defer ts.Close() + + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + resource := "/gitlab-org/gitlab-test.git/info/refs?service=git-upload-pack" + resp, err := http.Get(ws.URL + resource) + require.NoError(t, err) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func waitDone(t *testing.T, done chan struct{}) { + t.Helper() + select { + case <-done: + return + case <-time.After(10 * time.Second): + t.Fatal("time out waiting for gitaly handler to return") + } +} + +func TestPostReceivePackProxiedToGitalySuccessfully(t *testing.T) { + apiResponse := gitOkBody(t) + + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + apiResponse.GitalyServer.Address = "unix:" + socketPath + apiResponse.GitConfigOptions = []string{"git-config-hello=world"} + ts := testAuthServer(t, nil, nil, 200, apiResponse) + defer ts.Close() + + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + gitProtocol := "fake Git protocol" + resource := "/gitlab-org/gitlab-test.git/git-receive-pack" + resp := httpPost( + t, + ws.URL+resource, + map[string]string{ + "Content-Type": "application/x-git-receive-pack-request", + "Git-Protocol": gitProtocol, + }, + bytes.NewReader(testhelper.GitalyReceivePackResponseMock), + ) + defer resp.Body.Close() + body := string(testhelper.ReadAll(t, resp.Body)) + + split := strings.SplitN(body, "\000", 2) + require.Len(t, split, 2) + + gitalyRequest := &gitalypb.PostReceivePackRequest{} + require.NoError(t, jsonpb.UnmarshalString(split[0], gitalyRequest)) + + require.Equal(t, apiResponse.Repository.StorageName, gitalyRequest.Repository.StorageName) + require.Equal(t, apiResponse.Repository.RelativePath, gitalyRequest.Repository.RelativePath) + require.Equal(t, apiResponse.GL_ID, gitalyRequest.GlId) + require.Equal(t, apiResponse.GL_USERNAME, gitalyRequest.GlUsername) + require.Equal(t, apiResponse.GitConfigOptions, gitalyRequest.GitConfigOptions) + require.Equal(t, gitProtocol, gitalyRequest.GitProtocol) + + require.Equal(t, 200, resp.StatusCode, "POST %q", resource) + require.Equal(t, string(testhelper.GitalyReceivePackResponseMock), split[1]) + testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-git-receive-pack-result") +} + +func TestPostReceivePackProxiedToGitalyInterrupted(t *testing.T) { + apiResponse := gitOkBody(t) + + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + apiResponse.GitalyServer.Address = "unix:" + socketPath + ts := testAuthServer(t, nil, nil, 200, apiResponse) + defer ts.Close() + + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + resource := "/gitlab-org/gitlab-test.git/git-receive-pack" + resp, err := http.Post( + ws.URL+resource, + "application/x-git-receive-pack-request", + bytes.NewReader(testhelper.GitalyReceivePackResponseMock), + ) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode, "POST %q", resource) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +// ReaderFunc is an adapter to turn a conforming function into an io.Reader. +type ReaderFunc func(b []byte) (int, error) + +func (r ReaderFunc) Read(b []byte) (int, error) { return r(b) } + +func TestPostUploadPackProxiedToGitalySuccessfully(t *testing.T) { + for i, tc := range []struct { + showAllRefs bool + code codes.Code + }{ + {true, codes.OK}, + {true, codes.Unavailable}, + {false, codes.OK}, + {false, codes.Unavailable}, + } { + t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) { + apiResponse := gitOkBody(t) + apiResponse.ShowAllRefs = tc.showAllRefs + + gitalyServer, socketPath := startGitalyServer(t, tc.code) + defer gitalyServer.GracefulStop() + + apiResponse.GitalyServer.Address = "unix:" + socketPath + ts := testAuthServer(t, nil, nil, 200, apiResponse) + defer ts.Close() + + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + gitProtocol := "fake git protocol" + resource := "/gitlab-org/gitlab-test.git/git-upload-pack" + + requestReader := bytes.NewReader(testhelper.GitalyUploadPackResponseMock) + var m sync.Mutex + requestReadFinished := false + resp := httpPost( + t, + ws.URL+resource, + map[string]string{ + "Content-Type": "application/x-git-upload-pack-request", + "Git-Protocol": gitProtocol, + }, + ReaderFunc(func(b []byte) (int, error) { + n, err := requestReader.Read(b) + if err != nil { + m.Lock() + requestReadFinished = true + m.Unlock() + } + return n, err + }), + ) + defer resp.Body.Close() + require.Equal(t, 200, resp.StatusCode, "POST %q", resource) + testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-git-upload-pack-result") + + m.Lock() + requestFinished := requestReadFinished + m.Unlock() + require.True(t, requestFinished, "response written before request was fully read") + + body := string(testhelper.ReadAll(t, resp.Body)) + bodySplit := strings.SplitN(body, "\000", 2) + require.Len(t, bodySplit, 2) + + gitalyRequest := &gitalypb.PostUploadPackRequest{} + require.NoError(t, jsonpb.UnmarshalString(bodySplit[0], gitalyRequest)) + + require.Equal(t, apiResponse.Repository.StorageName, gitalyRequest.Repository.StorageName) + require.Equal(t, apiResponse.Repository.RelativePath, gitalyRequest.Repository.RelativePath) + require.Equal(t, gitProtocol, gitalyRequest.GitProtocol) + + if tc.showAllRefs { + require.Equal(t, []string{git.GitConfigShowAllRefs}, gitalyRequest.GitConfigOptions) + } else { + require.Empty(t, gitalyRequest.GitConfigOptions) + } + + require.Equal(t, string(testhelper.GitalyUploadPackResponseMock), bodySplit[1], "POST %q: response body", resource) + }) + } +} + +func TestPostUploadPackProxiedToGitalyInterrupted(t *testing.T) { + apiResponse := gitOkBody(t) + + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + apiResponse.GitalyServer.Address = "unix:" + socketPath + ts := testAuthServer(t, nil, nil, 200, apiResponse) + defer ts.Close() + + ws := startWorkhorseServer(ts.URL) + defer ws.Close() + + resource := "/gitlab-org/gitlab-test.git/git-upload-pack" + resp, err := http.Post( + ws.URL+resource, + "application/x-git-upload-pack-request", + bytes.NewReader(testhelper.GitalyUploadPackResponseMock), + ) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode, "POST %q", resource) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func TestGetDiffProxiedToGitalySuccessfully(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e" + leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab" + repoRelativePath := "foo/bar.git" + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`, + gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit) + expectedBody := testhelper.GitalyGetDiffResponseMock + + resp, body, err := doSendDataRequest("/something", "git-diff", jsonParams) + require.NoError(t, err) + + require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL) + require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL) +} + +func TestGetPatchProxiedToGitalySuccessfully(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e" + leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab" + repoRelativePath := "foo/bar.git" + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`, + gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit) + expectedBody := testhelper.GitalyGetPatchResponseMock + + resp, body, err := doSendDataRequest("/something", "git-format-patch", jsonParams) + require.NoError(t, err) + + require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL) + require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL) +} + +func TestGetBlobProxiedToGitalyInterruptedStream(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51" + repoRelativePath := "foo/bar.git" + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GetBlobRequest":{"repository":{"storage_name":"%s","relative_path":"%s"},"oid":"%s","limit":-1}}`, + gitalyAddress, repoStorage, repoRelativePath, oid) + + resp, _, err := doSendDataRequest("/something", "git-blob", jsonParams) + require.NoError(t, err) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func TestGetArchiveProxiedToGitalySuccessfully(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51" + repoRelativePath := "foo/bar.git" + archivePrefix := "repo-1" + expectedBody := testhelper.GitalyGetArchiveResponseMock + archiveLength := len(expectedBody) + + testCases := []struct { + archivePath string + cacheDisabled bool + }{ + {archivePath: path.Join(scratchDir, "my/path"), cacheDisabled: false}, + {archivePath: "/var/empty/my/path", cacheDisabled: true}, + } + + for _, tc := range testCases { + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GitalyRepository":{"storage_name":"%s","relative_path":"%s"},"ArchivePath":"%s","ArchivePrefix":"%s","CommitId":"%s","DisableCache":%v}`, + gitalyAddress, repoStorage, repoRelativePath, tc.archivePath, archivePrefix, oid, tc.cacheDisabled) + resp, body, err := doSendDataRequest("/archive.tar.gz", "git-archive", jsonParams) + require.NoError(t, err) + + require.Equal(t, 200, resp.StatusCode, "GET %q: status code", resp.Request.URL) + require.Equal(t, expectedBody, string(body), "GET %q: response body", resp.Request.URL) + require.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL) + + if tc.cacheDisabled { + _, err := os.Stat(tc.archivePath) + require.True(t, os.IsNotExist(err), "expected 'does not exist', got: %v", err) + } else { + cachedArchive, err := ioutil.ReadFile(tc.archivePath) + require.NoError(t, err) + require.Equal(t, expectedBody, string(cachedArchive)) + } + } +} + +func TestGetArchiveProxiedToGitalyInterruptedStream(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + oid := "54fcc214b94e78d7a41a9a8fe6d87a5e59500e51" + repoRelativePath := "foo/bar.git" + archivePath := "my/path" + archivePrefix := "repo-1" + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"GitalyRepository":{"storage_name":"%s","relative_path":"%s"},"ArchivePath":"%s","ArchivePrefix":"%s","CommitId":"%s"}`, + gitalyAddress, repoStorage, repoRelativePath, path.Join(scratchDir, archivePath), archivePrefix, oid) + + resp, _, err := doSendDataRequest("/archive.tar.gz", "git-archive", jsonParams) + require.NoError(t, err) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func TestGetDiffProxiedToGitalyInterruptedStream(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e" + leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab" + repoRelativePath := "foo/bar.git" + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawDiffRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`, + gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit) + + resp, _, err := doSendDataRequest("/something", "git-diff", jsonParams) + require.NoError(t, err) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func TestGetPatchProxiedToGitalyInterruptedStream(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + repoStorage := "default" + rightCommit := "e395f646b1499e8e0279445fc99a0596a65fab7e" + leftCommit := "8a0f2ee90d940bfb0ba1e14e8214b0649056e4ab" + repoRelativePath := "foo/bar.git" + jsonParams := fmt.Sprintf(`{"GitalyServer":{"Address":"%s","Token":""},"RawPatchRequest":"{\"repository\":{\"storageName\":\"%s\",\"relativePath\":\"%s\"},\"rightCommitId\":\"%s\",\"leftCommitId\":\"%s\"}"}`, + gitalyAddress, repoStorage, repoRelativePath, leftCommit, rightCommit) + + resp, _, err := doSendDataRequest("/something", "git-format-patch", jsonParams) + require.NoError(t, err) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func TestGetSnapshotProxiedToGitalySuccessfully(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + expectedBody := testhelper.GitalyGetSnapshotResponseMock + archiveLength := len(expectedBody) + + params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git")) + resp, body, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, resp.StatusCode, "GET %q: status code", resp.Request.URL) + require.Equal(t, expectedBody, string(body), "GET %q: body", resp.Request.URL) + require.Equal(t, archiveLength, len(body), "GET %q: body size", resp.Request.URL) + + testhelper.RequireResponseHeader(t, resp, "Content-Disposition", `attachment; filename="snapshot.tar"`) + testhelper.RequireResponseHeader(t, resp, "Content-Type", "application/x-tar") + testhelper.RequireResponseHeader(t, resp, "Content-Transfer-Encoding", "binary") + testhelper.RequireResponseHeader(t, resp, "Cache-Control", "private") +} + +func TestGetSnapshotProxiedToGitalyInterruptedStream(t *testing.T) { + gitalyServer, socketPath := startGitalyServer(t, codes.OK) + defer gitalyServer.GracefulStop() + + gitalyAddress := "unix:" + socketPath + + params := buildGetSnapshotParams(gitalyAddress, buildPbRepo("default", "foo/bar.git")) + resp, _, err := doSendDataRequest("/api/v4/projects/:id/snapshot", "git-snapshot", params) + require.NoError(t, err) + + // This causes the server stream to be interrupted instead of consumed entirely. + resp.Body.Close() + + done := make(chan struct{}) + go func() { + gitalyServer.WaitGroup.Wait() + close(done) + }() + + waitDone(t, done) +} + +func buildGetSnapshotParams(gitalyAddress string, repo *gitalypb.Repository) string { + msg := serializedMessage("GetSnapshotRequest", &gitalypb.GetSnapshotRequest{Repository: repo}) + return buildGitalyRPCParams(gitalyAddress, msg) +} + +type rpcArg struct { + k string + v interface{} +} + +// Gitlab asks workhorse to perform some long-running RPCs for it by sending +// the RPC arguments (which are protobuf messages) in HTTP response headers. +// The messages are encoded to JSON objects using pbjson, The strings are then +// re-encoded to JSON strings using json. We must replicate this behaviour here +func buildGitalyRPCParams(gitalyAddress string, rpcArgs ...rpcArg) string { + built := map[string]interface{}{ + "GitalyServer": map[string]string{ + "Address": gitalyAddress, + "Token": "", + }, + } + + for _, arg := range rpcArgs { + built[arg.k] = arg.v + } + + b, err := json.Marshal(interface{}(built)) + if err != nil { + panic(err) + } + + return string(b) +} + +func buildPbRepo(storageName, relativePath string) *gitalypb.Repository { + return &gitalypb.Repository{ + StorageName: storageName, + RelativePath: relativePath, + } +} + +func serializedMessage(name string, arg proto.Message) rpcArg { + m := &jsonpb.Marshaler{} + str, err := m.MarshalToString(arg) + if err != nil { + panic(err) + } + + return rpcArg{name, str} +} + +func serializedProtoMessage(name string, arg proto.Message) rpcArg { + msg, err := proto.Marshal(arg) + + if err != nil { + panic(err) + } + + return rpcArg{name, base64.URLEncoding.EncodeToString(msg)} +} + +type combinedServer struct { + *grpc.Server + *testhelper.GitalyTestServer +} + +func startGitalyServer(t *testing.T, finalMessageCode codes.Code) (*combinedServer, string) { + socketPath := path.Join(scratchDir, fmt.Sprintf("gitaly-%d.sock", rand.Int())) + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + t.Fatal(err) + } + server := grpc.NewServer() + listener, err := net.Listen("unix", socketPath) + require.NoError(t, err) + + gitalyServer := testhelper.NewGitalyServer(finalMessageCode) + gitalypb.RegisterSmartHTTPServiceServer(server, gitalyServer) + gitalypb.RegisterBlobServiceServer(server, gitalyServer) + gitalypb.RegisterRepositoryServiceServer(server, gitalyServer) + gitalypb.RegisterDiffServiceServer(server, gitalyServer) + + go server.Serve(listener) + + return &combinedServer{Server: server, GitalyTestServer: gitalyServer}, socketPath +} |