Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-05-26 02:53:11 +0300
committerJohn Cai <jcai@gitlab.com>2020-05-26 02:53:11 +0300
commite3d5d49d77b1a1b5b2cda1f910711599c9454500 (patch)
tree08670ed866dad1229037fbaf44f5e7319f1b583a
parent715f33073b49b9ed5fe0e90598954eef7ef318ca (diff)
parent6607788cbd4180d055fa9984fcc2fe45e2fea8cd (diff)
Merge branch 'ps-protoreg-lock-free' into 'master'
gRPC proto registry can be lock free See merge request gitlab-org/gitaly!2209
-rw-r--r--cmd/praefect/main.go9
-rw-r--r--internal/middleware/cache/cache_test.go4
-rw-r--r--internal/praefect/auth_test.go4
-rw-r--r--internal/praefect/coordinator_test.go13
-rw-r--r--internal/praefect/helper_test.go12
-rw-r--r--internal/praefect/protoregistry/find_oid_test.go10
-rw-r--r--internal/praefect/protoregistry/protoregistry.go31
-rw-r--r--internal/praefect/protoregistry/protoregistry_test.go16
-rw-r--r--internal/praefect/replicator_test.go6
9 files changed, 31 insertions, 74 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 322ae28b5..3c8ec20a5 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -259,14 +259,9 @@ func run(cfgs []starter.Config, conf config.Config) error {
transactions.WithDelayMetric(transactionDelayMetric),
)
- registry := protoregistry.New()
- if err = registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...); err != nil {
- return err
- }
-
var (
// top level server dependencies
- coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, registry)
+ coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
repl = praefect.NewReplMgr(
logger,
ds,
@@ -274,7 +269,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
praefect.WithQueueMetric(queueMetric))
- srv = praefect.NewServer(coordinator.StreamDirector, logger, registry, conf)
+ srv = praefect.NewServer(coordinator.StreamDirector, logger, protoregistry.GitalyProtoPreregistered, conf)
)
ctx, cancel := context.WithCancel(context.Background())
diff --git a/internal/middleware/cache/cache_test.go b/internal/middleware/cache/cache_test.go
index 9bcb2343b..632dc3843 100644
--- a/internal/middleware/cache/cache_test.go
+++ b/internal/middleware/cache/cache_test.go
@@ -26,8 +26,8 @@ import (
func TestInvalidators(t *testing.T) {
mCache := newMockCache()
- reg := protoregistry.New()
- require.NoError(t, reg.RegisterFiles(streamFileDesc(t)))
+ reg, err := protoregistry.New(streamFileDesc(t))
+ require.NoError(t, err)
srvr := grpc.NewServer(
grpc.StreamInterceptor(
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index c64c9a468..5146d10b2 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -179,8 +179,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
txMgr := transactions.NewManager()
- registry := protoregistry.New()
- require.NoError(t, registry.RegisterFiles(fd))
+ registry, err := protoregistry.New(fd)
+ require.NoError(t, err)
coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index b100b2ba4..1b7103ff0 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -106,8 +106,6 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
},
},
}
- pbRegistry := protoregistry.New()
- require.NoError(t, pbRegistry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
const storageName = "test-storage"
coordinator := NewCoordinator(
@@ -121,7 +119,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
}},
transactions.NewManager(),
conf,
- pbRegistry,
+ protoregistry.GitalyProtoPreregistered,
)
ctx, cancel := testhelper.Context()
@@ -195,11 +193,10 @@ func TestStreamDirectorMutator(t *testing.T) {
nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
+
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r)
+ coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -306,12 +303,10 @@ func TestStreamDirectorAccessor(t *testing.T) {
nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, r)
+ coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
require.NoError(t, err)
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index a38fabc09..d8cadf610 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -91,8 +91,8 @@ func assertPrimariesExist(t testing.TB, conf config.Config) {
// configured storage node.
// requires there to be only 1 virtual storage
func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t)))
+ r, err := protoregistry.New(mustLoadProtoReg(t))
+ require.NoError(t, err)
return runPraefectServer(t, conf, buildOptions{
withDatastore: ds,
@@ -220,12 +220,6 @@ func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) no
return nodeMgr
}
-func defaultAnnotations(t testing.TB) *protoregistry.Registry {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
- return r
-}
-
func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
assertPrimariesExist(t, conf)
@@ -241,7 +235,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...)
}
if opt.withAnnotations == nil {
- opt.withAnnotations = defaultAnnotations(t)
+ opt.withAnnotations = protoregistry.GitalyProtoPreregistered
}
if opt.withLogger == nil {
opt.withLogger = log.Default()
diff --git a/internal/praefect/protoregistry/find_oid_test.go b/internal/praefect/protoregistry/find_oid_test.go
index 6c6a837eb..4a87db9c7 100644
--- a/internal/praefect/protoregistry/find_oid_test.go
+++ b/internal/praefect/protoregistry/find_oid_test.go
@@ -12,9 +12,6 @@ import (
)
func TestProtoRegistryTargetRepo(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
testRepos := []*gitalypb.Repository{
&gitalypb.Repository{
GitAlternateObjectDirectories: []string{"a", "b", "c"},
@@ -96,7 +93,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
for _, tc := range testcases {
desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc)
t.Run(desc, func(t *testing.T) {
- info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
+ info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
require.NoError(t, err)
actualTarget, actualErr := info.TargetRepo(tc.pbMsg)
@@ -119,9 +116,6 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
}
func TestProtoRegistryStorage(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
testcases := []struct {
desc string
svc string
@@ -151,7 +145,7 @@ func TestProtoRegistryStorage(t *testing.T) {
for _, tc := range testcases {
desc := fmt.Sprintf("%s:%s %s", tc.svc, tc.method, tc.desc)
t.Run(desc, func(t *testing.T) {
- info, err := r.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
+ info, err := protoregistry.GitalyProtoPreregistered.LookupMethod(fmt.Sprintf("/gitaly.%s/%s", tc.svc, tc.method))
require.NoError(t, err)
actualStorage, actualErr := info.Storage(tc.pbMsg)
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 5ffb6a884..f7ebe6f7e 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -7,7 +7,6 @@ import (
"io/ioutil"
"reflect"
"strings"
- "sync"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
@@ -35,8 +34,9 @@ func init() {
GitalyProtoFileDescriptors = append(GitalyProtoFileDescriptors, fd)
}
- GitalyProtoPreregistered = New()
- if err := GitalyProtoPreregistered.RegisterFiles(GitalyProtoFileDescriptors...); err != nil {
+ var err error
+ GitalyProtoPreregistered, err = New(GitalyProtoFileDescriptors...)
+ if err != nil {
panic(err)
}
}
@@ -145,41 +145,31 @@ func (mi MethodInfo) UnmarshalRequestProto(b []byte) (proto.Message, error) {
// Registry contains info about RPC methods
type Registry struct {
- sync.RWMutex
protos map[string]MethodInfo
}
-// New creates a new ProtoRegistry
-func New() *Registry {
- return &Registry{
- protos: make(map[string]MethodInfo),
- }
-}
-
-// RegisterFiles takes one or more descriptor.FileDescriptorProto and populates
-// the registry with its info
-func (pr *Registry) RegisterFiles(protos ...*descriptor.FileDescriptorProto) error {
- pr.Lock()
- defer pr.Unlock()
+// New creates a new ProtoRegistry with info from one or more descriptor.FileDescriptorProto
+func New(protos ...*descriptor.FileDescriptorProto) (*Registry, error) {
+ methods := make(map[string]MethodInfo)
for _, p := range protos {
for _, svc := range p.GetService() {
for _, method := range svc.GetMethod() {
mi, err := parseMethodInfo(p, method)
if err != nil {
- return err
+ return nil, err
}
fullMethodName := fmt.Sprintf(
"/%s.%s/%s",
p.GetPackage(), svc.GetName(), method.GetName(),
)
- pr.protos[fullMethodName] = mi
+ methods[fullMethodName] = mi
}
}
}
- return nil
+ return &Registry{protos: methods}, nil
}
func getOpExtension(m *descriptor.MethodDescriptorProto) (*gitalypb.OperationMsg, error) {
@@ -473,9 +463,6 @@ func lastName(inputType string) (string, error) {
// LookupMethod looks up an MethodInfo by service and method name
func (pr *Registry) LookupMethod(fullMethodName string) (MethodInfo, error) {
- pr.RLock()
- defer pr.RUnlock()
-
methodInfo, ok := pr.protos[fullMethodName]
if !ok {
return MethodInfo{}, fmt.Errorf("full method name not found: %v", fullMethodName)
diff --git a/internal/praefect/protoregistry/protoregistry_test.go b/internal/praefect/protoregistry/protoregistry_test.go
index 4afc206b8..3f898211e 100644
--- a/internal/praefect/protoregistry/protoregistry_test.go
+++ b/internal/praefect/protoregistry/protoregistry_test.go
@@ -10,9 +10,9 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-func TestPopulatesProtoRegistry(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
+func TestNewProtoRegistry(t *testing.T) {
+ r, err := protoregistry.New(protoregistry.GitalyProtoFileDescriptors...)
+ require.NoError(t, err)
expectedResults := map[string]map[string]protoregistry.OpType{
"BlobService": map[string]protoregistry.OpType{
@@ -186,10 +186,7 @@ func TestPopulatesProtoRegistry(t *testing.T) {
}
func TestRequestFactory(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
- mInfo, err := r.LookupMethod("/gitaly.RepositoryService/RepositoryExists")
+ mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod("/gitaly.RepositoryService/RepositoryExists")
require.NoError(t, err)
pb, err := mInfo.UnmarshalRequestProto([]byte{})
@@ -199,9 +196,6 @@ func TestRequestFactory(t *testing.T) {
}
func TestMethodInfoScope(t *testing.T) {
- r := protoregistry.New()
- require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
-
for _, tt := range []struct {
method string
scope protoregistry.Scope
@@ -216,7 +210,7 @@ func TestMethodInfoScope(t *testing.T) {
},
} {
t.Run(tt.method, func(t *testing.T) {
- mInfo, err := r.LookupMethod(tt.method)
+ mInfo, err := protoregistry.GitalyProtoPreregistered.LookupMethod(tt.method)
require.NoError(t, err)
require.Exactly(t, tt.scope, mInfo.Scope)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 378d98628..51772ede5 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -227,16 +227,14 @@ func TestPropagateReplicationJob(t *testing.T) {
txMgr := transactions.NewManager()
- registry := protoregistry.New()
- require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
- coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
replmgr := NewReplMgr(logEntry, ds, nodeMgr)
prf := NewServer(
coordinator.StreamDirector,
logEntry,
- registry,
+ protoregistry.GitalyProtoPreregistered,
conf,
)
listener, port := listenAvailPort(t)