Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/streamrpc/server.go')
-rw-r--r--internal/streamrpc/server.go166
1 files changed, 0 insertions, 166 deletions
diff --git a/internal/streamrpc/server.go b/internal/streamrpc/server.go
deleted file mode 100644
index 365898cae..000000000
--- a/internal/streamrpc/server.go
+++ /dev/null
@@ -1,166 +0,0 @@
-package streamrpc
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
- "google.golang.org/protobuf/proto"
-)
-
-var _ grpc.ServiceRegistrar = &Server{}
-
-// Server handles network connections and routes them to StreamRPC handlers.
-type Server struct {
- methods map[string]*method
- interceptor grpc.UnaryServerInterceptor
-}
-
-type method struct {
- *grpc.MethodDesc
- implementation interface{}
-}
-
-// ServerOption is an abstraction that lets you pass 0 or more server
-// options to NewServer.
-type ServerOption func(*Server)
-
-// WithServerInterceptor adds a unary gRPC server interceptor.
-func WithServerInterceptor(interceptor grpc.UnaryServerInterceptor) ServerOption {
- return func(s *Server) { s.interceptor = interceptor }
-}
-
-// NewServer returns a new StreamRPC server. You can pass the result to
-// grpc-go RegisterFooServer functions.
-func NewServer(opts ...ServerOption) *Server {
- s := &Server{
- methods: make(map[string]*method),
- }
- for _, o := range opts {
- o(s)
- }
- return s
-}
-
-// RegisterService implements grpc.ServiceRegistrar. It makes it possible
-// to pass a *Server to grpc-go foopb.RegisterFooServer functions as the
-// first argument.
-func (s *Server) RegisterService(sd *grpc.ServiceDesc, impl interface{}) {
- for i := range sd.Methods {
- m := &sd.Methods[i]
- s.methods["/"+sd.ServiceName+"/"+m.MethodName] = &method{
- MethodDesc: m,
- implementation: impl,
- }
- }
-}
-
-// Handle handles an incoming network connection with the StreamRPC
-// protocol. It is intended to be called from a net.Listener.Accept loop
-// (or something equivalent).
-func (s *Server) Handle(c net.Conn) error {
- defer c.Close()
-
- deadline := time.Now().Add(defaultHandshakeTimeout)
- req, err := recvFrame(c, deadline)
- if err != nil {
- return err
- }
-
- session := &serverSession{
- c: c,
- deadline: deadline,
- }
- if err := s.handleSession(session, req); err != nil {
- return session.reject(err)
- }
-
- return nil
-}
-
-func (s *Server) handleSession(session *serverSession, reqBytes []byte) error {
- req := &request{}
- if err := json.Unmarshal(reqBytes, req); err != nil {
- return err
- }
-
- method, ok := s.methods[req.Method]
- if !ok {
- return fmt.Errorf("method not found: %s", req.Method)
- }
-
- ctx, cancel := serverContext(session, req)
- defer cancel()
-
- if _, err := method.Handler(
- method.implementation,
- ctx,
- func(msg interface{}) error { return proto.Unmarshal(req.Message, msg.(proto.Message)) },
- s.interceptor,
- ); err != nil {
- return err
- }
-
- return nil
-}
-
-func serverContext(session *serverSession, req *request) (context.Context, func()) {
- ctx := context.Background()
- ctx = context.WithValue(ctx, sessionKey{}, session)
- ctx = metadata.NewIncomingContext(ctx, req.Metadata)
- return context.WithCancel(ctx)
-}
-
-type sessionKey struct{}
-
-// AcceptConnection completes the StreamRPC handshake on the server side.
-// It notifies the client that the server has accepted the stream, and
-// returns the connection.
-func AcceptConnection(ctx context.Context) (net.Conn, error) {
- session, ok := ctx.Value(sessionKey{}).(*serverSession)
- if !ok {
- return nil, errors.New("context has no serverSession")
- }
- return session.Accept()
-}
-
-// serverSession wraps an incoming connection whose handshake has not
-// been completed yet.
-type serverSession struct {
- c net.Conn
- accepted bool
- deadline time.Time
-}
-
-// Accept completes the handshake on the connection wrapped by ss and
-// unwraps the connection.
-func (ss *serverSession) Accept() (net.Conn, error) {
- if ss.accepted {
- return nil, errors.New("connection already accepted")
- }
-
- ss.accepted = true
- if err := sendFrame(ss.c, nil, ss.deadline); err != nil {
- return nil, fmt.Errorf("accept session: %w", err)
- }
-
- return ss.c, nil
-}
-
-func (ss *serverSession) reject(err error) error {
- if ss.accepted {
- return nil
- }
-
- buf, err := json.Marshal(&response{Error: err.Error()})
- if err != nil {
- return fmt.Errorf("mashal response: %w", err)
- }
-
- return sendFrame(ss.c, buf, ss.deadline)
-}