diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-10-17 22:35:45 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-10-17 22:35:45 +0300 |
commit | 95edabecd90fe7b97cd2d3e9fbf9b69a21430937 (patch) | |
tree | c640cf0db92395ecfef02d6dffab1fd34393f61b | |
parent | b558b78a7b8e6da798a7ddd476bd8fb1085b856f (diff) |
Refactor praefect server tests
-rw-r--r-- | changelogs/unreleased/po-praefect-refactor-e2e.yml | 5 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 13 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 262 | ||||
-rw-r--r-- | internal/praefect/mock/mock.pb.go | 173 | ||||
-rw-r--r-- | internal/praefect/mock/mock.proto | 29 | ||||
-rw-r--r-- | internal/praefect/mocksvc_test.go | 27 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 289 |
7 files changed, 505 insertions, 293 deletions
diff --git a/changelogs/unreleased/po-praefect-refactor-e2e.yml b/changelogs/unreleased/po-praefect-refactor-e2e.yml new file mode 100644 index 000000000..781633708 --- /dev/null +++ b/changelogs/unreleased/po-praefect-refactor-e2e.yml @@ -0,0 +1,5 @@ +--- +title: Refactor praefect server tests +merge_request: 1554 +author: +type: other diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f7b27394b..25fe7e4a0 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -64,7 +64,7 @@ func TestAuthFailures(t *testing.T) { cli := mock.NewSimpleServiceClient(conn) - _, err = cli.SimpleUnaryUnary(ctx, &mock.SimpleRequest{ + _, err = cli.ServerAccessor(ctx, &mock.SimpleRequest{ Value: 1, }) @@ -133,7 +133,7 @@ func TestAuthSuccess(t *testing.T) { cli := mock.NewSimpleServiceClient(conn) - _, err = cli.SimpleUnaryUnary(ctx, &mock.SimpleRequest{ + _, err = cli.ServerAccessor(ctx, &mock.SimpleRequest{ Value: 1, }) @@ -155,7 +155,14 @@ func dial(serverSocketPath string, opts []grpc.DialOption) (*grpc.ClientConn, er func runServer(t *testing.T, token string, required bool) (*Server, string, func()) { backendToken := "abcxyz" - backend, cleanup := newMockDownstream(t, backendToken, callbackIncrement) + mockServer := &mockSvc{ + serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { + return &mock.SimpleResponse{ + Value: req.Value + 1, + }, nil + }, + } + backend, cleanup := newMockDownstream(t, backendToken, mockServer) conf := config.Config{ VirtualStorageName: "praefect", diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go new file mode 100644 index 000000000..0e4dc2ffe --- /dev/null +++ b/internal/praefect/helper_test.go @@ -0,0 +1,262 @@ +package praefect + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/golang/protobuf/protoc-gen-go/descriptor" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/client" + internalauth "gitlab.com/gitlab-org/gitaly/internal/auth" + "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/server/auth" + gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" +) + +func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) { + select { + case <-ch: + break + case <-time.After(timeout): + t.Errorf("timed out waiting for channel after %s", timeout) + } +} + +// generates a praefect configuration with the specified +// number of backend nodes +func testConfig(backends int) config.Config { + cfg := config.Config{ + VirtualStorageName: "praefect", + } + + var nodes []*models.Node + + for i := 0; i < backends; i++ { + n := &models.Node{ + ID: i, + Storage: fmt.Sprintf("praefect-internal-%d", i), + Token: fmt.Sprintf("%d", i), + } + + if i == 0 { + n.DefaultPrimary = true + } + + nodes = append(nodes, n) + } + + cfg.Nodes = nodes + + return cfg +} + +// runPraefectServer runs a praefect server with the provided mock servers. +// Each mock server is keyed by the corresponding index of the node in the +// config.Nodes. There must be a 1-to-1 mapping between backend server and +// configured storage node. +func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, func()) { + var ( + datastore = NewMemoryDatastore(conf) + logEntry = log.Default() + clientCC = conn.NewClientConnections() + coordinator = NewCoordinator(logEntry, datastore, clientCC, conf, mustLoadProtoReg(t)) + ) + + require.Equal(t, len(backends), len(conf.Nodes), + "mock server count doesn't match config nodes") + + var cleanups []func() + + for id, nodeStorage := range datastore.storageNodes.m { + backend, ok := backends[id] + require.True(t, ok, "missing backend server for node %d", id) + + backendAddr, cleanup := newMockDownstream(t, nodeStorage.Token, backend) + cleanups = append(cleanups, cleanup) + + clientCC.RegisterNode(nodeStorage.Storage, backendAddr, nodeStorage.Token) + nodeStorage.Address = backendAddr + datastore.storageNodes.m[id] = nodeStorage + } + + replmgr := NewReplMgr( + "default", + logEntry, + datastore, + clientCC, + ) + prf := NewServer( + coordinator, + replmgr, + nil, + logEntry, + clientCC, + conf, + ) + + listener, port := listenAvailPort(t) + t.Logf("praefect listening on port %d", port) + + errQ := make(chan error) + + go func() { + errQ <- prf.Start(listener) + }() + + // dial client to praefect + cc := dialLocalPort(t, port, false) + + cleanup := func() { + for _, cu := range cleanups { + cu() + } + require.NoError(t, cc.Close()) + require.NoError(t, listener.Close()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + require.NoError(t, prf.Shutdown(ctx)) + } + + return mock.NewSimpleServiceClient(cc), prf, cleanup +} + +// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes +func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) { + datastore := NewMemoryDatastore(conf) + logEntry := log.Default() + clientCC := conn.NewClientConnections() + + for id, nodeStorage := range datastore.storageNodes.m { + _, backend := runInternalGitalyServer(t, nodeStorage.Token) + + clientCC.RegisterNode(nodeStorage.Storage, backend, nodeStorage.Token) + nodeStorage.Address = backend + datastore.storageNodes.m[id] = nodeStorage + } + + coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) + + replmgr := NewReplMgr( + "", + logEntry, + datastore, + clientCC, + ) + + prf := NewServer( + coordinator, + replmgr, + nil, + logEntry, + clientCC, + conf, + ) + + listener, port := listenAvailPort(t) + t.Logf("proxy listening on port %d", port) + + errQ := make(chan error) + + go func() { + errQ <- prf.Start(listener) + }() + + // dial client to praefect + cc := dialLocalPort(t, port, false) + + return cc, prf +} + +func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string) { + streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor(internalauth.Config{Token: token})} + unaryInt := []grpc.UnaryServerInterceptor{auth.UnaryServerInterceptor(internalauth.Config{Token: token})} + + server := testhelper.NewTestGrpcServer(t, streamInt, unaryInt) + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", serverSocketPath) + if err != nil { + t.Fatal(err) + } + + gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) + + go server.Serve(listener) + + return server, "unix://" + serverSocketPath +} + +func mustLoadProtoReg(t *testing.T) *descriptor.FileDescriptorProto { + gz, _ := (*mock.SimpleRequest)(nil).Descriptor() + fd, err := protoregistry.ExtractFileDescriptor(gz) + require.NoError(t, err) + return fd +} + +func listenAvailPort(tb testing.TB) (net.Listener, int) { + listener, err := net.Listen("tcp", ":0") + require.NoError(tb, err) + + return listener, listener.Addr().(*net.TCPAddr).Port +} + +func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { + opts := []grpc.DialOption{ + grpc.WithBlock(), + } + if backend { + opts = append( + opts, + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + ) + } + + cc, err := client.Dial( + fmt.Sprintf("tcp://localhost:%d", port), + opts, + ) + require.NoError(tb, err) + + return cc +} + +func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) { + srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(internalauth.Config{Token: token}))) + mock.RegisterSimpleServiceServer(srv, m) + + // client to backend service + lis, port := listenAvailPort(tb) + + errQ := make(chan error) + + go func() { + errQ <- srv.Serve(lis) + }() + + cleanup := func() { + srv.GracefulStop() + lis.Close() + + // If the server is shutdown before Serve() is called on it + // the Serve() calls will return the ErrServerStopped + if err := <-errQ; err != nil && err != grpc.ErrServerStopped { + require.NoError(tb, err) + } + } + + return fmt.Sprintf("tcp://localhost:%d", port), cleanup +} diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go index 03a58459a..edc09f622 100644 --- a/internal/praefect/mock/mock.pb.go +++ b/internal/praefect/mock/mock.pb.go @@ -9,10 +9,9 @@ import ( math "math" proto "github.com/golang/protobuf/proto" - _ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + empty "github.com/golang/protobuf/ptypes/empty" + gitalypb "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" ) // Reference imports to suppress errors if they are not otherwise used. @@ -104,25 +103,73 @@ func (m *SimpleResponse) GetValue() int32 { return 0 } +type RepoRequest struct { + Repo *gitalypb.Repository `protobuf:"bytes,1,opt,name=repo,proto3" json:"repo,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RepoRequest) Reset() { *m = RepoRequest{} } +func (m *RepoRequest) String() string { return proto.CompactTextString(m) } +func (*RepoRequest) ProtoMessage() {} +func (*RepoRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_6fa4806c90f7156d, []int{2} +} + +func (m *RepoRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RepoRequest.Unmarshal(m, b) +} +func (m *RepoRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RepoRequest.Marshal(b, m, deterministic) +} +func (m *RepoRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RepoRequest.Merge(m, src) +} +func (m *RepoRequest) XXX_Size() int { + return xxx_messageInfo_RepoRequest.Size(m) +} +func (m *RepoRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RepoRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RepoRequest proto.InternalMessageInfo + +func (m *RepoRequest) GetRepo() *gitalypb.Repository { + if m != nil { + return m.Repo + } + return nil +} + func init() { proto.RegisterType((*SimpleRequest)(nil), "mock.SimpleRequest") proto.RegisterType((*SimpleResponse)(nil), "mock.SimpleResponse") + proto.RegisterType((*RepoRequest)(nil), "mock.RepoRequest") } func init() { proto.RegisterFile("mock.proto", fileDescriptor_6fa4806c90f7156d) } var fileDescriptor_6fa4806c90f7156d = []byte{ - // 154 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xca, 0xcd, 0x4f, 0xce, - 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0xa5, 0x78, 0x8a, 0x33, 0x12, 0x8b, - 0x52, 0x53, 0x20, 0x62, 0x4a, 0xaa, 0x5c, 0xbc, 0xc1, 0x99, 0xb9, 0x05, 0x39, 0xa9, 0x41, 0xa9, - 0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x22, 0x5c, 0xac, 0x65, 0x89, 0x39, 0xa5, 0xa9, 0x12, 0x8c, - 0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x10, 0x8e, 0x92, 0x1a, 0x17, 0x1f, 0x4c, 0x59, 0x71, 0x41, 0x7e, - 0x5e, 0x71, 0x2a, 0x42, 0x1d, 0x13, 0x92, 0x3a, 0xa3, 0x08, 0x98, 0x71, 0xc1, 0xa9, 0x45, 0x65, - 0x99, 0xc9, 0xa9, 0x42, 0xee, 0x5c, 0x02, 0x10, 0x81, 0xd0, 0xbc, 0xc4, 0xa2, 0x4a, 0x30, 0x21, - 0x24, 0xac, 0x07, 0x76, 0x14, 0x8a, 0xbd, 0x52, 0x22, 0xa8, 0x82, 0x10, 0x5b, 0x94, 0x38, 0x7e, - 0x4d, 0xd7, 0x60, 0xe1, 0x60, 0x12, 0x60, 0x4c, 0x62, 0x03, 0xbb, 0xd7, 0x18, 0x10, 0x00, 0x00, - 0xff, 0xff, 0xe4, 0x1b, 0xb4, 0x1f, 0xd1, 0x00, 0x00, 0x00, + // 275 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x50, 0xcd, 0x4a, 0xc3, 0x40, + 0x10, 0x66, 0x4b, 0x22, 0x71, 0xaa, 0xa5, 0x5d, 0x8b, 0xc8, 0x7a, 0x91, 0x80, 0x25, 0xa7, 0x2d, + 0xad, 0xf8, 0x00, 0x1e, 0x0a, 0x7a, 0xf0, 0x92, 0xe2, 0x03, 0xa4, 0x71, 0x8c, 0xc1, 0xa4, 0xb3, + 0xee, 0x6e, 0x0a, 0x79, 0x92, 0x3e, 0xa4, 0x6f, 0xd0, 0x93, 0x6c, 0x96, 0x60, 0x7b, 0xf5, 0x36, + 0x33, 0xdf, 0xcf, 0x7e, 0xfb, 0x01, 0xd4, 0x94, 0x7f, 0x49, 0xa5, 0xc9, 0x12, 0x0f, 0xdc, 0x2c, + 0x2e, 0xcc, 0x67, 0xa6, 0xf1, 0xdd, 0xdf, 0xc4, 0x6d, 0x41, 0x54, 0x54, 0x38, 0xef, 0xb6, 0x4d, + 0xf3, 0x31, 0xc7, 0x5a, 0xd9, 0xd6, 0x83, 0xf1, 0x3d, 0x5c, 0xae, 0xcb, 0x5a, 0x55, 0x98, 0xe2, + 0x77, 0x83, 0xc6, 0xf2, 0x29, 0x84, 0xbb, 0xac, 0x6a, 0xf0, 0x86, 0xdd, 0xb1, 0x24, 0x4c, 0xfd, + 0x12, 0xcf, 0x60, 0xd4, 0xd3, 0x8c, 0xa2, 0xad, 0xc1, 0x3f, 0xde, 0xe0, 0x98, 0xf7, 0x08, 0xc3, + 0x14, 0x15, 0xf5, 0x66, 0x33, 0x08, 0x34, 0x2a, 0xea, 0xbc, 0x86, 0x4b, 0x2e, 0x8b, 0xd2, 0x66, + 0x55, 0x2b, 0x1d, 0xc5, 0x94, 0x96, 0x74, 0x9b, 0x76, 0xf8, 0xf2, 0x87, 0xf5, 0x31, 0xd6, 0xa8, + 0x77, 0x65, 0x8e, 0x7c, 0x05, 0x23, 0x37, 0xa2, 0x7e, 0xca, 0x73, 0x34, 0x86, 0x34, 0xbf, 0x92, + 0xdd, 0x3f, 0x4f, 0xd2, 0x8a, 0xe9, 0xe9, 0xd1, 0x67, 0x8b, 0xa3, 0xc3, 0x3e, 0x09, 0xa2, 0xc1, + 0x98, 0xf1, 0x17, 0x98, 0xb8, 0xc7, 0x7a, 0x93, 0xb7, 0x6d, 0xa6, 0x5b, 0x3e, 0xf1, 0xa2, 0xa3, + 0xa0, 0xe2, 0x5a, 0xfa, 0x92, 0x64, 0x5f, 0x92, 0x5c, 0xb9, 0x92, 0xe2, 0xf3, 0xc3, 0x3e, 0x09, + 0xa3, 0x81, 0x60, 0x0b, 0xfe, 0x0c, 0x63, 0xa7, 0x78, 0x6d, 0x6c, 0x66, 0xff, 0xed, 0xc4, 0x04, + 0x5b, 0x6c, 0xce, 0x3a, 0xe8, 0xe1, 0x37, 0x00, 0x00, 0xff, 0xff, 0x3c, 0x0c, 0xc7, 0x57, 0xb9, + 0x01, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -137,8 +184,12 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type SimpleServiceClient interface { - // SimpleUnaryUnary is a simple unary request with unary response - SimpleUnaryUnary(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) + // ServerAccessor is a unary RPC that accesses a server + ServerAccessor(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) + // RepoAccessorUnary is a unary RPC that accesses a repo + RepoAccessorUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*empty.Empty, error) + // RepoMutatorUnary is a unary RPC that mutates a repo + RepoMutatorUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*empty.Empty, error) } type simpleServiceClient struct { @@ -149,47 +200,97 @@ func NewSimpleServiceClient(cc *grpc.ClientConn) SimpleServiceClient { return &simpleServiceClient{cc} } -func (c *simpleServiceClient) SimpleUnaryUnary(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) { +func (c *simpleServiceClient) ServerAccessor(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) { out := new(SimpleResponse) - err := c.cc.Invoke(ctx, "/mock.SimpleService/SimpleUnaryUnary", in, out, opts...) + err := c.cc.Invoke(ctx, "/mock.SimpleService/ServerAccessor", in, out, opts...) if err != nil { return nil, err } return out, nil } -// SimpleServiceServer is the server API for SimpleService service. -type SimpleServiceServer interface { - // SimpleUnaryUnary is a simple unary request with unary response - SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error) +func (c *simpleServiceClient) RepoAccessorUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/mock.SimpleService/RepoAccessorUnary", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil } -// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations. -type UnimplementedSimpleServiceServer struct { +func (c *simpleServiceClient) RepoMutatorUnary(ctx context.Context, in *RepoRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/mock.SimpleService/RepoMutatorUnary", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil } -func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented") +// SimpleServiceServer is the server API for SimpleService service. +type SimpleServiceServer interface { + // ServerAccessor is a unary RPC that accesses a server + ServerAccessor(context.Context, *SimpleRequest) (*SimpleResponse, error) + // RepoAccessorUnary is a unary RPC that accesses a repo + RepoAccessorUnary(context.Context, *RepoRequest) (*empty.Empty, error) + // RepoMutatorUnary is a unary RPC that mutates a repo + RepoMutatorUnary(context.Context, *RepoRequest) (*empty.Empty, error) } func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) { s.RegisterService(&_SimpleService_serviceDesc, srv) } -func _SimpleService_SimpleUnaryUnary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _SimpleService_ServerAccessor_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(SimpleRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(SimpleServiceServer).SimpleUnaryUnary(ctx, in) + return srv.(SimpleServiceServer).ServerAccessor(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/mock.SimpleService/SimpleUnaryUnary", + FullMethod: "/mock.SimpleService/ServerAccessor", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SimpleServiceServer).SimpleUnaryUnary(ctx, req.(*SimpleRequest)) + return srv.(SimpleServiceServer).ServerAccessor(ctx, req.(*SimpleRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _SimpleService_RepoAccessorUnary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RepoRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SimpleServiceServer).RepoAccessorUnary(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mock.SimpleService/RepoAccessorUnary", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SimpleServiceServer).RepoAccessorUnary(ctx, req.(*RepoRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _SimpleService_RepoMutatorUnary_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RepoRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SimpleServiceServer).RepoMutatorUnary(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mock.SimpleService/RepoMutatorUnary", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SimpleServiceServer).RepoMutatorUnary(ctx, req.(*RepoRequest)) } return interceptor(ctx, in, info, handler) } @@ -199,8 +300,16 @@ var _SimpleService_serviceDesc = grpc.ServiceDesc{ HandlerType: (*SimpleServiceServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "SimpleUnaryUnary", - Handler: _SimpleService_SimpleUnaryUnary_Handler, + MethodName: "ServerAccessor", + Handler: _SimpleService_ServerAccessor_Handler, + }, + { + MethodName: "RepoAccessorUnary", + Handler: _SimpleService_RepoAccessorUnary_Handler, + }, + { + MethodName: "RepoMutatorUnary", + Handler: _SimpleService_RepoMutatorUnary_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto index 59e79d3b9..a1718fdd7 100644 --- a/internal/praefect/mock/mock.proto +++ b/internal/praefect/mock/mock.proto @@ -8,21 +8,44 @@ syntax = "proto3"; package mock; import "shared.proto"; +import "google/protobuf/empty.proto"; message SimpleRequest { int32 value = 1; } + message SimpleResponse { int32 value = 2; } +message RepoRequest { + gitaly.Repository repo = 1; +} + service SimpleService { - // SimpleUnaryUnary is a simple unary request with unary response - rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) { + // ServerAccessor is a unary RPC that accesses a server + rpc ServerAccessor(SimpleRequest) returns (SimpleResponse) { option (gitaly.op_type) = { op: ACCESSOR scope_level: SERVER }; } + + // RepoAccessorUnary is a unary RPC that accesses a repo + rpc RepoAccessorUnary(RepoRequest) returns (google.protobuf.Empty) { + option (gitaly.op_type) = { + op: ACCESSOR + scope_level: REPOSITORY + target_repository_field: "1" + }; + } + + // RepoMutatorUnary is a unary RPC that mutates a repo + rpc RepoMutatorUnary(RepoRequest) returns (google.protobuf.Empty) { + option (gitaly.op_type) = { + op: MUTATOR + scope_level: REPOSITORY + target_repository_field: "1" + }; + } } - diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go index 60e12595c..e7b1c1257 100644 --- a/internal/praefect/mocksvc_test.go +++ b/internal/praefect/mocksvc_test.go @@ -3,20 +3,37 @@ package praefect import ( "context" + "github.com/golang/protobuf/ptypes/empty" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" ) -type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock.SimpleResponse, error) +type ( + serverAccessorFunc func(context.Context, *mock.SimpleRequest) (*mock.SimpleResponse, error) + repoAccessorUnaryFunc func(context.Context, *mock.RepoRequest) (*empty.Empty, error) + repoMutatorUnaryFunc func(context.Context, *mock.RepoRequest) (*empty.Empty, error) +) // mockSvc is an implementation of mock.SimpleServer for testing purposes. The // gRPC stub can be updated via go generate: // //go:generate make mock/mock.pb.go type mockSvc struct { - simpleUnaryUnary simpleUnaryUnaryCallback + serverAccessor serverAccessorFunc + repoAccessorUnary repoAccessorUnaryFunc + repoMutatorUnary repoMutatorUnaryFunc +} + +// ServerAccessor is implemented by a callback +func (m *mockSvc) ServerAccessor(ctx context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { + return m.serverAccessor(ctx, req) +} + +// RepoAccessorUnary is implemented by a callback +func (m *mockSvc) RepoAccessorUnary(ctx context.Context, req *mock.RepoRequest) (*empty.Empty, error) { + return m.repoAccessorUnary(ctx, req) } -// SimpleUnaryUnary is implemented by a callback -func (m *mockSvc) SimpleUnaryUnary(ctx context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { - return m.simpleUnaryUnary(ctx, req) +// RepoMutatorUnary is implemented by a callback +func (m *mockSvc) RepoMutatorUnary(ctx context.Context, req *mock.RepoRequest) (*empty.Empty, error) { + return m.repoMutatorUnary(ctx, req) } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index df0529166..e7a6fdcb2 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -3,145 +3,67 @@ package praefect import ( "context" "fmt" - "net" "testing" "time" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/client" - internalauth "gitlab.com/gitlab-org/gitaly/internal/auth" "gitlab.com/gitlab-org/gitaly/internal/git" - "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" - "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" - "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/internal/server/auth" - gitalyserver "gitlab.com/gitlab-org/gitaly/internal/service/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" ) -// TestServerSimpleUnaryUnary verifies that the Praefect server is capable of -// routing a specific unary request to and unary response from a backend server -func TestServerSimpleUnaryUnary(t *testing.T) { - testCases := []struct { - name string - - // callback is the actual RPC implementation - callback simpleUnaryUnaryCallback - - // all inputs and outputs for RPC SimpleUnaryUnary - request *mock.SimpleRequest - expectResp *mock.SimpleResponse - expectErrStr string - }{ - { - name: "simple request with response", - callback: callbackIncrement, - request: &mock.SimpleRequest{ - Value: 1, - }, - expectResp: &mock.SimpleResponse{ - Value: 2, +func TestServerRouteServerAccessor(t *testing.T) { + var ( + conf = testConfig(1) + reqQ = make(chan *mock.SimpleRequest) + + expectResp = &mock.SimpleResponse{Value: 2} + + // note: a server scoped RPC will be randomly routed + // to an available backend server. To simplify our + // test, a single backend server is used. + backends = map[int]mock.SimpleServiceServer{ + 0: &mockSvc{ + serverAccessor: func(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { + reqQ <- req + return expectResp, nil + }, }, - }, - } - - gz := proto.FileDescriptor("mock.proto") - fd, err := protoregistry.ExtractFileDescriptor(gz) - if err != nil { - panic(err) - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - const ( - storagePrimary = "default" - ) - - conf := config.Config{ - VirtualStorageName: "praefect", - Nodes: []*models.Node{ - &models.Node{ - ID: 1, - Storage: "praefect-internal-1", - DefaultPrimary: true, - Token: "abc", - }, - &models.Node{ - ID: 2, - Storage: "praefect-internal-2", - Token: "xyz", - }}, - } - - datastore := NewMemoryDatastore(conf) - logEntry := log.Default() - clientCC := conn.NewClientConnections() - coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, fd) - - for id, nodeStorage := range datastore.storageNodes.m { - backend, cleanup := newMockDownstream(t, nodeStorage.Token, tt.callback) - defer cleanup() // clean up mock downstream server resources - - clientCC.RegisterNode(nodeStorage.Storage, backend, nodeStorage.Token) - nodeStorage.Address = backend - datastore.storageNodes.m[id] = nodeStorage - } - - replmgr := NewReplMgr( - storagePrimary, - logEntry, - datastore, - clientCC, - ) - prf := NewServer( - coordinator, - replmgr, - nil, - logEntry, - clientCC, - conf, - ) + } + ) - listener, port := listenAvailPort(t) - t.Logf("proxy listening on port %d", port) - defer listener.Close() + cli, _, cleanup := runPraefectServerWithMock(t, conf, backends) + defer cleanup() - errQ := make(chan error) + expectReq := &mock.SimpleRequest{Value: 1} - go func() { - errQ <- prf.Start(listener) - }() + done := make(chan struct{}) + go func() { + defer close(done) - // dial client to praefect - cc := dialLocalPort(t, port, false) - defer cc.Close() - cli := mock.NewSimpleServiceClient(cc) + actualReq := <-reqQ + assert.True(t, proto.Equal(expectReq, actualReq), + "received unexpected request value: %+v instead of %+v", actualReq, expectReq) + }() - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() - resp, err := cli.SimpleUnaryUnary(ctx, tt.request) - if err != nil { - require.EqualError(t, err, tt.expectErrStr) - } - require.Equal(t, tt.expectResp, resp) + actualResp, err := cli.ServerAccessor(ctx, expectReq) + require.NoError(t, err) + require.True(t, proto.Equal(expectResp, actualResp), + "expected response was not routed back") - err = prf.Shutdown(ctx) - require.NoError(t, err) - require.NoError(t, <-errQ) - }) - } + waitUntil(t, done, time.Second) } func TestGitalyServerInfo(t *testing.T) { @@ -159,7 +81,7 @@ func TestGitalyServerInfo(t *testing.T) { Token: "xyz", }}, } - cc, srv := runFullPraefectServer(t, conf) + cc, srv := runPraefectServerWithGitaly(t, conf) defer srv.s.Stop() client := gitalypb.NewServerServiceClient(cc) @@ -182,7 +104,7 @@ func TestGitalyServerInfo(t *testing.T) { } func TestHealthCheck(t *testing.T) { - cc, srv := runFullPraefectServer(t, config.Config{}) + cc, srv := runPraefectServerWithGitaly(t, config.Config{}) defer srv.s.Stop() ctx, cancel := testhelper.Context() @@ -205,7 +127,7 @@ func TestRejectBadStorage(t *testing.T) { }, } - cc, srv := runFullPraefectServer(t, conf) + cc, srv := runPraefectServerWithGitaly(t, conf) defer srv.s.Stop() badTargetRepo := gitalypb.Repository{ @@ -222,136 +144,3 @@ func TestRejectBadStorage(t *testing.T) { testhelper.RequireGrpcError(t, err, codes.InvalidArgument) require.Equal(t, fmt.Sprintf("only messages for %s are allowed", conf.VirtualStorageName), status.Convert(err).Message()) } - -func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) { - datastore := NewMemoryDatastore(conf) - - logEntry := log.Default() - - clientCC := conn.NewClientConnections() - for id, nodeStorage := range datastore.storageNodes.m { - _, backend := runInternalGitalyServer(t, nodeStorage.Token) - - clientCC.RegisterNode(nodeStorage.Storage, backend, nodeStorage.Token) - nodeStorage.Address = backend - datastore.storageNodes.m[id] = nodeStorage - } - - coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) - - replmgr := NewReplMgr( - "", - logEntry, - datastore, - clientCC, - ) - - prf := NewServer( - coordinator, - replmgr, - nil, - logEntry, - clientCC, - conf, - ) - - listener, port := listenAvailPort(t) - t.Logf("proxy listening on port %d", port) - - errQ := make(chan error) - - go func() { - errQ <- prf.Start(listener) - }() - - // dial client to praefect - cc := dialLocalPort(t, port, false) - - return cc, prf -} - -func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string) { - streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor(internalauth.Config{Token: token})} - unaryInt := []grpc.UnaryServerInterceptor{auth.UnaryServerInterceptor(internalauth.Config{Token: token})} - - server := testhelper.NewTestGrpcServer(t, streamInt, unaryInt) - serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() - - listener, err := net.Listen("unix", serverSocketPath) - if err != nil { - t.Fatal(err) - } - - gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) - - go server.Serve(listener) - - return server, "unix://" + serverSocketPath -} - -func callbackIncrement(_ context.Context, req *mock.SimpleRequest) (*mock.SimpleResponse, error) { - return &mock.SimpleResponse{ - Value: req.Value + 1, - }, nil -} - -func listenAvailPort(tb testing.TB) (net.Listener, int) { - listener, err := net.Listen("tcp", ":0") - require.NoError(tb, err) - - return listener, listener.Addr().(*net.TCPAddr).Port -} - -func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { - opts := []grpc.DialOption{ - grpc.WithBlock(), - } - if backend { - opts = append( - opts, - grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), - ) - } - - cc, err := client.Dial( - fmt.Sprintf("tcp://localhost:%d", port), - opts, - ) - require.NoError(tb, err) - - return cc -} - -// initializes and returns a client to downstream server, downstream server, and cleanup function -func newMockDownstream(tb testing.TB, token string, callback simpleUnaryUnaryCallback) (string, func()) { - // setup mock server - m := &mockSvc{ - simpleUnaryUnary: callback, - } - - srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(internalauth.Config{Token: token}))) - - mock.RegisterSimpleServiceServer(srv, m) - - // client to backend service - lis, port := listenAvailPort(tb) - - errQ := make(chan error) - - go func() { - errQ <- srv.Serve(lis) - }() - - cleanup := func() { - srv.GracefulStop() - lis.Close() - - // If the server is shutdown before Serve() is called on it - // the Serve() calls will return the ErrServerStopped - if err := <-errQ; err != nil && err != grpc.ErrServerStopped { - require.NoError(tb, err) - } - } - - return fmt.Sprintf("tcp://localhost:%d", port), cleanup -} |