diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2017-09-06 19:54:03 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2017-09-06 20:03:27 +0300 |
commit | e0d71a6c02d8ddfc76fd318103aa3c5029799013 (patch) | |
tree | 1dc51f9872835efb4a8004011cfb35ad192e702a | |
parent | 0732e00055733ef7f905bca88a062d07ad14d790 (diff) |
Re-use gitaly-ruby client connection
-rw-r--r-- | cmd/gitaly/main.go | 2 | ||||
-rw-r--r-- | internal/rubyserver/rubyserver.go | 50 | ||||
-rw-r--r-- | internal/server/auth_test.go | 2 | ||||
-rw-r--r-- | internal/server/server.go | 5 | ||||
-rw-r--r-- | internal/service/commit/find_commits.go | 4 | ||||
-rw-r--r-- | internal/service/commit/server.go | 9 | ||||
-rw-r--r-- | internal/service/commit/stats.go | 4 | ||||
-rw-r--r-- | internal/service/commit/testhelper_test.go | 9 | ||||
-rw-r--r-- | internal/service/diff/patch.go | 4 | ||||
-rw-r--r-- | internal/service/diff/server.go | 9 | ||||
-rw-r--r-- | internal/service/diff/testhelper_test.go | 9 | ||||
-rw-r--r-- | internal/service/ref/branches.go | 6 | ||||
-rw-r--r-- | internal/service/ref/server.go | 9 | ||||
-rw-r--r-- | internal/service/ref/testhelper_test.go | 8 | ||||
-rw-r--r-- | internal/service/register.go | 9 |
15 files changed, 97 insertions, 42 deletions
diff --git a/cmd/gitaly/main.go b/cmd/gitaly/main.go index f6c1221b9..588bd176f 100644 --- a/cmd/gitaly/main.go +++ b/cmd/gitaly/main.go @@ -166,7 +166,7 @@ func run(listeners []net.Listener) error { } defer ruby.Stop() - server := server.New() + server := server.New(ruby) defer server.Stop() serverErrors := make(chan error, len(listeners)) diff --git a/internal/rubyserver/rubyserver.go b/internal/rubyserver/rubyserver.go index 3477bcc47..363e443e9 100644 --- a/internal/rubyserver/rubyserver.go +++ b/internal/rubyserver/rubyserver.go @@ -70,11 +70,19 @@ func socketPath() string { // Server represents a gitaly-ruby helper process. type Server struct { *supervisor.Process + clientConnMu sync.RWMutex + clientConn *grpc.ClientConn } // Stop shuts down the gitaly-ruby helper process and cleans up resources. func (s *Server) Stop() { if s != nil { + s.clientConnMu.RLock() + defer s.clientConnMu.RUnlock() + if s.clientConn != nil { + s.clientConn.Close() + } + if s.Process != nil { s.Process.Stop() } @@ -99,31 +107,57 @@ func Start() (*Server, error) { // CommitServiceClient returns a CommitServiceClient instance that is // configured to connect to the running Ruby server. This assumes Start() // has been called already. -func CommitServiceClient(ctx context.Context) (pb.CommitServiceClient, error) { - conn, err := newConnection(ctx) +func (s *Server) CommitServiceClient(ctx context.Context) (pb.CommitServiceClient, error) { + conn, err := s.getConnection(ctx) return pb.NewCommitServiceClient(conn), err } // DiffServiceClient returns a DiffServiceClient instance that is // configured to connect to the running Ruby server. This assumes Start() // has been called already. -func DiffServiceClient(ctx context.Context) (pb.DiffServiceClient, error) { - conn, err := newConnection(ctx) +func (s *Server) DiffServiceClient(ctx context.Context) (pb.DiffServiceClient, error) { + conn, err := s.getConnection(ctx) return pb.NewDiffServiceClient(conn), err } // RefServiceClient returns a RefServiceClient instance that is // configured to connect to the running Ruby server. This assumes Start() // has been called already. -func RefServiceClient(ctx context.Context) (pb.RefServiceClient, error) { - conn, err := newConnection(ctx) +func (s *Server) RefServiceClient(ctx context.Context) (pb.RefServiceClient, error) { + conn, err := s.getConnection(ctx) return pb.NewRefServiceClient(conn), err } -func newConnection(ctx context.Context) (*grpc.ClientConn, error) { +func (s *Server) getConnection(ctx context.Context) (*grpc.ClientConn, error) { + s.clientConnMu.RLock() + conn := s.clientConn + s.clientConnMu.RUnlock() + + if conn != nil { + return conn, nil + } + + return s.createConnection(ctx) +} + +func (s *Server) createConnection(ctx context.Context) (*grpc.ClientConn, error) { + s.clientConnMu.Lock() + defer s.clientConnMu.Unlock() + + if conn := s.clientConn; conn != nil { + return conn, nil + } + dialCtx, cancel := context.WithTimeout(ctx, ConnectTimeout) defer cancel() - return grpc.DialContext(dialCtx, socketPath(), dialOptions()...) + + conn, err := grpc.DialContext(dialCtx, socketPath(), dialOptions()...) + if err != nil { + return nil, err + } + + s.clientConn = conn + return s.clientConn, nil } func dialOptions() []grpc.DialOption { diff --git a/internal/server/auth_test.go b/internal/server/auth_test.go index 2d14d47c9..f5c8ae197 100644 --- a/internal/server/auth_test.go +++ b/internal/server/auth_test.go @@ -136,7 +136,7 @@ func healthCheck(conn *grpc.ClientConn) error { } func runServer(t *testing.T) *grpc.Server { - srv := New() + srv := New(nil) listener, err := net.Listen("unix", serverSocketPath) require.NoError(t, err) diff --git a/internal/server/server.go b/internal/server/server.go index 0abe6178e..72c9f1904 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -8,6 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/objectdirhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/service" "github.com/grpc-ecosystem/go-grpc-middleware" @@ -20,7 +21,7 @@ import ( ) // New returns a GRPC server with all Gitaly services and interceptors set up. -func New() *grpc.Server { +func New(rubyServer *rubyserver.Server) *grpc.Server { logrusEntry := log.NewEntry(log.StandardLogger()) grpc_logrus.ReplaceGrpcLogger(logrusEntry) @@ -55,7 +56,7 @@ func New() *grpc.Server { )), ) - service.RegisterAll(server) + service.RegisterAll(server, rubyServer) reflection.Register(server) grpc_prometheus.Register(server) diff --git a/internal/service/commit/find_commits.go b/internal/service/commit/find_commits.go index b4c43bf5b..18053b9f7 100644 --- a/internal/service/commit/find_commits.go +++ b/internal/service/commit/find_commits.go @@ -10,7 +10,7 @@ import ( "google.golang.org/grpc/codes" ) -func (*server) FindCommits(req *pb.FindCommitsRequest, stream pb.CommitService_FindCommitsServer) error { +func (s *server) FindCommits(req *pb.FindCommitsRequest, stream pb.CommitService_FindCommitsServer) error { ctx := stream.Context() // Use Gitaly's default branch lookup function because that is already @@ -27,7 +27,7 @@ func (*server) FindCommits(req *pb.FindCommitsRequest, stream pb.CommitService_F } } - client, err := rubyserver.CommitServiceClient(ctx) + client, err := s.CommitServiceClient(ctx) if err != nil { return err } diff --git a/internal/service/commit/server.go b/internal/service/commit/server.go index 779fcad31..7607d6f48 100644 --- a/internal/service/commit/server.go +++ b/internal/service/commit/server.go @@ -1,12 +1,15 @@ package commit import ( + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/service/ref" pb "gitlab.com/gitlab-org/gitaly-proto/go" ) -type server struct{} +type server struct { + *rubyserver.Server +} var ( defaultBranchName = ref.DefaultBranchName @@ -14,6 +17,6 @@ var ( ) // NewServer creates a new instance of a grpc CommitServiceServer -func NewServer() pb.CommitServiceServer { - return &server{} +func NewServer(rs *rubyserver.Server) pb.CommitServiceServer { + return &server{rs} } diff --git a/internal/service/commit/stats.go b/internal/service/commit/stats.go index f3dec776e..6c5efd335 100644 --- a/internal/service/commit/stats.go +++ b/internal/service/commit/stats.go @@ -7,8 +7,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/rubyserver" ) -func (server) CommitStats(ctx context.Context, in *pb.CommitStatsRequest) (*pb.CommitStatsResponse, error) { - client, err := rubyserver.CommitServiceClient(ctx) +func (s *server) CommitStats(ctx context.Context, in *pb.CommitStatsRequest) (*pb.CommitStatsResponse, error) { + client, err := s.CommitServiceClient(ctx) if err != nil { return nil, err } diff --git a/internal/service/commit/testhelper_test.go b/internal/service/commit/testhelper_test.go index 852d22534..7b3eaee14 100644 --- a/internal/service/commit/testhelper_test.go +++ b/internal/service/commit/testhelper_test.go @@ -29,6 +29,8 @@ func TestMain(m *testing.M) { os.Exit(testMain(m)) } +var rubyServer *rubyserver.Server + func testMain(m *testing.M) int { defer testhelper.MustHaveNoChildProcess() @@ -37,11 +39,12 @@ func testMain(m *testing.M) int { log.Fatal(err) } - ruby, err := rubyserver.Start() + var err error + rubyServer, err = rubyserver.Start() if err != nil { log.Fatal(err) } - defer ruby.Stop() + defer rubyServer.Stop() return m.Run() } @@ -61,7 +64,7 @@ func startTestServices(t *testing.T) *grpc.Server { t.Fatal("failed to start server") } - pb.RegisterCommitServiceServer(server, NewServer()) + pb.RegisterCommitServiceServer(server, NewServer(rubyServer)) reflection.Register(server) go server.Serve(listener) diff --git a/internal/service/diff/patch.go b/internal/service/diff/patch.go index e6f3f2bea..d47e06828 100644 --- a/internal/service/diff/patch.go +++ b/internal/service/diff/patch.go @@ -6,10 +6,10 @@ import ( pb "gitlab.com/gitlab-org/gitaly-proto/go" ) -func (server) CommitPatch(in *pb.CommitPatchRequest, stream pb.DiffService_CommitPatchServer) error { +func (s *server) CommitPatch(in *pb.CommitPatchRequest, stream pb.DiffService_CommitPatchServer) error { ctx := stream.Context() - client, err := rubyserver.DiffServiceClient(ctx) + client, err := s.DiffServiceClient(ctx) if err != nil { return err } diff --git a/internal/service/diff/server.go b/internal/service/diff/server.go index 8cac8b7b0..644298e44 100644 --- a/internal/service/diff/server.go +++ b/internal/service/diff/server.go @@ -2,15 +2,20 @@ package diff import ( pb "gitlab.com/gitlab-org/gitaly-proto/go" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" ) const msgSizeThreshold = 5 * 1024 type server struct { MsgSizeThreshold int + *rubyserver.Server } // NewServer creates a new instance of a gRPC DiffServer -func NewServer() pb.DiffServiceServer { - return &server{MsgSizeThreshold: msgSizeThreshold} +func NewServer(rs *rubyserver.Server) pb.DiffServiceServer { + return &server{ + MsgSizeThreshold: msgSizeThreshold, + Server: rs, + } } diff --git a/internal/service/diff/testhelper_test.go b/internal/service/diff/testhelper_test.go index 1c2bbb901..0b5ce1076 100644 --- a/internal/service/diff/testhelper_test.go +++ b/internal/service/diff/testhelper_test.go @@ -25,17 +25,20 @@ func TestMain(m *testing.M) { os.Exit(testMain(m)) } +var rubyServer *rubyserver.Server + func testMain(m *testing.M) int { defer testhelper.MustHaveNoChildProcess() testRepo = testhelper.TestRepository() testhelper.ConfigureRuby() - ruby, err := rubyserver.Start() + var err error + rubyServer, err = rubyserver.Start() if err != nil { log.Fatal(err) } - defer ruby.Stop() + defer rubyServer.Stop() return m.Run() } @@ -47,7 +50,7 @@ func runDiffServer(t *testing.T) *grpc.Server { t.Fatal(err) } - pb.RegisterDiffServiceServer(server, NewServer()) + pb.RegisterDiffServiceServer(server, NewServer(rubyServer)) reflection.Register(server) go server.Serve(listener) diff --git a/internal/service/ref/branches.go b/internal/service/ref/branches.go index 5b4e12e4f..e79f73b77 100644 --- a/internal/service/ref/branches.go +++ b/internal/service/ref/branches.go @@ -9,7 +9,7 @@ import ( ) func (s *server) CreateBranch(ctx context.Context, req *pb.CreateBranchRequest) (*pb.CreateBranchResponse, error) { - client, err := rubyserver.RefServiceClient(ctx) + client, err := s.RefServiceClient(ctx) if err != nil { return nil, err } @@ -23,7 +23,7 @@ func (s *server) CreateBranch(ctx context.Context, req *pb.CreateBranchRequest) } func (s *server) DeleteBranch(ctx context.Context, req *pb.DeleteBranchRequest) (*pb.DeleteBranchResponse, error) { - client, err := rubyserver.RefServiceClient(ctx) + client, err := s.RefServiceClient(ctx) if err != nil { return nil, err } @@ -37,7 +37,7 @@ func (s *server) DeleteBranch(ctx context.Context, req *pb.DeleteBranchRequest) } func (s *server) FindBranch(ctx context.Context, req *pb.FindBranchRequest) (*pb.FindBranchResponse, error) { - client, err := rubyserver.RefServiceClient(ctx) + client, err := s.RefServiceClient(ctx) if err != nil { return nil, err } diff --git a/internal/service/ref/server.go b/internal/service/ref/server.go index 4e94798e0..9641c1176 100644 --- a/internal/service/ref/server.go +++ b/internal/service/ref/server.go @@ -2,11 +2,14 @@ package ref import ( pb "gitlab.com/gitlab-org/gitaly-proto/go" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" ) -type server struct{} +type server struct { + *rubyserver.Server +} // NewServer creates a new instance of a grpc RefServer -func NewServer() pb.RefServiceServer { - return &server{} +func NewServer(rs *rubyserver.Server) pb.RefServiceServer { + return &server{rs} } diff --git a/internal/service/ref/testhelper_test.go b/internal/service/ref/testhelper_test.go index f432a64a8..60bdfdaa3 100644 --- a/internal/service/ref/testhelper_test.go +++ b/internal/service/ref/testhelper_test.go @@ -74,6 +74,8 @@ func TestMain(m *testing.M) { os.Exit(testMain(m)) } +var rubyServer *rubyserver.Server + func testMain(m *testing.M) int { defer testhelper.MustHaveNoChildProcess() @@ -86,11 +88,11 @@ func testMain(m *testing.M) int { } testhelper.ConfigureRuby() - ruby, err := rubyserver.Start() + rubyServer, err = rubyserver.Start() if err != nil { log.Fatal(err) } - defer ruby.Stop() + defer rubyServer.Stop() // Use 100 bytes as the maximum message size to test that fragmenting the // ref list works correctly @@ -108,7 +110,7 @@ func runRefServiceServer(t *testing.T) *grpc.Server { t.Fatal(err) } - pb.RegisterRefServiceServer(grpcServer, &server{}) + pb.RegisterRefServiceServer(grpcServer, &server{rubyServer}) reflection.Register(grpcServer) go grpcServer.Serve(listener) diff --git a/internal/service/register.go b/internal/service/register.go index a7b8d6136..140bf5113 100644 --- a/internal/service/register.go +++ b/internal/service/register.go @@ -2,6 +2,7 @@ package service import ( pb "gitlab.com/gitlab-org/gitaly-proto/go" + "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/service/blob" "gitlab.com/gitlab-org/gitaly/internal/service/commit" "gitlab.com/gitlab-org/gitaly/internal/service/diff" @@ -19,20 +20,20 @@ import ( // RegisterAll will register all the known grpc services with // the specified grpc service instance -func RegisterAll(grpcServer *grpc.Server) { +func RegisterAll(grpcServer *grpc.Server, rubyServer *rubyserver.Server) { notificationsService := notifications.NewServer() pb.RegisterNotificationServiceServer(grpcServer, notificationsService) - refService := ref.NewServer() + refService := ref.NewServer(rubyServer) pb.RegisterRefServiceServer(grpcServer, refService) smartHTTPService := smarthttp.NewServer() pb.RegisterSmartHTTPServiceServer(grpcServer, smartHTTPService) - diffService := diff.NewServer() + diffService := diff.NewServer(rubyServer) pb.RegisterDiffServiceServer(grpcServer, diffService) - commitService := commit.NewServer() + commitService := commit.NewServer(rubyServer) pb.RegisterCommitServiceServer(grpcServer, commitService) sshService := ssh.NewServer() |