diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-05-23 13:06:16 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-05-25 12:25:03 +0300 |
commit | 6607788cbd4180d055fa9984fcc2fe45e2fea8cd (patch) | |
tree | 1fe9306855408ed342839fe7badb85ff13ddf47f | |
parent | 829899bb24ac79557771e92ecf82f1e16daf3a0a (diff) |
gRPC proto registry can be lock free
The Registry of proto files free of locks as it
fully initialized by constructor before usage.
Creation of Registry for each test makes no sense
and to make them more consistent the global
protoregistry.GitalyProtoPreregistered is used.
-rw-r--r-- | cmd/praefect/main.go | 9 | ||||
-rw-r--r-- | internal/middleware/cache/cache_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 13 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/protoregistry/find_oid_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry.go | 31 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 6 |
9 files changed, 31 insertions, 74 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 322ae28b5..3c8ec20a5 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -259,14 +259,9 @@ func run(cfgs []starter.Config, conf config.Config) error { transactions.WithDelayMetric(transactionDelayMetric), ) - registry := protoregistry.New() - if err = registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...); err != nil { - return err - } - var ( // top level server dependencies - coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, registry) + coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered) repl = praefect.NewReplMgr( logger, ds, @@ -274,7 +269,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator.StreamDirector, logger, registry, conf) + srv = praefect.NewServer(coordinator.StreamDirector, logger, protoregistry.GitalyProtoPreregistered, conf) ) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go index 9bcb2343b..632dc3843 100644 --- a/internal/middleware/cache/cache_test.go +++ b/internal/middleware/cache/cache_test.go @@ -26,8 +26,8 @@ import ( func TestInvalidators(t *testing.T) { mCache := newMockCache() - reg := protoregistry.New() - require.NoError(t, reg.RegisterFiles(streamFileDesc(t))) + reg, err := protoregistry.New(streamFileDesc(t)) + require.NoError(t, err) srvr := grpc.NewServer( grpc.StreamInterceptor( diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index c64c9a468..5146d10b2 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -179,8 +179,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func txMgr := transactions.NewManager() - registry := protoregistry.New() - require.NoError(t, registry.RegisterFiles(fd)) + registry, err := protoregistry.New(fd) + require.NoError(t, err) coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b100b2ba4..1b7103ff0 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -106,8 +106,6 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }, }, } - pbRegistry := protoregistry.New() - require.NoError(t, pbRegistry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) const storageName = "test-storage" coordinator := NewCoordinator( @@ -121,7 +119,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }}, transactions.NewManager(), conf, - pbRegistry, + protoregistry.GitalyProtoPreregistered, ) ctx, cancel := testhelper.Context() @@ -195,11 +193,10 @@ func TestStreamDirectorMutator(t *testing.T) { nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) + txMgr := transactions.NewManager() - coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r) + coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ Origin: &targetRepo, @@ -306,12 +303,10 @@ func TestStreamDirectorAccessor(t *testing.T) { nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) txMgr := transactions.NewManager() - coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r) + coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) require.NoError(t, err) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index a38fabc09..d8cadf610 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -91,8 +91,8 @@ func assertPrimariesExist(t testing.TB, conf config.Config) { // configured storage node. // requires there to be only 1 virtual storage func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t))) + r, err := protoregistry.New(mustLoadProtoReg(t)) + require.NoError(t, err) return runPraefectServer(t, conf, buildOptions{ withDatastore: ds, @@ -220,12 +220,6 @@ func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) no return nodeMgr } -func defaultAnnotations(t testing.TB) *protoregistry.Registry { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - return r -} - func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) { assertPrimariesExist(t, conf) @@ -241,7 +235,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...) } if opt.withAnnotations == nil { - opt.withAnnotations = defaultAnnotations(t) + opt.withAnnotations = protoregistry.GitalyProtoPreregistered } if opt.withLogger == nil { opt.withLogger = log.Default() diff --git a/internal/praefect/protoregistry/find_oid_test.go b/internal/praefect/protoregistry/find_oid_test.go index 6c6a837eb..4a87db9c7 100644 --- a/internal/praefect/protoregistry/find_oid_test.go +++ b/internal/praefect/protoregistry/find_oid_test.go @@ -12,9 +12,6 @@ import ( ) func TestProtoRegistryTargetRepo(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - testRepos := []*gitalypb.Repository{ &gitalypb.Repository{ GitAlternateObjectDirectories: []string{"a", "b", "c"}, @@ -96,7 +93,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) { for _, tc := range testcases { desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc) t.Run(desc, func(t *testing.T) { - info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) + info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) require.NoError(t, err) actualTarget, actualErr := info.TargetRepo(tc.pbMsg) @@ -119,9 +116,6 @@ func TestProtoRegistryTargetRepo(t *testing.T) { } func TestProtoRegistryStorage(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - testcases := []struct { desc string svc string @@ -151,7 +145,7 @@ func TestProtoRegistryStorage(t *testing.T) { for _, tc := range testcases { desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc) t.Run(desc, func(t *testing.T) { - info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) + info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) require.NoError(t, err) actualStorage, actualErr := info.Storage(tc.pbMsg) diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 5ffb6a884..f7ebe6f7e 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "reflect" "strings" - "sync" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -35,8 +34,9 @@ func init() { GitalyProtoFileDescriptors = append(GitalyProtoFileDescriptors, fd) } - GitalyProtoPreregistered = New() - if err := GitalyProtoPreregistered.RegisterFiles(GitalyProtoFileDescriptors...); err != nil { + var err error + GitalyProtoPreregistered, err = New(GitalyProtoFileDescriptors...) + if err != nil { panic(err) } } @@ -145,41 +145,31 @@ func (mi MethodInfo) UnmarshalRequestProto(b []byte) (proto.Message, error) { // Registry contains info about RPC methods type Registry struct { - sync.RWMutex protos map[string]MethodInfo } -// New creates a new ProtoRegistry -func New() *Registry { - return &Registry{ - protos: make(map[string]MethodInfo), - } -} - -// RegisterFiles takes one or more descriptor.FileDescriptorProto and populates -// the registry with its info -func (pr *Registry) RegisterFiles(protos ...*descriptor.FileDescriptorProto) error { - pr.Lock() - defer pr.Unlock() +// New creates a new ProtoRegistry with info from one or more descriptor.FileDescriptorProto +func New(protos ...*descriptor.FileDescriptorProto) (*Registry, error) { + methods := make(map[string]MethodInfo) for _, p := range protos { for _, svc := range p.GetService() { for _, method := range svc.GetMethod() { mi, err := parseMethodInfo(p, method) if err != nil { - return err + return nil, err } fullMethodName := fmt.Sprintf( "/%s.%s/%s", p.GetPackage(), svc.GetName(), method.GetName(), ) - pr.protos[fullMethodName] = mi + methods[fullMethodName] = mi } } } - return nil + return &Registry{protos: methods}, nil } func getOpExtension(m *descriptor.MethodDescriptorProto) (*gitalypb.OperationMsg, error) { @@ -473,9 +463,6 @@ func lastName(inputType string) (string, error) { // LookupMethod looks up an MethodInfo by service and method name func (pr *Registry) LookupMethod(fullMethodName string) (MethodInfo, error) { - pr.RLock() - defer pr.RUnlock() - methodInfo, ok := pr.protos[fullMethodName] if !ok { return MethodInfo{}, fmt.Errorf("full method name not found: %v", fullMethodName) diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 4afc206b8..3f898211e 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -10,9 +10,9 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) -func TestPopulatesProtoRegistry(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) +func TestNewProtoRegistry(t *testing.T) { + r, err := protoregistry.New(protoregistry.GitalyProtoFileDescriptors...) + require.NoError(t, err) expectedResults := map[string]map[string]protoregistry.OpType{ "BlobService": map[string]protoregistry.OpType{ @@ -186,10 +186,7 @@ func TestPopulatesProtoRegistry(t *testing.T) { } func TestRequestFactory(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - - mInfo, err := r.LookupMethod("/gitaly.RepositoryService/RepositoryExists") + mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/RepositoryExists") require.NoError(t, err) pb, err := mInfo.UnmarshalRequestProto([]byte{}) @@ -199,9 +196,6 @@ func TestRequestFactory(t *testing.T) { } func TestMethodInfoScope(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - for _, tt := range []struct { method string scope protoregistry.Scope @@ -216,7 +210,7 @@ func TestMethodInfoScope(t *testing.T) { }, } { t.Run(tt.method, func(t *testing.T) { - mInfo, err := r.LookupMethod(tt.method) + mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(tt.method) require.NoError(t, err) require.Exactly(t, tt.scope, mInfo.Scope) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 378d98628..51772ede5 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -227,16 +227,14 @@ func TestPropagateReplicationJob(t *testing.T) { txMgr := transactions.NewManager() - registry := protoregistry.New() - require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry) + coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) replmgr := NewReplMgr(logEntry, ds, nodeMgr) prf := NewServer( coordinator.StreamDirector, logEntry, - registry, + protoregistry.GitalyProtoPreregistered, conf, ) listener, port := listenAvailPort(t) |