From 653b7377e02b789dac3749d7ceefc162d001ec93 Mon Sep 17 00:00:00 2001 From: John Cai Date: Mon, 12 Aug 2019 16:30:44 -0700 Subject: Register nodes by id instead of by storage --- .../unreleased/jc-handle-bad-connections.yml | 5 ++ cmd/praefect/main.go | 11 ++- internal/praefect/coordinator.go | 93 ++++++++++------------ internal/praefect/coordinator_test.go | 2 +- internal/praefect/datastore.go | 55 +++++++++++++ internal/praefect/grpc-proxy/proxy/director.go | 3 + .../praefect/grpc-proxy/proxy/examples_test.go | 7 +- internal/praefect/grpc-proxy/proxy/handler.go | 23 +++--- internal/praefect/grpc-proxy/proxy/handler_test.go | 4 +- internal/praefect/grpc-proxy/proxy/helper_test.go | 4 +- internal/praefect/models/node.go | 1 + internal/praefect/replicator.go | 2 +- internal/praefect/server.go | 6 +- internal/praefect/server_test.go | 2 +- 14 files changed, 141 insertions(+), 77 deletions(-) create mode 100644 changelogs/unreleased/jc-handle-bad-connections.yml diff --git a/changelogs/unreleased/jc-handle-bad-connections.yml b/changelogs/unreleased/jc-handle-bad-connections.yml new file mode 100644 index 000000000..e591ccb58 --- /dev/null +++ b/changelogs/unreleased/jc-handle-bad-connections.yml @@ -0,0 +1,5 @@ +--- +title: Handle bad connections +merge_request: 1451 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index cf7d1b8ab..2afeaa7c4 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -114,8 +114,13 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - for _, node := range conf.Nodes { - if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { + nodes, err := datastore.GetStorageNodes() + if err != nil { + return fmt.Errorf("failed to get storage nodes from datastore: %v", err) + } + + for _, node := range nodes { + if err := coordinator.RegisterNode(node.ID, node.Address); err != nil { return fmt.Errorf("failed 
to register %s: %s", node.Address, err) } @@ -124,8 +129,6 @@ func run(listeners []net.Listener, conf config.Config) error { go func() { serverErrors <- repl.ProcessBacklog(ctx) }() - go coordinator.FailoverRotation() - select { case s := <-termCh: logger.WithField("signal", s).Warn("received signal, shutting down gracefully") diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0c80bd398..64fb992e3 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,11 +4,8 @@ import ( "context" "errors" "fmt" - "os" - "os/signal" "sort" "sync" - "syscall" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" @@ -35,7 +32,7 @@ type Coordinator struct { datastore Datastore - nodes map[string]*grpc.ClientConn + nodes map[int]*grpc.ClientConn registry *protoregistry.Registry } @@ -47,7 +44,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*de return &Coordinator{ log: l, datastore: datastore, - nodes: make(map[string]*grpc.ClientConn), + nodes: make(map[int]*grpc.ClientConn), registry: registry, } } @@ -57,15 +54,29 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) } +func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error { + c.failoverMutex.Lock() + defer c.failoverMutex.Unlock() + + for storageNodeID, conn := range c.nodes { + if conn == cc { + if err := c.datastore.Failover(storageNodeID); err != nil { + return err + } + } + } + + return nil +} + // streamDirector determines which downstream servers receive requests func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + c.failoverMutex.Lock() + defer c.failoverMutex.Unlock() // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. 
c.log.Debugf("Stream director received method %s", fullMethodName) - c.failoverMutex.RLock() - defer c.failoverMutex.RUnlock() - mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { return nil, nil, nil, err @@ -77,24 +88,24 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } var requestFinalizer func() - var storage string + var node models.Node if mi.Scope == protoregistry.ScopeRepository { - storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker) + node, requestFinalizer, err = c.getNodeForRepositoryMessage(mi, m, peeker) if err != nil { return nil, nil, nil, err } } else { - storage, requestFinalizer, err = c.getAnyStorageNode() + node, requestFinalizer, err = c.getAnyStorageNode() if err != nil { return nil, nil, nil, err } } // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, err := c.GetConnection(storage) + cc, err := c.GetConnection(node.ID) if err != nil { - return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) + return nil, nil, nil, fmt.Errorf("unable to find existing client connection for node_id %d", node.ID) } return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil @@ -102,51 +113,51 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var noopRequestFinalizer = func() {} -func (c *Coordinator) getAnyStorageNode() (string, func(), error) { +func (c *Coordinator) getAnyStorageNode() (models.Node, func(), error) { //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to // proxy requests that are not repository scoped node, err := c.datastore.GetStorageNodes() if err != nil { - return "", nil, err + return models.Node{}, nil, err } if len(node) == 0 { - return "", nil, errors.New("no node storages found") + return models.Node{}, nil, errors.New("no node storages found") 
} - return node[0].Storage, noopRequestFinalizer, nil + return node[0], noopRequestFinalizer, nil } -func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) { +func (c *Coordinator) getNodeForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (models.Node, func(), error) { targetRepo, err := mi.TargetRepo(m) if err != nil { - return "", nil, err + return models.Node{}, nil, err } primary, err := c.selectPrimary(mi, targetRepo) if err != nil { - return "", nil, err + return models.Node{}, nil, err } targetRepo.StorageName = primary.Storage b, err := proxy.Codec().Marshal(m) if err != nil { - return "", nil, err + return models.Node{}, nil, err } if err = peeker.Modify(b); err != nil { - return "", nil, err + return models.Node{}, nil, err } requestFinalizer := noopRequestFinalizer if mi.Operation == protoregistry.OpMutator { if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil { - return "", nil, err + return models.Node{}, nil, err } } - return primary.Storage, requestFinalizer, nil + return *primary, requestFinalizer, nil } func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) { @@ -202,6 +213,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi } m, err := mi.UnmarshalRequestProto(frame) + if err != nil { return nil, err } @@ -225,7 +237,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func() // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. 
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { +func (c *Coordinator) RegisterNode(nodeID int, listenAddr string) error { conn, err := client.Dial(listenAddr, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), @@ -236,21 +248,23 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { return err } - c.setConn(storageName, conn) + c.setConn(nodeID, conn) + + if err := c.datastore.UpdateStorageNode(nodeID, true); err != nil { return err } return nil } -func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(nodeID int, conn *grpc.ClientConn) { c.connMutex.Lock() - c.nodes[storageName] = conn + c.nodes[nodeID] = conn c.connMutex.Unlock() } // GetConnection gets the grpc client connection based on an address -func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) { +func (c *Coordinator) GetConnection(nodeID int) (*grpc.ClientConn, error) { c.connMutex.RLock() - cc, ok := c.nodes[storageName] + cc, ok := c.nodes[nodeID] c.connMutex.RUnlock() if !ok { return nil, errors.New("client connection not found") @@ -259,22 +273,3 @@ func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error return cc, nil } - -// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary -func (c *Coordinator) FailoverRotation() { - c.handleSignalAndRotate() -} - -func (c *Coordinator) handleSignalAndRotate() { - failoverChan := make(chan os.Signal, 1) - signal.Notify(failoverChan, syscall.SIGUSR1) - - for { - <-failoverChan - - c.failoverMutex.Lock() - // TODO: update failover logic - c.log.Info("failover happens") - c.failoverMutex.Unlock() - } -} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e029a3f48..f9cdcf2bc 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -57,7 +57,7 @@ func TestStreamDirector(t *testing.T) {
cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure()) require.NoError(t, err) - coordinator.setConn("praefect-internal-1", cc) + coordinator.setConn(0, cc) _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame}) require.NoError(t, err) diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index f61a64064..2bb5f60fd 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -75,6 +75,8 @@ type ReplicasDatastore interface { GetStorageNodes() ([]models.Node, error) + GetHealthyStorageNodes() ([]models.Node, error) + GetPrimary(relativePath string) (*models.Node, error) SetPrimary(relativePath string, storageNodeID int) error @@ -84,6 +86,10 @@ type ReplicasDatastore interface { RemoveReplica(relativePath string, storageNodeID int) error GetRepository(relativePath string) (*models.Repository, error) + + UpdateStorageNode(storageNodeID int, healthy bool) error + + Failover(storageNodeID int) error } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -201,6 +207,21 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) { return storageNodes, nil } +// GetHealthyStorageNodes gets all healthy storage nodes +func (md *MemoryDatastore) GetHealthyStorageNodes() ([]models.Node, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + + var storageNodes []models.Node + for _, storageNode := range md.storageNodes.m { + if storageNode.Healthy { + storageNodes = append(storageNodes, storageNode) + } + } + + return storageNodes, nil +} + // GetPrimary gets the primary storage node for a repository of a repository relative path func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error) { md.repositories.RLock() @@ -413,3 +434,37 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } + +// Failover
replaces any repository with storage as its primary with one of its replicas +func (md *MemoryDatastore) Failover(storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + if err := md.UpdateStorageNode(storageNodeID, false); err != nil { return err } + + for relativePath, repository := range md.repositories.m { + if repository.Primary.ID == storageNodeID { + if len(repository.Replicas) > 0 { + repository.Primary = repository.Replicas[0] + repository.Replicas = repository.Replicas[1:] + md.repositories.m[relativePath] = repository + } + } + } + return nil +} + +// UpdateStorageNode updates the health of the given storage node by its id +func (md *MemoryDatastore) UpdateStorageNode(nodeID int, healthy bool) error { + md.storageNodes.Lock() + defer md.storageNodes.Unlock() + + storageNode, ok := md.storageNodes.m[nodeID] + if !ok { + return errors.New("node not found") + } + storageNode.Healthy = healthy + md.storageNodes.m[nodeID] = storageNode + + return nil +} diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 82712765f..98e8e7313 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -22,3 +22,6 @@ import ( // // See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) + +// ConnectionDownNotifier takes care of updating the datastore so that other requests stop using the downed connection +type ConnectionDownNotifier func(cc *grpc.ClientConn) error diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go index 6d4a3238e..ccc24b23e 100644 --- a/internal/praefect/grpc-proxy/proxy/examples_test.go +++ b/internal/praefect/grpc-proxy/proxy/examples_test.go @@ -18,14 +18,15 @@ import ( ) var ( - director proxy.StreamDirector + director proxy.StreamDirector + connDownNotifier proxy.ConnectionDownNotifier ) func ExampleRegisterService() { // A gRPC server with the proxying codec enabled. server := grpc.NewServer(grpc.CustomCodec(proxy.Codec())) // Register a TestService with 4 of its methods explicitly. - proxy.RegisterService(server, director, + proxy.RegisterService(server, director, connDownNotifier, "mwitkow.testproto.TestService", "PingEmpty", "Ping", "PingError", "PingList") } @@ -33,7 +34,7 @@ func ExampleRegisterService() { func ExampleTransparentHandler() { grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownNotifier))) } // Provide sa simple example of a director that shields internal services and dials a staging or production backend. diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 0ed1b3b5e..2fa11660e 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -26,8 +26,8 @@ var ( // The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. 
// // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} +func RegisterService(server *grpc.Server, director StreamDirector, connDownNotifier ConnectionDownNotifier, serviceName string, methodNames ...string) { + streamer := &handler{director, connDownNotifier} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -49,13 +49,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // backends. It should be used as a `grpc.UnknownServiceHandler`. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} +func TransparentHandler(director StreamDirector, connectionDownNotifier ConnectionDownNotifier) grpc.StreamHandler { + streamer := &handler{director, connectionDownNotifier} return streamer.handler } type handler struct { - director StreamDirector + director StreamDirector + connDownNotifier ConnectionDownNotifier } // handler is where the real magic of proxying happens. @@ -76,18 +77,18 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return err } - defer func() { - if requestFinalizer != nil { - requestFinalizer() - } - }() - clientCtx, clientCancel := context.WithCancel(outgoingCtx) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) if err != nil { return err } + + defer func() { + if requestFinalizer != nil { + requestFinalizer() + } + }() // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. 
// Channels do not have to be closed, it is just a control flow mechanism, see // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go index c57837d2a..e91ab4a1f 100644 --- a/internal/praefect/grpc-proxy/proxy/handler_test.go +++ b/internal/praefect/grpc-proxy/proxy/handler_test.go @@ -221,10 +221,10 @@ func (s *ProxyHappySuite) SetupSuite() { } s.proxy = grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownNotifier)), ) // Ping handler is handled as an explicit registration and not as a TransparentHandler. - proxy.RegisterService(s.proxy, director, + proxy.RegisterService(s.proxy, director, connDownNotifier, "mwitkow.testproto.TestService", "Ping") diff --git a/internal/praefect/grpc-proxy/proxy/helper_test.go b/internal/praefect/grpc-proxy/proxy/helper_test.go index 615d5a3fd..befb49000 100644 --- a/internal/praefect/grpc-proxy/proxy/helper_test.go +++ b/internal/praefect/grpc-proxy/proxy/helper_test.go @@ -56,10 +56,10 @@ func newBackendPinger(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *in func newProxy(tb testing.TB, ctx context.Context, director proxy.StreamDirector, svc, method string) (*grpc.ClientConn, func()) { proxySrvr := grpc.NewServer( grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownNotifier)), ) - proxy.RegisterService(proxySrvr, director, svc, method) + proxy.RegisterService(proxySrvr, director, connDownNotifier, svc, method) done := make(chan struct{}) listener := newListener(tb) diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go index 941a72e8f..1f7c730c3 100644 --- a/internal/praefect/models/node.go +++ 
b/internal/praefect/models/node.go @@ -6,6 +6,7 @@ type Node struct { Storage string `toml:"storage"` Address string `toml:"address"` Token string `toml:"token"` + Healthy bool } // Repository describes a repository's relative path and its primary and list of secondaries diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index f54a6f457..ff8139211 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -178,7 +178,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { return err } - cc, err := r.coordinator.GetConnection(job.TargetNode.Storage) + cc, err := r.coordinator.GetConnection(job.TargetNode.ID) if err != nil { return err } diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 3a7c0fabe..64974e9a1 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -28,7 +28,7 @@ type Server struct { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry) *Server { - grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) + grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector, c.connDownHandler)...) 
grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler @@ -58,10 +58,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo } } -func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { +func proxyRequiredOpts(director proxy.StreamDirector, connDownHandler proxy.ConnectionDownNotifier) []grpc.ServerOption { return []grpc.ServerOption{ grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownHandler)), } } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index d04f61121..3227c0b22 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -76,7 +76,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { backend, cleanup := newMockDownstream(t, tt.callback) defer cleanup() // clean up mock downstream server resources - coordinator.RegisterNode(nodeStorage.Storage, backend) + coordinator.RegisterNode(nodeStorage.ID, backend) nodeStorage.Address = backend datastore.storageNodes.m[id] = nodeStorage } -- cgit v1.2.3