diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-10-11 07:15:44 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-10-11 07:15:44 +0300 |
commit | 9924772fc6c40102c5a4d5f87484aa9768e2f87f (patch) | |
tree | 7155d917a382e3fdef87fa0991d4f560ab14b54e | |
parent | 3ea6304583bf1441e57a09ba6b7176305b5822f9 (diff) | |
parent | 7b6de4287ef4cd1bbbdd687e48af620041c8df89 (diff) |
Merge branch 'jc-virtual-storage-name-praefect' into 'master'
Add virtual storage name to praefect config
See merge request gitlab-org/gitaly!1525
-rw-r--r-- | changelogs/unreleased/jc-virtual-storage-name-praefect.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 2 | ||||
-rw-r--r-- | config.praefect.toml.example | 1 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 5 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 10 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 25 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 7 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 40 |
11 files changed, 84 insertions, 18 deletions
diff --git a/changelogs/unreleased/jc-virtual-storage-name-praefect.yml b/changelogs/unreleased/jc-virtual-storage-name-praefect.yml new file mode 100644 index 000000000..c640c6c1e --- /dev/null +++ b/changelogs/unreleased/jc-virtual-storage-name-praefect.yml @@ -0,0 +1,5 @@ +--- +title: Add virtual storage name to praefect config +merge_request: 1525 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index e5c434ae0..6e83b214d 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -106,7 +106,7 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr("default", logger, datastore, clientConnections) srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) // signal related diff --git a/config.praefect.toml.example b/config.praefect.toml.example index 74e929f33..f23ab8472 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -1,4 +1,5 @@ # Example Praefect configuration file +virtual_storage_name = "praefect" # # TCP address to listen on listen_addr = "127.0.0.1:2305" diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 750515dbd..f7b27394b 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -158,7 +158,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func backend, cleanup := newMockDownstream(t, backendToken, callbackIncrement) conf := config.Config{ - Auth: auth.Config{Token: token, Transitioning: !required}, + VirtualStorageName: "praefect", + Auth: auth.Config{Token: token, Transitioning: !required}, Nodes: []*models.Node{ &models.Node{ Storage: "praefect-internal-0", @@ -180,7 +181,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-0", backend, backendToken) - coordinator := NewCoordinator(logEntry, datastore, clientConnections, fd) + coordinator := NewCoordinator(logEntry, datastore, clientConnections, conf, fd) replMgr := NewReplMgr("praefect-internal-0", logEntry, datastore, clientConnections) diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 818acae32..89e62a488 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -13,8 +13,9 @@ import ( // Config is a container for everything found in the TOML config file type Config struct { - ListenAddr string `toml:"listen_addr"` - SocketPath string `toml:"socket_path"` + VirtualStorageName string `toml:"virtual_storage_name"` + ListenAddr string `toml:"listen_addr"` + SocketPath string `toml:"socket_path"` Nodes []*models.Node `toml:"node"` diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 22316f563..658effa7e 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -73,6 +73,7 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ + VirtualStorageName: "praefect", Nodes: []*models.Node{ &models.Node{ Address: "tcp://gitaly-internal-1.example.com", diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 36476c98f..c7f920e90 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -1,3 +1,4 @@ +virtual_storage_name = "praefect" listen_addr = "" socket_path = "" prometheus_listen_addr = "" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 49a1d7990..57772f68b 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -10,6 +10,7 @@ import ( "syscall" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -20,6 +21,7 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) // Coordinator takes care of directing client requests to the appropriate @@ -33,10 +35,11 @@ type Coordinator struct { datastore Datastore registry *protoregistry.Registry + conf config.Config } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -45,6 +48,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *con datastore: datastore, registry: registry, connections: clientConnections, + conf: conf, } } @@ -118,6 +122,10 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo return "", nil, err } + if targetRepo.StorageName != c.conf.VirtualStorageName { + return "", nil, grpc.Errorf(codes.InvalidArgument, "only messages for %s are allowed", c.conf.VirtualStorageName) + } + primary, err := c.selectPrimary(mi, targetRepo) if err != nil { return "", nil, err diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index bcc9d9132..5418fec53 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -28,7 +28,8 @@ func TestSecondaryRotation(t *testing.T) { } func TestStreamDirector(t *testing.T) { - datastore := NewMemoryDatastore(config.Config{ + conf := config.Config{ + VirtualStorageName: "praefect", Nodes: []*models.Node{ &models.Node{ Address: "tcp://gitaly-primary.example.com", @@ -39,7 +40,8 @@ func TestStreamDirector(t *testing.T) { Address: "tcp://gitaly-backup1.example.com", Storage: "praefect-internal-2", }}, - }) + } + datastore := NewMemoryDatastore(conf) targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -53,7 +55,7 @@ func TestStreamDirector(t *testing.T) { clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token") - coordinator := NewCoordinator(log.Default(), datastore, clientConnections) + coordinator := NewCoordinator(log.Default(), datastore, clientConnections, conf) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.GarbageCollectRequest{ @@ -61,10 +63,23 @@ func TestStreamDirector(t *testing.T) { }) require.NoError(t, err) - _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame}) + fullMethod := "/gitaly.RepositoryService/GarbageCollect" + + peeker := &mockPeeker{frame} + _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, fullMethod, peeker) require.NoError(t, err) require.Equal(t, address, conn.Target()) + mi, err := coordinator.registry.LookupMethod(fullMethod) + require.NoError(t, err) + + m, err := protoMessageFromPeeker(mi, peeker) + require.NoError(t, err) + + rewrittenRepo, err := mi.TargetRepo(m) + require.NoError(t, err) + require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name") + jobs, err := datastore.GetJobs(JobStatePending, 1, 10) require.NoError(t, err) require.Len(t, jobs, 1) @@ -103,5 +118,7 @@ func (m *mockPeeker) Peek() ([]byte, error) { } func (m *mockPeeker) Modify(payload []byte) error { + m.frame = payload + return nil } diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 0ed1b3b5e..d30a13e58 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -13,6 +13,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( @@ -65,7 +66,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // little bit of gRPC internals never hurt anyone fullMethodName, ok := grpc.MethodFromServerStream(serverStream) if !ok { - return grpc.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") } peeker := newPeeker(serverStream) @@ -106,7 +107,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and // exit with an error to the stack clientCancel() - return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) } case c2sErr := <-c2sErrChan: // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two @@ -120,7 +121,7 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return nil } } - return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") + return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") } func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 88b803f47..6586a8eae 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -23,7 +23,9 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" ) // TestServerSimpleUnaryUnary verifies that the Praefect server is capable of @@ -65,6 +67,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { ) conf := config.Config{ + VirtualStorageName: "praefect", Nodes: []*models.Node{ &models.Node{ ID: 1, @@ -80,10 +83,9 @@ func TestServerSimpleUnaryUnary(t *testing.T) { } datastore := NewMemoryDatastore(conf) - logEntry := log.Default() - clientCC := conn.NewClientConnections() + coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, fd) for id, nodeStorage := range datastore.storageNodes.m { backend, cleanup := newMockDownstream(t, nodeStorage.Token, tt.callback) @@ -94,8 +96,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) { datastore.storageNodes.m[id] = nodeStorage } - coordinator := NewCoordinator(logEntry, datastore, clientCC, fd) - replmgr := NewReplMgr( storagePrimary, logEntry, @@ -186,6 +186,36 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, err) } +func TestRejectBadStorage(t *testing.T) { + conf := config.Config{ + VirtualStorageName: "praefect", + Nodes: []*models.Node{ + &models.Node{ + DefaultPrimary: true, + Storage: "praefect-internal-0", + Address: "tcp::/this-doesnt-matter", + }, + }, + } + + cc, srv := runFullPraefectServer(t, conf) + defer srv.s.Stop() + + badTargetRepo := gitalypb.Repository{ + StorageName: "default", + RelativePath: "/path/to/hashed/storage", + } + + repoClient := gitalypb.NewRepositoryServiceClient(cc) + + ctx, cancel := testhelper.Context() + defer cancel() + + _, err := repoClient.GarbageCollect(ctx, &gitalypb.GarbageCollectRequest{Repository: &badTargetRepo}) + testhelper.RequireGrpcError(t, err, codes.InvalidArgument) + require.Equal(t, fmt.Sprintf("only messages for %s are allowed", conf.VirtualStorageName), status.Convert(err).Message()) +} + func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server) { datastore := NewMemoryDatastore(conf) @@ -200,7 +230,7 @@ func runFullPraefectServer(t *testing.T, conf config.Config) (*grpc.ClientConn, datastore.storageNodes.m[id] = nodeStorage } - coordinator := NewCoordinator(logEntry, datastore, clientCC, protoregistry.GitalyProtoFileDescriptors...) + coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) replmgr := NewReplMgr( "", |