Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-02-13 23:59:23 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-02-13 23:59:23 +0300
commitc45819965cea6f4e7e0465246f3fad2a797b07ab (patch)
treed6f2aa795ba839577522c324a8450ae44ad60176
parenta65d9ddfd3e7b12daff0ffa4252df8ce980a7e6f (diff)
parentecb050ee69b5363a2cb46777aac1306cf7daf8ea (diff)
Merge branch 'pass_thru_proxy' into 'master'
Pass thru proxy Closes #1472 See merge request gitlab-org/gitaly!1064
-rw-r--r--NOTICE175
-rw-r--r--changelogs/unreleased/ha_praefect_pass_thru.yml5
-rw-r--r--internal/praefect/reposvc_test.go150
-rw-r--r--internal/praefect/server.go130
-rw-r--r--internal/praefect/server_test.go113
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/LICENSE.txt174
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/proxy/DOC.md83
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/proxy/README.md83
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/proxy/codec.go70
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/proxy/director.go24
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/proxy/doc.go15
-rw-r--r--vendor/github.com/mwitkow/grpc-proxy/proxy/handler.go162
-rw-r--r--vendor/vendor.json22
13 files changed, 1198 insertions, 8 deletions
diff --git a/NOTICE b/NOTICE
index bb67bf9c4..f8cbd6d4d 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1162,6 +1162,181 @@ LICENSE - gitlab.com/gitlab-org/gitaly/vendor/github.com/matttproud/golang_proto
NOTICE - gitlab.com/gitlab-org/gitaly/vendor/github.com/matttproud/golang_protobuf_extensions
Copyright 2012 Matt T. Proud (matt.proud@gmail.com)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+LICENSE.txt - gitlab.com/gitlab-org/gitaly/vendor/github.com/mwitkow/grpc-proxy
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - gitlab.com/gitlab-org/gitaly/vendor/github.com/opentracing/opentracing-go
Apache License
Version 2.0, January 2004
diff --git a/changelogs/unreleased/ha_praefect_pass_thru.yml b/changelogs/unreleased/ha_praefect_pass_thru.yml
new file mode 100644
index 000000000..1b908d623
--- /dev/null
+++ b/changelogs/unreleased/ha_praefect_pass_thru.yml
@@ -0,0 +1,5 @@
+---
+title: Reverse proxy pass thru for HA
+merge_request: 1064
+author:
+type: other
diff --git a/internal/praefect/reposvc_test.go b/internal/praefect/reposvc_test.go
new file mode 100644
index 000000000..22ec3480b
--- /dev/null
+++ b/internal/praefect/reposvc_test.go
@@ -0,0 +1,150 @@
+package praefect_test
+
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+// mockRepoSvc is a mock implementation of gitalypb.RepositoryServiceServer
+// for testing purposes
+type mockRepoSvc struct {
+ srv *grpc.Server
+}
+
+func (m *mockRepoSvc) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) {
+ return &gitalypb.RepositoryExistsResponse{}, nil
+}
+
+func (m *mockRepoSvc) RepackIncremental(context.Context, *gitalypb.RepackIncrementalRequest) (*gitalypb.RepackIncrementalResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) RepackFull(context.Context, *gitalypb.RepackFullRequest) (*gitalypb.RepackFullResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) GarbageCollect(context.Context, *gitalypb.GarbageCollectRequest) (*gitalypb.GarbageCollectResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) RepositorySize(context.Context, *gitalypb.RepositorySizeRequest) (*gitalypb.RepositorySizeResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) ApplyGitattributes(context.Context, *gitalypb.ApplyGitattributesRequest) (*gitalypb.ApplyGitattributesResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) FetchRemote(context.Context, *gitalypb.FetchRemoteRequest) (*gitalypb.FetchRemoteResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) CreateRepository(context.Context, *gitalypb.CreateRepositoryRequest) (*gitalypb.CreateRepositoryResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) GetArchive(*gitalypb.GetArchiveRequest, gitalypb.RepositoryService_GetArchiveServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) HasLocalBranches(context.Context, *gitalypb.HasLocalBranchesRequest) (*gitalypb.HasLocalBranchesResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) FetchSourceBranch(context.Context, *gitalypb.FetchSourceBranchRequest) (*gitalypb.FetchSourceBranchResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) Fsck(context.Context, *gitalypb.FsckRequest) (*gitalypb.FsckResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) WriteRef(context.Context, *gitalypb.WriteRefRequest) (*gitalypb.WriteRefResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) FindMergeBase(context.Context, *gitalypb.FindMergeBaseRequest) (*gitalypb.FindMergeBaseResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) CreateFork(context.Context, *gitalypb.CreateForkRequest) (*gitalypb.CreateForkResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) IsRebaseInProgress(context.Context, *gitalypb.IsRebaseInProgressRequest) (*gitalypb.IsRebaseInProgressResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) IsSquashInProgress(context.Context, *gitalypb.IsSquashInProgressRequest) (*gitalypb.IsSquashInProgressResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) CreateRepositoryFromURL(context.Context, *gitalypb.CreateRepositoryFromURLRequest) (*gitalypb.CreateRepositoryFromURLResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) CreateBundle(*gitalypb.CreateBundleRequest, gitalypb.RepositoryService_CreateBundleServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) CreateRepositoryFromBundle(gitalypb.RepositoryService_CreateRepositoryFromBundleServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) WriteConfig(context.Context, *gitalypb.WriteConfigRequest) (*gitalypb.WriteConfigResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) SetConfig(context.Context, *gitalypb.SetConfigRequest) (*gitalypb.SetConfigResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) DeleteConfig(context.Context, *gitalypb.DeleteConfigRequest) (*gitalypb.DeleteConfigResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) FindLicense(context.Context, *gitalypb.FindLicenseRequest) (*gitalypb.FindLicenseResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) GetInfoAttributes(*gitalypb.GetInfoAttributesRequest, gitalypb.RepositoryService_GetInfoAttributesServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) CalculateChecksum(context.Context, *gitalypb.CalculateChecksumRequest) (*gitalypb.CalculateChecksumResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) Cleanup(context.Context, *gitalypb.CleanupRequest) (*gitalypb.CleanupResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) GetSnapshot(*gitalypb.GetSnapshotRequest, gitalypb.RepositoryService_GetSnapshotServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) CreateRepositoryFromSnapshot(context.Context, *gitalypb.CreateRepositoryFromSnapshotRequest) (*gitalypb.CreateRepositoryFromSnapshotResponse, error) {
+ return nil, nil
+}
+
+func (m *mockRepoSvc) GetRawChanges(*gitalypb.GetRawChangesRequest, gitalypb.RepositoryService_GetRawChangesServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) SearchFilesByContent(*gitalypb.SearchFilesByContentRequest, gitalypb.RepositoryService_SearchFilesByContentServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) SearchFilesByName(*gitalypb.SearchFilesByNameRequest, gitalypb.RepositoryService_SearchFilesByNameServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) RestoreCustomHooks(gitalypb.RepositoryService_RestoreCustomHooksServer) error {
+ return nil
+}
+
+func (m *mockRepoSvc) BackupCustomHooks(*gitalypb.BackupCustomHooksRequest, gitalypb.RepositoryService_BackupCustomHooksServer) error {
+ return nil
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
new file mode 100644
index 000000000..e432f7544
--- /dev/null
+++ b/internal/praefect/server.go
@@ -0,0 +1,130 @@
+/*Package praefect is a Gitaly reverse proxy for transparently routing gRPC
+calls to a set of Gitaly services.*/
+package praefect
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "sync"
+
+ "github.com/mwitkow/grpc-proxy/proxy"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Logger is a simple interface that allows loggers to be dependency injected
+// into the praefect server
+type Logger interface {
+ Debugf(format string, args ...interface{})
+}
+
+// Coordinator takes care of directing client requests to the appropriate
+// downstream server. The coordinator is thread safe; concurrent calls to
+// register nodes are safe.
+type Coordinator struct {
+ log Logger
+ lock sync.RWMutex
+ nodes map[string]*grpc.ClientConn
+}
+
+// newCoordinator returns a new Coordinator that utilizes the provided logger
+func newCoordinator(l Logger) *Coordinator {
+ return &Coordinator{
+ log: l,
+ nodes: make(map[string]*grpc.ClientConn),
+ }
+}
+
+// streamDirector determines which downstream servers receive requests
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
+ // For phase 1, we need to route messages based on the storage location
+ // to the appropriate Gitaly node.
+ c.log.Debugf("Stream director received method %s", fullMethodName)
+
+ // TODO: obtain storage location dynamically from RPC request message
+ storageLoc := "test"
+
+ c.lock.RLock()
+ cc, ok := c.nodes[storageLoc]
+ c.lock.RUnlock()
+
+ if !ok {
+ err := status.Error(
+ codes.FailedPrecondition,
+ fmt.Sprintf("no downstream node for storage location %q", storageLoc),
+ )
+ return nil, nil, err
+ }
+
+ return ctx, cc, nil
+}
+
+// Server is a praefect server
+type Server struct {
+ *Coordinator
+ s *grpc.Server
+}
+
+// NewServer returns an initialized praefect gPRC proxy server configured
+// with the provided gRPC server options
+func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server {
+ c := newCoordinator(l)
+ grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
+
+ return &Server{
+ s: grpc.NewServer(grpcOpts...),
+ Coordinator: c,
+ }
+}
+
+// ErrStorageLocExists indicates a storage location has already been registered
+// in the proxy for a downstream Gitaly node
+var ErrStorageLocExists = errors.New("storage location already registered")
+
+// RegisterNode will direct traffic to the supplied downstream connection when the storage location
+// is encountered.
+//
+// TODO: Coordinator probably needs to handle dialing, or another entity
+// needs to handle dialing to ensure keep alives and redialing logic
+// exist for when downstream connections are severed.
+func (c *Coordinator) RegisterNode(storageLoc string, node *grpc.ClientConn) {
+ c.lock.Lock()
+ c.nodes[storageLoc] = node
+ c.lock.Unlock()
+}
+
+func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
+ return []grpc.ServerOption{
+ grpc.CustomCodec(proxy.Codec()),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ }
+}
+
+// Start will start the praefect gRPC proxy server listening at the provided
+// listener. Function will block until the server is stopped or an
+// unrecoverable error occurs.
+func (srv *Server) Start(lis net.Listener) error {
+ return srv.s.Serve(lis)
+}
+
+// Shutdown will attempt a graceful shutdown of the grpc server. If unable
+// to gracefully shutdown within the context deadline, it will then
+// forcefully shutdown the server and return a context cancellation error.
+func (srv *Server) Shutdown(ctx context.Context) error {
+ done := make(chan struct{})
+ go func() {
+ srv.s.GracefulStop()
+ close(done)
+ }()
+
+ select {
+ case <-ctx.Done():
+ srv.s.Stop()
+ return ctx.Err()
+ case <-done:
+ return nil
+ }
+}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
new file mode 100644
index 000000000..a079bcb58
--- /dev/null
+++ b/internal/praefect/server_test.go
@@ -0,0 +1,113 @@
+package praefect_test
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "testing"
+ "time"
+
+ "github.com/mwitkow/grpc-proxy/proxy"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "google.golang.org/grpc"
+)
+
+func TestServerRouting(t *testing.T) {
+ prf := praefect.NewServer(nil, testLogger{t})
+
+ listener, port := listenAvailPort(t)
+ t.Logf("proxy listening on port %d", port)
+ defer listener.Close()
+
+ errQ := make(chan error)
+
+ go func() {
+ errQ <- prf.Start(listener)
+ }()
+
+ // dial client to proxy
+ cc := dialLocalPort(t, port, false)
+ defer cc.Close()
+ gCli := gitalypb.NewRepositoryServiceClient(cc)
+
+ mCli, _, cleanup := newMockDownstream(t)
+ defer cleanup() // clean up mock downstream server resources
+
+ prf.RegisterNode("test", mCli)
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ defer cancel()
+
+ _, err := gCli.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{})
+ require.NoError(t, err)
+
+ err = prf.Shutdown(ctx)
+ require.NoError(t, err)
+ require.NoError(t, <-errQ)
+}
+
+func listenAvailPort(tb testing.TB) (net.Listener, int) {
+ listener, err := net.Listen("tcp", ":0")
+ require.NoError(tb, err)
+
+ return listener, listener.Addr().(*net.TCPAddr).Port
+}
+
+func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
+ opts := []grpc.DialOption{
+ grpc.WithBlock(),
+ }
+ if backend {
+ opts = append(
+ opts,
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ )
+ }
+
+ cc, err := client.Dial(
+ fmt.Sprintf("tcp://localhost:%d", port),
+ opts,
+ )
+ require.NoError(tb, err)
+
+ return cc
+}
+
+type testLogger struct {
+ testing.TB
+}
+
+func (tl testLogger) Debugf(format string, args ...interface{}) {
+ tl.TB.Logf(format, args...)
+}
+
+// initializes and returns a client to downstream server, downstream server, and cleanup function
+func newMockDownstream(tb testing.TB) (*grpc.ClientConn, gitalypb.RepositoryServiceServer, func()) {
+ // setup mock server
+ m := &mockRepoSvc{
+ srv: grpc.NewServer(),
+ }
+ gitalypb.RegisterRepositoryServiceServer(m.srv, m)
+ lis, port := listenAvailPort(tb)
+
+ // dial praefect to backend service
+ cc := dialLocalPort(tb, port, true)
+
+ errQ := make(chan error)
+
+ go func() {
+ errQ <- m.srv.Serve(lis)
+ }()
+
+ cleanup := func() {
+ m.srv.GracefulStop()
+ lis.Close()
+ cc.Close()
+ require.NoError(tb, <-errQ)
+ }
+
+ return cc, m, cleanup
+}
diff --git a/vendor/github.com/mwitkow/grpc-proxy/LICENSE.txt b/vendor/github.com/mwitkow/grpc-proxy/LICENSE.txt
new file mode 100644
index 000000000..cbfdef8c5
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/LICENSE.txt
@@ -0,0 +1,174 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability. \ No newline at end of file
diff --git a/vendor/github.com/mwitkow/grpc-proxy/proxy/DOC.md b/vendor/github.com/mwitkow/grpc-proxy/proxy/DOC.md
new file mode 100644
index 000000000..85c411a10
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/proxy/DOC.md
@@ -0,0 +1,83 @@
+# proxy
+--
+ import "github.com/mwitkow/grpc-proxy/proxy"
+
+Package proxy provides a reverse proxy handler for gRPC.
+
+The implementation allows a `grpc.Server` to pass a received ServerStream to a
+ClientStream without understanding the semantics of the messages exchanged. It
+basically provides a transparent reverse-proxy.
+
+This package is intentionally generic, exposing a `StreamDirector` function that
+allows users of this package to implement whatever logic of backend-picking,
+dialing and service verification to perform.
+
+See examples on documented functions.
+
+## Usage
+
+#### func Codec
+
+```go
+func Codec() grpc.Codec
+```
+Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
+
+See CodecWithParent.
+
+#### func CodecWithParent
+
+```go
+func CodecWithParent(fallback grpc.Codec) grpc.Codec
+```
+CodecWithParent returns a proxying grpc.Codec with a user provided codec as
+parent.
+
+This codec is *crucial* to the functioning of the proxy. It allows the proxy
+server to be oblivious to the schema of the forwarded messages. It basically
+treats a gRPC message frame as raw bytes. However, if the server handler, or the
+client caller are not proxy-internal functions it will fall back to trying to
+decode the message using a fallback codec.
+
+#### func RegisterService
+
+```go
+func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string)
+```
+RegisterService sets up a proxy handler for a particular gRPC service and
+method. The behaviour is the same as if you were registering a handler method,
+e.g. from a codegenerated pb.go file.
+
+This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
+ServerOption.
+
+#### func TransparentHandler
+
+```go
+func TransparentHandler(director StreamDirector) grpc.StreamHandler
+```
+TransparentHandler returns a handler that attempts to proxy all requests that
+are not registered in the server. The indented use here is as a transparent
+proxy, where the server doesn't know about the services implemented by the
+backends. It should be used as a `grpc.UnknownServiceHandler`.
+
+This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
+ServerOption.
+
+#### type StreamDirector
+
+```go
+type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error)
+```
+
+StreamDirector returns a gRPC ClientConn to be used to forward the call to.
+
+The presence of the `Context` allows for rich filtering, e.g. based on Metadata
+(headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC
+error should be returned.
+
+It is worth noting that the StreamDirector will be fired *after* all server-side
+stream interceptors are invoked. So decisions around authorization, monitoring
+etc. are better to be handled there.
+
+See the rather rich example.
diff --git a/vendor/github.com/mwitkow/grpc-proxy/proxy/README.md b/vendor/github.com/mwitkow/grpc-proxy/proxy/README.md
new file mode 100644
index 000000000..85c411a10
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/proxy/README.md
@@ -0,0 +1,83 @@
+# proxy
+--
+ import "github.com/mwitkow/grpc-proxy/proxy"
+
+Package proxy provides a reverse proxy handler for gRPC.
+
+The implementation allows a `grpc.Server` to pass a received ServerStream to a
+ClientStream without understanding the semantics of the messages exchanged. It
+basically provides a transparent reverse-proxy.
+
+This package is intentionally generic, exposing a `StreamDirector` function that
+allows users of this package to implement whatever logic of backend-picking,
+dialing and service verification to perform.
+
+See examples on documented functions.
+
+## Usage
+
+#### func Codec
+
+```go
+func Codec() grpc.Codec
+```
+Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
+
+See CodecWithParent.
+
+#### func CodecWithParent
+
+```go
+func CodecWithParent(fallback grpc.Codec) grpc.Codec
+```
+CodecWithParent returns a proxying grpc.Codec with a user provided codec as
+parent.
+
+This codec is *crucial* to the functioning of the proxy. It allows the proxy
+server to be oblivious to the schema of the forwarded messages. It basically
+treats a gRPC message frame as raw bytes. However, if the server handler, or the
+client caller are not proxy-internal functions it will fall back to trying to
+decode the message using a fallback codec.
+
+#### func RegisterService
+
+```go
+func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string)
+```
+RegisterService sets up a proxy handler for a particular gRPC service and
+method. The behaviour is the same as if you were registering a handler method,
+e.g. from a codegenerated pb.go file.
+
+This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
+ServerOption.
+
+#### func TransparentHandler
+
+```go
+func TransparentHandler(director StreamDirector) grpc.StreamHandler
+```
+TransparentHandler returns a handler that attempts to proxy all requests that
+are not registered in the server. The indented use here is as a transparent
+proxy, where the server doesn't know about the services implemented by the
+backends. It should be used as a `grpc.UnknownServiceHandler`.
+
+This can *only* be used if the `server` also uses grpcproxy.CodecForServer()
+ServerOption.
+
+#### type StreamDirector
+
+```go
+type StreamDirector func(ctx context.Context, fullMethodName string) (*grpc.ClientConn, error)
+```
+
+StreamDirector returns a gRPC ClientConn to be used to forward the call to.
+
+The presence of the `Context` allows for rich filtering, e.g. based on Metadata
+(headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC
+error should be returned.
+
+It is worth noting that the StreamDirector will be fired *after* all server-side
+stream interceptors are invoked. So decisions around authorization, monitoring
+etc. are better to be handled there.
+
+See the rather rich example.
diff --git a/vendor/github.com/mwitkow/grpc-proxy/proxy/codec.go b/vendor/github.com/mwitkow/grpc-proxy/proxy/codec.go
new file mode 100644
index 000000000..846b9c4eb
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/proxy/codec.go
@@ -0,0 +1,70 @@
+package proxy
+
+import (
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
+)
+
+// Codec returns a proxying grpc.Codec with the default protobuf codec as parent.
+//
+// See CodecWithParent.
+func Codec() grpc.Codec {
+ return CodecWithParent(&protoCodec{})
+}
+
+// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent.
+//
+// This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious
+// to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes.
+// However, if the server handler, or the client caller are not proxy-internal functions it will fall back
+// to trying to decode the message using a fallback codec.
+func CodecWithParent(fallback grpc.Codec) grpc.Codec {
+ return &rawCodec{fallback}
+}
+
+type rawCodec struct {
+ parentCodec grpc.Codec
+}
+
+type frame struct {
+ payload []byte
+}
+
+func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {
+ out, ok := v.(*frame)
+ if !ok {
+ return c.parentCodec.Marshal(v)
+ }
+ return out.payload, nil
+
+}
+
+func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {
+ dst, ok := v.(*frame)
+ if !ok {
+ return c.parentCodec.Unmarshal(data, v)
+ }
+ dst.payload = data
+ return nil
+}
+
+func (c *rawCodec) String() string {
+ return fmt.Sprintf("proxy>%s", c.parentCodec.String())
+}
+
+// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC.
+type protoCodec struct{}
+
+func (protoCodec) Marshal(v interface{}) ([]byte, error) {
+ return proto.Marshal(v.(proto.Message))
+}
+
+func (protoCodec) Unmarshal(data []byte, v interface{}) error {
+ return proto.Unmarshal(data, v.(proto.Message))
+}
+
+func (protoCodec) String() string {
+ return "proto"
+}
diff --git a/vendor/github.com/mwitkow/grpc-proxy/proxy/director.go b/vendor/github.com/mwitkow/grpc-proxy/proxy/director.go
new file mode 100644
index 000000000..371ca6050
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/proxy/director.go
@@ -0,0 +1,24 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package proxy
+
+import (
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+)
+
+// StreamDirector returns a gRPC ClientConn to be used to forward the call to.
+//
+// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers).
+// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.
+//
+// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
+// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
+// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
+//
+// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors
+// are invoked. So decisions around authorization, monitoring etc. are better to be handled there.
+//
+// See the rather rich example.
+type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)
diff --git a/vendor/github.com/mwitkow/grpc-proxy/proxy/doc.go b/vendor/github.com/mwitkow/grpc-proxy/proxy/doc.go
new file mode 100644
index 000000000..01328f332
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/proxy/doc.go
@@ -0,0 +1,15 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+/*
+Package proxy provides a reverse proxy handler for gRPC.
+
+The implementation allows a `grpc.Server` to pass a received ServerStream to a ClientStream without understanding
+the semantics of the messages exchanged. It basically provides a transparent reverse-proxy.
+
+This package is intentionally generic, exposing a `StreamDirector` function that allows users of this package
+to implement whatever logic of backend-picking, dialing and service verification to perform.
+
+See examples on documented functions.
+*/
+package proxy
diff --git a/vendor/github.com/mwitkow/grpc-proxy/proxy/handler.go b/vendor/github.com/mwitkow/grpc-proxy/proxy/handler.go
new file mode 100644
index 000000000..752f892a1
--- /dev/null
+++ b/vendor/github.com/mwitkow/grpc-proxy/proxy/handler.go
@@ -0,0 +1,162 @@
+// Copyright 2017 Michal Witkowski. All Rights Reserved.
+// See LICENSE for licensing terms.
+
+package proxy
+
+import (
+ "io"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+)
+
+var (
+ clientStreamDescForProxying = &grpc.StreamDesc{
+ ServerStreams: true,
+ ClientStreams: true,
+ }
+)
+
+// RegisterService sets up a proxy handler for a particular gRPC service and method.
+// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
+//
+// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
+func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
+ streamer := &handler{director}
+ fakeDesc := &grpc.ServiceDesc{
+ ServiceName: serviceName,
+ HandlerType: (*interface{})(nil),
+ }
+ for _, m := range methodNames {
+ streamDesc := grpc.StreamDesc{
+ StreamName: m,
+ Handler: streamer.handler,
+ ServerStreams: true,
+ ClientStreams: true,
+ }
+ fakeDesc.Streams = append(fakeDesc.Streams, streamDesc)
+ }
+ server.RegisterService(fakeDesc, streamer)
+}
+
+// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server.
+// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the
+// backends. It should be used as a `grpc.UnknownServiceHandler`.
+//
+// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
+func TransparentHandler(director StreamDirector) grpc.StreamHandler {
+ streamer := &handler{director}
+ return streamer.handler
+}
+
+type handler struct {
+ director StreamDirector
+}
+
+// handler is where the real magic of proxying happens.
+// It is invoked like any gRPC server stream and uses the gRPC server framing to get and receive bytes from the wire,
+// forwarding it to a ClientStream established against the relevant ClientConn.
+func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {
+ // little bit of gRPC internals never hurt anyone
+ fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
+ if !ok {
+ return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
+ }
+ // We require that the director's returned context inherits from the serverStream.Context().
+ outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)
+ if err != nil {
+ return err
+ }
+
+ clientCtx, clientCancel := context.WithCancel(outgoingCtx)
+ // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
+ clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
+ if err != nil {
+ return err
+ }
+ // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
+ // Channels do not have to be closed, it is just a control flow mechanism, see
+ // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
+ s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
+ c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
+ // We don't know which side is going to stop sending first, so we need a select between the two.
+ for i := 0; i < 2; i++ {
+ select {
+ case s2cErr := <-s2cErrChan:
+ if s2cErr == io.EOF {
+ // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
+ // the clientStream>serverStream may continue pumping though.
+ clientStream.CloseSend()
+ break
+ } else {
+ // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
+ // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
+ // exit with an error to the stack
+ clientCancel()
+ return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+ }
+ case c2sErr := <-c2sErrChan:
+ // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
+ // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
+ // will be nil.
+ serverStream.SetTrailer(clientStream.Trailer())
+ // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
+ if c2sErr != io.EOF {
+ return c2sErr
+ }
+ return nil
+ }
+ }
+ return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+}
+
+func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
+ ret := make(chan error, 1)
+ go func() {
+ f := &frame{}
+ for i := 0; ; i++ {
+ if err := src.RecvMsg(f); err != nil {
+ ret <- err // this can be io.EOF which is happy case
+ break
+ }
+ if i == 0 {
+ // This is a bit of a hack, but client to server headers are only readable after first client msg is
+ // received but must be written to server stream before the first msg is flushed.
+ // This is the only place to do it nicely.
+ md, err := src.Header()
+ if err != nil {
+ ret <- err
+ break
+ }
+ if err := dst.SendHeader(md); err != nil {
+ ret <- err
+ break
+ }
+ }
+ if err := dst.SendMsg(f); err != nil {
+ ret <- err
+ break
+ }
+ }
+ }()
+ return ret
+}
+
+func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {
+ ret := make(chan error, 1)
+ go func() {
+ f := &frame{}
+ for i := 0; ; i++ {
+ if err := src.RecvMsg(f); err != nil {
+ ret <- err // this can be io.EOF which is happy case
+ break
+ }
+ if err := dst.SendMsg(f); err != nil {
+ ret <- err
+ break
+ }
+ }
+ }()
+ return ret
+}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index c380fbe79..5042c5793 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -229,6 +229,12 @@
"versionExact": "v1.0.1"
},
{
+ "checksumSHA1": "603wOI+hKduL75jXGGxfLKP5GE4=",
+ "path": "github.com/mwitkow/grpc-proxy/proxy",
+ "revision": "0f1106ef9c766333b9acb4b81e705da4bade7215",
+ "revisionTime": "2018-10-17T16:41:39Z"
+ },
+ {
"checksumSHA1": "jqEjDv//suCrQUg8iOGI7oxwfRU=",
"path": "github.com/opentracing/opentracing-go",
"revision": "be550b025b433cdfa2f11efb962afa2ea3c4d967",
@@ -505,20 +511,20 @@
{
"checksumSHA1": "R6fNN36q3UydLODupK0vdx9h/CY=",
"path": "gitlab.com/gitlab-org/labkit/correlation",
- "revision": "2a3e1f5415a890402696dcbb2c31b5a79c17b579",
- "revisionTime": "2019-01-08T10:46:58Z",
- "version": "master",
- "versionExact": "master"
- },
- {
- "checksumSHA1": "R6fNN36q3UydLODupK0vdx9h/CY=",
- "path": "gitlab.com/gitlab-org/labkit/correlation",
"revision": "9c613f6d8258a1b55701079bf9b358147aea139e",
"revisionTime": "2018-12-20T16:08:28Z",
"version": "=d-tracing",
"versionExact": "d-tracing"
},
{
+ "checksumSHA1": "R6fNN36q3UydLODupK0vdx9h/CY=",
+ "path": "gitlab.com/gitlab-org/labkit/correlation",
+ "revision": "2a3e1f5415a890402696dcbb2c31b5a79c17b579",
+ "revisionTime": "2019-01-08T10:46:58Z",
+ "version": "master",
+ "versionExact": "master"
+ },
+ {
"checksumSHA1": "UFBFulprWZHuL9GHhjCKoHXm+Ww=",
"path": "gitlab.com/gitlab-org/labkit/correlation/grpc",
"revision": "2a3e1f5415a890402696dcbb2c31b5a79c17b579",