Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-10-11 07:15:44 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-10-11 07:15:44 +0300
commit9924772fc6c40102c5a4d5f87484aa9768e2f87f (patch)
tree7155d917a382e3fdef87fa0991d4f560ab14b54e
parent3ea6304583bf1441e57a09ba6b7176305b5822f9 (diff)
parent7b6de4287ef4cd1bbbdd687e48af620041c8df89 (diff)
Merge branch 'jc-virtual-storage-name-praefect' into 'master'
Add virtual storage name to praefect config See merge request gitlab-org/gitaly!1525
-rw-r--r--changelogs/unreleased/jc-virtual-storage-name-praefect.yml5
-rw-r--r--cmd/praefect/main.go2
-rw-r--r--config.praefect.toml.example1
-rw-r--r--internal/praefect/auth_test.go5
-rw-r--r--internal/praefect/config/config.go5
-rw-r--r--internal/praefect/config/config_test.go1
-rw-r--r--internal/praefect/config/testdata/config.toml1
-rw-r--r--internal/praefect/coordinator.go10
-rw-r--r--internal/praefect/coordinator_test.go25
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go7
-rw-r--r--internal/praefect/server_test.go40
11 files changed, 84 insertions, 18 deletions
diff --git a/changelogs/unreleased/jc-virtual-storage-name-praefect.yml b/changelogs/unreleased/jc-virtual-storage-name-praefect.yml
new file mode 100644
index 000000000..c640c6c1e
--- /dev/null
+++ b/changelogs/unreleased/jc-virtual-storage-name-praefect.yml
@@ -0,0 +1,5 @@
+---
+title: Add virtual storage name to praefect config
+merge_request: 1525
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index e5c434ae0..6e83b214d 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -106,7 +106,7 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, protoregistry.GitalyProtoFileDescriptors...)
+ coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr("default", logger, datastore, clientConnections)
srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
// signal related
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index 74e929f33..f23ab8472 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -1,4 +1,5 @@
# Example Praefect configuration file
+virtual_storage_name = "praefect"
# # TCP address to listen on
listen_addr = "127.0.0.1:2305"
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 750515dbd..f7b27394b 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -158,7 +158,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
backend, cleanup := newMockDownstream(t, backendToken, callbackIncrement)
conf := config.Config{
- Auth: auth.Config{Token: token, Transitioning: !required},
+ VirtualStorageName: "praefect",
+ Auth: auth.Config{Token: token, Transitioning: !required},
Nodes: []*models.Node{
&models.Node{
Storage: "praefect-internal-0",
@@ -180,7 +181,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
clientConnections := conn.NewClientConnections()
clientConnections.RegisterNode("praefect-internal-0", backend, backendToken)
- coordinator := NewCoordinator(logEntry, datastore, clientConnections, fd)
+ coordinator := NewCoordinator(logEntry, datastore, clientConnections, conf, fd)
replMgr := NewReplMgr("praefect-internal-0", logEntry, datastore, clientConnections)
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 818acae32..89e62a488 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -13,8 +13,9 @@ import (
// Config is a container for everything found in the TOML config file
type Config struct {
- ListenAddr string `toml:"listen_addr"`
- SocketPath string `toml:"socket_path"`
+ VirtualStorageName string `toml:"virtual_storage_name"`
+ ListenAddr string `toml:"listen_addr"`
+ SocketPath string `toml:"socket_path"`
Nodes []*models.Node `toml:"node"`
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 22316f563..658effa7e 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -73,6 +73,7 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
+ VirtualStorageName: "praefect",
Nodes: []*models.Node{
&models.Node{
Address: "tcp://gitaly-internal-1.example.com",
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 36476c98f..c7f920e90 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -1,3 +1,4 @@
+virtual_storage_name = "praefect"
listen_addr = ""
socket_path = ""
prometheus_listen_addr = ""
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 49a1d7990..57772f68b 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -10,6 +10,7 @@ import (
"syscall"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -20,6 +21,7 @@ import (
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
)
// Coordinator takes care of directing client requests to the appropriate
@@ -33,10 +35,11 @@ type Coordinator struct {
datastore Datastore
registry *protoregistry.Registry
+ conf config.Config
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -45,6 +48,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *con
datastore: datastore,
registry: registry,
connections: clientConnections,
+ conf: conf,
}
}
@@ -118,6 +122,10 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
return "", nil, err
}
+ if targetRepo.StorageName != c.conf.VirtualStorageName {
+ return "", nil, grpc.Errorf(codes.InvalidArgument, "only messages for %s are allowed", c.conf.VirtualStorageName)
+ }
+
primary, err := c.selectPrimary(mi, targetRepo)
if err != nil {
return "", nil, err
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index bcc9d9132..5418fec53 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -28,7 +28,8 @@ func TestSecondaryRotation(t *testing.T) {
}
func TestStreamDirector(t *testing.T) {
- datastore := NewMemoryDatastore(config.Config{
+ conf := config.Config{
+ VirtualStorageName: "praefect",
Nodes: []*models.Node{
&models.Node{
Address: "tcp://gitaly-primary.example.com",
@@ -39,7 +40,8 @@ func TestStreamDirector(t *testing.T) {
Address: "tcp://gitaly-backup1.example.com",
Storage: "praefect-internal-2",
}},
- })
+ }
+ datastore := NewMemoryDatastore(conf)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -53,7 +55,7 @@ func TestStreamDirector(t *testing.T) {
clientConnections := conn.NewClientConnections()
clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token")
- coordinator := NewCoordinator(log.Default(), datastore, clientConnections)
+ coordinator := NewCoordinator(log.Default(), datastore, clientConnections, conf)
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{
@@ -61,10 +63,23 @@ func TestStreamDirector(t *testing.T) {
})
require.NoError(t, err)
- _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
+ fullMethod := "/gitaly.RepositoryService/GarbageCollect"
+
+ peeker := &mockPeeker{frame}
+ _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, fullMethod, peeker)
require.NoError(t, err)
require.Equal(t, address, conn.Target())
+ mi, err := coordinator.registry.LookupMethod(fullMethod)
+ require.NoError(t, err)
+
+ m, err := protoMessageFromPeeker(mi, peeker)
+ require.NoError(t, err)
+
+ rewrittenRepo, err := mi.TargetRepo(m)
+ require.NoError(t, err)
+ require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name")
+
jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
@@ -103,5 +118,7 @@ func (m *mockPeeker) Peek() ([]byte, error) {
}
func (m *mockPeeker) Modify(payload []byte) error {
+ m.frame = payload
+
return nil
}
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 0ed1b3b5e..d30a13e58 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -13,6 +13,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
var (
@@ -65,7 +66,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
// little bit of gRPC internals never hurt anyone
fullMethodName, ok := grpc.MethodFromServerStream(serverStream)
if !ok {
- return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
+ return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context")
}
peeker := newPeeker(serverStream)
@@ -106,7 +107,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
// exit with an error to the stack
clientCancel()
- return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
+ return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
}
case c2sErr := <-c2sErrChan:
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
@@ -120,7 +121,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return nil
}
}
- return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
+ return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
}
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 88b803f47..6586a8eae 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -23,7 +23,9 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
+ "google.golang.org/grpc/status"
)
// TestServerSimpleUnaryUnary verifies that the Praefect server is capable of
@@ -65,6 +67,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
)
conf := config.Config{
+ VirtualStorageName: "praefect",
Nodes: []*models.Node{
&models.Node{
ID: 1,
@@ -80,10 +83,9 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
}
datastore := NewMemoryDatastore(conf)
-
logEntry := log.Default()
-
clientCC := conn.NewClientConnections()
+ coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, fd)
for id, nodeStorage := range datastore.storageNodes.m {
backend, cleanup := newMockDownstream(t, nodeStorage.Token, tt.callback)
@@ -94,8 +96,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
datastore.storageNodes.m[id] = nodeStorage
}
- coordinator := NewCoordinator(logEntry, datastore, clientCC, fd)
-
replmgr := NewReplMgr(
storagePrimary,
logEntry,
@@ -186,6 +186,36 @@ func TestHealthCheck(t *testing.T) {
require.NoError(t, err)
}
+func TestRejectBadStorage(t *testing.T) {
+ conf := config.Config{
+ VirtualStorageName: "praefect",
+ Nodes: []*models.Node{
+ &models.Node{
+ DefaultPrimary: true,
+ Storage: "praefect-internal-0",
+ Address: "tcp::/this-doesnt-matter",
+ },
+ },
+ }
+
+ cc, srv := runFullPraefectServer(t, conf)
+ defer srv.s.Stop()
+
+ badTargetRepo := gitalypb.Repository{
+ StorageName: "default",
+ RelativePath: "/path/to/hashed/storage",
+ }
+
+ repoClient := gitalypb.NewRepositoryServiceClient(cc)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ _, err := repoClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: &badTargetRepo})
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+ require.Equal(t, fmt.Sprintf("only messages for %s are allowed", conf.VirtualStorageName), status.Convert(err).Message())
+}
+
func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) {
datastore := NewMemoryDatastore(conf)
@@ -200,7 +230,7 @@ func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn,
datastore.storageNodes.m[id] = nodeStorage
}
- coordinator := NewCoordinator(logEntry, datastore, clientCC, protoregistry.GitalyProtoFileDescriptors...)
+ coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...)
replmgr := NewReplMgr(
"",