Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-05-23 13:06:16 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-05-25 12:25:03 +0300
commit6607788cbd4180d055fa9984fcc2fe45e2fea8cd (patch)
tree1fe9306855408ed342839fe7badb85ff13ddf47f
parent829899bb24ac79557771e92ecf82f1e16daf3a0a (diff)
gRPC proto registry can be lock free
The Registry of proto files free of locks as it fully initialized by constructor before usage. Creation of Registry for each test makes no sense and to make them more consistent the global protoregistry.GitalyProtoPreregistered is used.
-rw-r--r--cmd/praefect/main.go9
-rw-r--r--internal/middleware/cache/cache_test.go4
-rw-r--r--internal/praefect/auth_test.go4
-rw-r--r--internal/praefect/coordinator_test.go13
-rw-r--r--internal/praefect/helper_test.go12
-rw-r--r--internal/praefect/protoregistry/find_oid_test.go10
-rw-r--r--internal/praefect/protoregistry/protoregistry.go31
-rw-r--r--internal/praefect/protoregistry/protoregistry_test.go16
-rw-r--r--internal/praefect/replicator_test.go6
9 files changed, 31 insertions, 74 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 322ae28b5..3c8ec20a5 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -259,14 +259,9 @@ func run(cfgs []starter.Config, conf config.Config) error {
transactions.WithDelayMetric(transactionDelayMetric),
)
- registry := protoregistry.New()
- if err = registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...); err != nil {
- return err
- }
-
var (
// top level server dependencies
- coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, registry)
+ coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
repl = praefect.NewReplMgr(
logger,
ds,
@@ -274,7 +269,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
praefect.WithQueueMetric(queueMetric))
- srv = praefect.NewServer(coordinator.StreamDirector, logger, registry, conf)
+ srv = praefect.NewServer(coordinator.StreamDirector, logger, protoregistry.GitalyProtoPreregistered, conf)
)
ctx, cancel := context.WithCancel(context.Background())
diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go
index 9bcb2343b..632dc3843 100644
--- a/internal/middleware/cache/cache_test.go
+++ b/internal/middleware/cache/cache_test.go
@@ -26,8 +26,8 @@ import (
func TestInvalidators(t *testing.T) {
mCache := newMockCache()
- reg := protoregistry.New()
- require.NoError(t, reg.RegisterFiles(streamFileDesc(t)))
+ reg, err := protoregistry.New(streamFileDesc(t))
+ require.NoError(t, err)
srvr := grpc.NewServer(
grpc.StreamInterceptor(
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index c64c9a468..5146d10b2 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -179,8 +179,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
txMgr := transactions.NewManager()
- registry := protoregistry.New()
- require.NoError(t, registry.RegisterFiles(fd))
+ registry, err := protoregistry.New(fd)
+ require.NoError(t, err)
coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index b100b2ba4..1b7103ff0 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -106,8 +106,6 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
},
},
}
- pbRegistry := protoregistry.New()
- require.NoError(t, pbRegistry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
const storageName = "test-storage"
coordinator := NewCoordinator(
@@ -121,7 +119,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}},
transactions.NewManager(),
conf,
- pbRegistry,
+ protoregistry.GitalyProtoPreregistered,
)
ctx, cancel := testhelper.Context()
@@ -195,11 +193,10 @@ func TestStreamDirectorMutator(t *testing.T) {
nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
+
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r)
+ coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -306,12 +303,10 @@ func TestStreamDirectorAccessor(t *testing.T) {
nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r)
+ coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
require.NoError(t, err)
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index a38fabc09..d8cadf610 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -91,8 +91,8 @@ func assertPrimariesExist(t testing.TB, conf config.Config) {
// configured storage node.
// requires there to be only 1 virtual storage
func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t)))
+ r, err := protoregistry.New(mustLoadProtoReg(t))
+ require.NoError(t, err)
return runPraefectServer(t, conf, buildOptions{
withDatastore: ds,
@@ -220,12 +220,6 @@ func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) no
return nodeMgr
}
-func defaultAnnotations(t testing.TB) *protoregistry.Registry {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
- return r
-}
-
func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
assertPrimariesExist(t, conf)
@@ -241,7 +235,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...)
}
if opt.withAnnotations == nil {
- opt.withAnnotations = defaultAnnotations(t)
+ opt.withAnnotations = protoregistry.GitalyProtoPreregistered
}
if opt.withLogger == nil {
opt.withLogger = log.Default()
diff --git a/internal/praefect/protoregistry/find_oid_test.go b/internal/praefect/protoregistry/find_oid_test.go
index 6c6a837eb..4a87db9c7 100644
--- a/internal/praefect/protoregistry/find_oid_test.go
+++ b/internal/praefect/protoregistry/find_oid_test.go
@@ -12,9 +12,6 @@ import (
)
func TestProtoRegistryTargetRepo(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
testRepos := []*gitalypb.Repository{
&gitalypb.Repository{
GitAlternateObjectDirectories: []string{"a", "b", "c"},
@@ -96,7 +93,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
for _, tc := range testcases {
desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc)
t.Run(desc, func(t *testing.T) {
- info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
+ info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
require.NoError(t, err)
actualTarget, actualErr := info.TargetRepo(tc.pbMsg)
@@ -119,9 +116,6 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
}
func TestProtoRegistryStorage(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
testcases := []struct {
desc string
svc string
@@ -151,7 +145,7 @@ func TestProtoRegistryStorage(t *testing.T) {
for _, tc := range testcases {
desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc)
t.Run(desc, func(t *testing.T) {
- info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
+ info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
require.NoError(t, err)
actualStorage, actualErr := info.Storage(tc.pbMsg)
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 5ffb6a884..f7ebe6f7e 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -7,7 +7,6 @@ import (
"io/ioutil"
"reflect"
"strings"
- "sync"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
@@ -35,8 +34,9 @@ func init() {
GitalyProtoFileDescriptors = append(GitalyProtoFileDescriptors, fd)
}
- GitalyProtoPreregistered = New()
- if err := GitalyProtoPreregistered.RegisterFiles(GitalyProtoFileDescriptors...); err != nil {
+ var err error
+ GitalyProtoPreregistered, err = New(GitalyProtoFileDescriptors...)
+ if err != nil {
panic(err)
}
}
@@ -145,41 +145,31 @@ func (mi MethodInfo) UnmarshalRequestProto(b []byte) (proto.Message, error) {
// Registry contains info about RPC methods
type Registry struct {
- sync.RWMutex
protos map[string]MethodInfo
}
-// New creates a new ProtoRegistry
-func New() *Registry {
- return &Registry{
- protos: make(map[string]MethodInfo),
- }
-}
-
-// RegisterFiles takes one or more descriptor.FileDescriptorProto and populates
-// the registry with its info
-func (pr *Registry) RegisterFiles(protos ...*descriptor.FileDescriptorProto) error {
- pr.Lock()
- defer pr.Unlock()
+// New creates a new ProtoRegistry with info from one or more descriptor.FileDescriptorProto
+func New(protos ...*descriptor.FileDescriptorProto) (*Registry, error) {
+ methods := make(map[string]MethodInfo)
for _, p := range protos {
for _, svc := range p.GetService() {
for _, method := range svc.GetMethod() {
mi, err := parseMethodInfo(p, method)
if err != nil {
- return err
+ return nil, err
}
fullMethodName := fmt.Sprintf(
"/%s.%s/%s",
p.GetPackage(), svc.GetName(), method.GetName(),
)
- pr.protos[fullMethodName] = mi
+ methods[fullMethodName] = mi
}
}
}
- return nil
+ return &Registry{protos: methods}, nil
}
func getOpExtension(m *descriptor.MethodDescriptorProto) (*gitalypb.OperationMsg, error) {
@@ -473,9 +463,6 @@ func lastName(inputType string) (string, error) {
// LookupMethod looks up an MethodInfo by service and method name
func (pr *Registry) LookupMethod(fullMethodName string) (MethodInfo, error) {
- pr.RLock()
- defer pr.RUnlock()
-
methodInfo, ok := pr.protos[fullMethodName]
if !ok {
return MethodInfo{}, fmt.Errorf("full method name not found: %v", fullMethodName)
diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go
index 4afc206b8..3f898211e 100644
--- a/internal/praefect/protoregistry/protoregistry_test.go
+++ b/internal/praefect/protoregistry/protoregistry_test.go
@@ -10,9 +10,9 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-func TestPopulatesProtoRegistry(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
+func TestNewProtoRegistry(t *testing.T) {
+ r, err := protoregistry.New(protoregistry.GitalyProtoFileDescriptors...)
+ require.NoError(t, err)
expectedResults := map[string]map[string]protoregistry.OpType{
"BlobService": map[string]protoregistry.OpType{
@@ -186,10 +186,7 @@ func TestPopulatesProtoRegistry(t *testing.T) {
}
func TestRequestFactory(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
- mInfo, err := r.LookupMethod("/gitaly.RepositoryService/RepositoryExists")
+ mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/RepositoryExists")
require.NoError(t, err)
pb, err := mInfo.UnmarshalRequestProto([]byte{})
@@ -199,9 +196,6 @@ func TestRequestFactory(t *testing.T) {
}
func TestMethodInfoScope(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
for _, tt := range []struct {
method string
scope protoregistry.Scope
@@ -216,7 +210,7 @@ func TestMethodInfoScope(t *testing.T) {
},
} {
t.Run(tt.method, func(t *testing.T) {
- mInfo, err := r.LookupMethod(tt.method)
+ mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(tt.method)
require.NoError(t, err)
require.Exactly(t, tt.scope, mInfo.Scope)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 378d98628..51772ede5 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -227,16 +227,14 @@ func TestPropagateReplicationJob(t *testing.T) {
txMgr := transactions.NewManager()
- registry := protoregistry.New()
- require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
- coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
replmgr := NewReplMgr(logEntry, ds, nodeMgr)
prf := NewServer(
coordinator.StreamDirector,
logEntry,
- registry,
+ protoregistry.GitalyProtoPreregistered,
conf,
)
listener, port := listenAvailPort(t)