diff options
author | John Cai <jcai@gitlab.com> | 2020-05-26 02:53:11 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-05-26 02:53:11 +0300 |
commit | e3d5d49d77b1a1b5b2cda1f910711599c9454500 (patch) | |
tree | 08670ed866dad1229037fbaf44f5e7319f1b583a | |
parent | 715f33073b49b9ed5fe0e90598954eef7ef318ca (diff) | |
parent | 6607788cbd4180d055fa9984fcc2fe45e2fea8cd (diff) |
Merge branch 'ps-protoreg-lock-free' into 'master'
gRPC proto registry can be lock free
See merge request gitlab-org/gitaly!2209
-rw-r--r-- | cmd/praefect/main.go | 9 | ||||
-rw-r--r-- | internal/middleware/cache/cache_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 13 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 12 | ||||
-rw-r--r-- | internal/praefect/protoregistry/find_oid_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry.go | 31 | ||||
-rw-r--r-- | internal/praefect/protoregistry/protoregistry_test.go | 16 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 6 |
9 files changed, 31 insertions, 74 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 322ae28b5..3c8ec20a5 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -259,14 +259,9 @@ func run(cfgs []starter.Config, conf config.Config) error { transactions.WithDelayMetric(transactionDelayMetric), ) - registry := protoregistry.New() - if err = registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...); err != nil { - return err - } - var ( // top level server dependencies - coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, registry) + coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered) repl = praefect.NewReplMgr( logger, ds, @@ -274,7 +269,7 @@ func run(cfgs []starter.Config, conf config.Config) error { praefect.WithDelayMetric(delayMetric), praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator.StreamDirector, logger, registry, conf) + srv = praefect.NewServer(coordinator.StreamDirector, logger, protoregistry.GitalyProtoPreregistered, conf) ) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go index 9bcb2343b..632dc3843 100644 --- a/internal/middleware/cache/cache_test.go +++ b/internal/middleware/cache/cache_test.go @@ -26,8 +26,8 @@ import ( func TestInvalidators(t *testing.T) { mCache := newMockCache() - reg := protoregistry.New() - require.NoError(t, reg.RegisterFiles(streamFileDesc(t))) + reg, err := protoregistry.New(streamFileDesc(t)) + require.NoError(t, err) srvr := grpc.NewServer( grpc.StreamInterceptor( diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index c64c9a468..5146d10b2 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -179,8 +179,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func txMgr := transactions.NewManager() - registry := protoregistry.New() - require.NoError(t, registry.RegisterFiles(fd)) + registry, err := protoregistry.New(fd) + require.NoError(t, err) coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b100b2ba4..1b7103ff0 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -106,8 +106,6 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }, }, } - pbRegistry := protoregistry.New() - require.NoError(t, pbRegistry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) const storageName = "test-storage" coordinator := NewCoordinator( @@ -121,7 +119,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { }}, transactions.NewManager(), conf, - pbRegistry, + protoregistry.GitalyProtoPreregistered, ) ctx, cancel := testhelper.Context() @@ -195,11 +193,10 @@ func TestStreamDirectorMutator(t *testing.T) { nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) + txMgr := transactions.NewManager() - coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r) + coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ Origin: &targetRepo, @@ -306,12 +303,10 @@ func TestStreamDirectorAccessor(t *testing.T) { nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) txMgr := transactions.NewManager() - coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r) + coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) require.NoError(t, err) diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index a38fabc09..d8cadf610 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -91,8 +91,8 @@ func assertPrimariesExist(t testing.TB, conf config.Config) { // configured storage node. // requires there to be only 1 virtual storage func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t))) + r, err := protoregistry.New(mustLoadProtoReg(t)) + require.NoError(t, err) return runPraefectServer(t, conf, buildOptions{ withDatastore: ds, @@ -220,12 +220,6 @@ func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) no return nodeMgr } -func defaultAnnotations(t testing.TB) *protoregistry.Registry { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - return r -} - func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) { assertPrimariesExist(t, conf) @@ -241,7 +235,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...) } if opt.withAnnotations == nil { - opt.withAnnotations = defaultAnnotations(t) + opt.withAnnotations = protoregistry.GitalyProtoPreregistered } if opt.withLogger == nil { opt.withLogger = log.Default() diff --git a/internal/praefect/protoregistry/find_oid_test.go b/internal/praefect/protoregistry/find_oid_test.go index 6c6a837eb..4a87db9c7 100644 --- a/internal/praefect/protoregistry/find_oid_test.go +++ b/internal/praefect/protoregistry/find_oid_test.go @@ -12,9 +12,6 @@ import ( ) func TestProtoRegistryTargetRepo(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - testRepos := []*gitalypb.Repository{ &gitalypb.Repository{ GitAlternateObjectDirectories: []string{"a", "b", "c"}, @@ -96,7 +93,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) { for _, tc := range testcases { desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc) t.Run(desc, func(t *testing.T) { - info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) + info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) require.NoError(t, err) actualTarget, actualErr := info.TargetRepo(tc.pbMsg) @@ -119,9 +116,6 @@ func TestProtoRegistryTargetRepo(t *testing.T) { } func TestProtoRegistryStorage(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - testcases := []struct { desc string svc string @@ -151,7 +145,7 @@ func TestProtoRegistryStorage(t *testing.T) { for _, tc := range testcases { desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc) t.Run(desc, func(t *testing.T) { - info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) + info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method)) require.NoError(t, err) actualStorage, actualErr := info.Storage(tc.pbMsg) diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 5ffb6a884..f7ebe6f7e 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -7,7 +7,6 @@ import ( "io/ioutil" "reflect" "strings" - "sync" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -35,8 +34,9 @@ func init() { GitalyProtoFileDescriptors = append(GitalyProtoFileDescriptors, fd) } - GitalyProtoPreregistered = New() - if err := GitalyProtoPreregistered.RegisterFiles(GitalyProtoFileDescriptors...); err != nil { + var err error + GitalyProtoPreregistered, err = New(GitalyProtoFileDescriptors...) + if err != nil { panic(err) } } @@ -145,41 +145,31 @@ func (mi MethodInfo) UnmarshalRequestProto(b []byte) (proto.Message, error) { // Registry contains info about RPC methods type Registry struct { - sync.RWMutex protos map[string]MethodInfo } -// New creates a new ProtoRegistry -func New() *Registry { - return &Registry{ - protos: make(map[string]MethodInfo), - } -} - -// RegisterFiles takes one or more descriptor.FileDescriptorProto and populates -// the registry with its info -func (pr *Registry) RegisterFiles(protos ...*descriptor.FileDescriptorProto) error { - pr.Lock() - defer pr.Unlock() +// New creates a new ProtoRegistry with info from one or more descriptor.FileDescriptorProto +func New(protos ...*descriptor.FileDescriptorProto) (*Registry, error) { + methods := make(map[string]MethodInfo) for _, p := range protos { for _, svc := range p.GetService() { for _, method := range svc.GetMethod() { mi, err := parseMethodInfo(p, method) if err != nil { - return err + return nil, err } fullMethodName := fmt.Sprintf( "/%s.%s/%s", p.GetPackage(), svc.GetName(), method.GetName(), ) - pr.protos[fullMethodName] = mi + methods[fullMethodName] = mi } } } - return nil + return &Registry{protos: methods}, nil } func getOpExtension(m *descriptor.MethodDescriptorProto) (*gitalypb.OperationMsg, error) { @@ -473,9 +463,6 @@ func lastName(inputType string) (string, error) { // LookupMethod looks up an MethodInfo by service and method name func (pr *Registry) LookupMethod(fullMethodName string) (MethodInfo, error) { - pr.RLock() - defer pr.RUnlock() - methodInfo, ok := pr.protos[fullMethodName] if !ok { return MethodInfo{}, fmt.Errorf("full method name not found: %v", fullMethodName) diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go index 4afc206b8..3f898211e 100644 --- a/internal/praefect/protoregistry/protoregistry_test.go +++ b/internal/praefect/protoregistry/protoregistry_test.go @@ -10,9 +10,9 @@ import ( "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) -func TestPopulatesProtoRegistry(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) +func TestNewProtoRegistry(t *testing.T) { + r, err := protoregistry.New(protoregistry.GitalyProtoFileDescriptors...) + require.NoError(t, err) expectedResults := map[string]map[string]protoregistry.OpType{ "BlobService": map[string]protoregistry.OpType{ @@ -186,10 +186,7 @@ func TestPopulatesProtoRegistry(t *testing.T) { } func TestRequestFactory(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - - mInfo, err := r.LookupMethod("/gitaly.RepositoryService/RepositoryExists") + mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/RepositoryExists") require.NoError(t, err) pb, err := mInfo.UnmarshalRequestProto([]byte{}) @@ -199,9 +196,6 @@ func TestRequestFactory(t *testing.T) { } func TestMethodInfoScope(t *testing.T) { - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - for _, tt := range []struct { method string scope protoregistry.Scope @@ -216,7 +210,7 @@ func TestMethodInfoScope(t *testing.T) { }, } { t.Run(tt.method, func(t *testing.T) { - mInfo, err := r.LookupMethod(tt.method) + mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(tt.method) require.NoError(t, err) require.Exactly(t, tt.scope, mInfo.Scope) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 378d98628..51772ede5 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -227,16 +227,14 @@ func TestPropagateReplicationJob(t *testing.T) { txMgr := transactions.NewManager() - registry := protoregistry.New() - require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry) + coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered) replmgr := NewReplMgr(logEntry, ds, nodeMgr) prf := NewServer( coordinator.StreamDirector, logEntry, - registry, + protoregistry.GitalyProtoPreregistered, conf, ) listener, port := listenAvailPort(t) |