diff options
author | John Cai <jcai@gitlab.com> | 2019-08-13 02:30:44 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-08-23 21:04:01 +0300 |
commit | a05503a3f6bd3849d90e099fe658ad8f85fa0d16 (patch) | |
tree | c2240f62ac7f433207e9aa37dc7508b03f9dc334 | |
parent | c06f35897d1e4c38535706d7965db399eafcd736 (diff) |
Lazy failover (jc-lazy-failover)
-rw-r--r-- | changelogs/unreleased/jc-lazy-failover.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 11 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 93 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 54 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/director.go | 2 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 43 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/peeker.go | 12 | ||||
-rw-r--r-- | internal/praefect/models/node.go | 1 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 2 | ||||
-rw-r--r-- | internal/praefect/server.go | 6 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 2 |
12 files changed, 161 insertions, 72 deletions
diff --git a/changelogs/unreleased/jc-lazy-failover.yml b/changelogs/unreleased/jc-lazy-failover.yml new file mode 100644 index 000000000..fa482a51d --- /dev/null +++ b/changelogs/unreleased/jc-lazy-failover.yml @@ -0,0 +1,5 @@ +--- +title: Lazy failover +merge_request: 1450 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index cf7d1b8ab..2afeaa7c4 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -114,8 +114,13 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - for _, node := range conf.Nodes { - if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { + nodes, err := datastore.GetStorageNodes() + if err != nil { + return fmt.Errorf("failed to get storage nodes from datastore: %v", err) + } + + for _, node := range nodes { + if err := coordinator.RegisterNode(node.ID, node.Address); err != nil { return fmt.Errorf("failed to register %s: %s", node.Address, err) } @@ -124,8 +129,6 @@ func run(listeners []net.Listener, conf config.Config) error { go func() { serverErrors <- repl.ProcessBacklog(ctx) }() - go coordinator.FailoverRotation() - select { case s := <-termCh: logger.WithField("signal", s).Warn("received signal, shutting down gracefully") diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0c80bd398..64fb992e3 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,11 +4,8 @@ import ( "context" "errors" "fmt" - "os" - "os/signal" "sort" "sync" - "syscall" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" @@ -35,7 +32,7 @@ type Coordinator struct { datastore Datastore - nodes map[string]*grpc.ClientConn + nodes map[int]*grpc.ClientConn registry *protoregistry.Registry } @@ -47,7 +44,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*de return 
&Coordinator{ log: l, datastore: datastore, - nodes: make(map[string]*grpc.ClientConn), + nodes: make(map[int]*grpc.ClientConn), registry: registry, } } @@ -57,15 +54,29 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) } +func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error { + c.failoverMutex.Lock() + defer c.failoverMutex.Unlock() + + for storageNodeID, conn := range c.nodes { + if conn == cc { + if err := c.datastore.Failover(storageNodeID); err != nil { + return err + } + } + } + + return nil +} + // streamDirector determines which downstream servers receive requests func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + c.failoverMutex.Lock() + defer c.failoverMutex.Unlock() // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) - c.failoverMutex.RLock() - defer c.failoverMutex.RUnlock() - mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { return nil, nil, nil, err @@ -77,24 +88,24 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } var requestFinalizer func() - var storage string + var node models.Node if mi.Scope == protoregistry.ScopeRepository { - storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker) + node, requestFinalizer, err = c.getNodeForRepositoryMessage(mi, m, peeker) if err != nil { return nil, nil, nil, err } } else { - storage, requestFinalizer, err = c.getAnyStorageNode() + node, requestFinalizer, err = c.getAnyStorageNode() if err != nil { return nil, nil, nil, err } } // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, err := c.GetConnection(storage) + cc, err := c.GetConnection(node.ID) if err 
!= nil { - return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) + return nil, nil, nil, fmt.Errorf("unable to find existing client connection for node_id %d", node.ID) } return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil @@ -102,51 +113,51 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var noopRequestFinalizer = func() {} -func (c *Coordinator) getAnyStorageNode() (string, func(), error) { +func (c *Coordinator) getAnyStorageNode() (models.Node, func(), error) { //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to // proxy requests that are not repository scoped node, err := c.datastore.GetStorageNodes() if err != nil { - return "", nil, err + return models.Node{}, nil, err } if len(node) == 0 { - return "", nil, errors.New("no node storages found") + return models.Node{}, nil, errors.New("no node storages found") } - return node[0].Storage, noopRequestFinalizer, nil + return node[0], noopRequestFinalizer, nil } -func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) { +func (c *Coordinator) getNodeForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (models.Node, func(), error) { targetRepo, err := mi.TargetRepo(m) if err != nil { - return "", nil, err + return models.Node{}, nil, err } primary, err := c.selectPrimary(mi, targetRepo) if err != nil { - return "", nil, err + return models.Node{}, nil, err } targetRepo.StorageName = primary.Storage b, err := proxy.Codec().Marshal(m) if err != nil { - return "", nil, err + return models.Node{}, nil, err } if err = peeker.Modify(b); err != nil { - return "", nil, err + return models.Node{}, nil, err } requestFinalizer := noopRequestFinalizer if mi.Operation == protoregistry.OpMutator { if requestFinalizer, err = 
c.createReplicaJobs(targetRepo); err != nil { - return "", nil, err + return models.Node{}, nil, err } } - return primary.Storage, requestFinalizer, nil + return *primary, requestFinalizer, nil } func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) { @@ -202,6 +213,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi } m, err := mi.UnmarshalRequestProto(frame) + if err != nil { return nil, err } @@ -225,7 +237,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func() // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. -func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { +func (c *Coordinator) RegisterNode(nodeID int, listenAddr string) error { conn, err := client.Dial(listenAddr, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), @@ -236,21 +248,23 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { return err } - c.setConn(storageName, conn) + c.setConn(nodeID, conn) + + c.datastore.UpdateStorageNode(nodeID, true) return nil } -func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(nodeID int, conn *grpc.ClientConn) { c.connMutex.Lock() - c.nodes[storageName] = conn + c.nodes[nodeID] = conn c.connMutex.Unlock() } // GetConnection gets the grpc client connection based on an address -func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) { +func (c *Coordinator) GetConnection(nodeID int) (*grpc.ClientConn, error) { c.connMutex.RLock() - cc, ok := c.nodes[storageName] + cc, ok := c.nodes[nodeID] c.connMutex.RUnlock() if !ok { return nil, errors.New("client connection not found") @@ -259,22 +273,3 @@ func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error return cc, nil } - -// FailoverRotation 
waits for the SIGUSR1 signal, then promotes the next secondary to be primary -func (c *Coordinator) FailoverRotation() { - c.handleSignalAndRotate() -} - -func (c *Coordinator) handleSignalAndRotate() { - failoverChan := make(chan os.Signal, 1) - signal.Notify(failoverChan, syscall.SIGUSR1) - - for { - <-failoverChan - - c.failoverMutex.Lock() - // TODO: update failover logic - c.log.Info("failover happens") - c.failoverMutex.Unlock() - } -} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index e029a3f48..f9cdcf2bc 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -57,7 +57,7 @@ func TestStreamDirector(t *testing.T) { cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure()) require.NoError(t, err) - coordinator.setConn("praefect-internal-1", cc) + coordinator.setConn(0, cc) _, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame}) require.NoError(t, err) diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index f61a64064..6171926ed 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -75,6 +75,8 @@ type ReplicasDatastore interface { GetStorageNodes() ([]models.Node, error) + GetHealthyStorageNodes() ([]models.Node, error) + GetPrimary(relativePath string) (*models.Node, error) SetPrimary(relativePath string, storageNodeID int) error @@ -84,6 +86,10 @@ type ReplicasDatastore interface { RemoveReplica(relativePath string, storageNodeID int) error GetRepository(relativePath string) (*models.Repository, error) + + UpdateStorageNode(storageNodeID int, healthy bool) error + + Failover(storageNodeID int) error } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -201,6 +207,21 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) { return storageNodes, nil } +// GetStorageHealthyNodes gets all 
storage nodes +func (md *MemoryDatastore) GetHealthyStorageNodes() ([]models.Node, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + + var storageNodes []models.Node + for _, storageNode := range md.storageNodes.m { + if storageNode.Healthy { + storageNodes = append(storageNodes, storageNode) + } + } + + return storageNodes, nil +} + // GetPrimary gets the primary storage node for a repository of a repository relative path func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error) { md.repositories.RLock() @@ -413,3 +434,36 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } + +// Failover replaces any repository with storage as its primary with one of its replicas +func (md *MemoryDatastore) Failover(storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + md.UpdateStorageNode(storageNodeID, true) + + for relativePath, repository := range md.repositories.m { + if repository.Primary.ID == storageNodeID { + if len(repository.Replicas) > 0 { + repository.Primary = repository.Replicas[0] + repository.Replicas = repository.Replicas[1:] + md.repositories.m[relativePath] = repository + } + } + } + return nil +} + +func (md *MemoryDatastore) UpdateStorageNode(nodeID int, healthy bool) error { + md.storageNodes.Lock() + defer md.storageNodes.Unlock() + + storageNode, ok := md.storageNodes.m[nodeID] + if !ok { + return errors.New("node not found") + } + storageNode.Healthy = healthy + md.storageNodes.m[nodeID] = storageNode + + return nil +} diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 82712765f..bfbacaac8 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -22,3 +22,5 @@ import ( // // See the rather rich example. 
type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) + +type ConnectionDownNotifier func(cc *grpc.ClientConn) error diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index 0ed1b3b5e..40d099da4 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -10,9 +10,11 @@ package proxy import ( "io" + "gitlab.com/gitlab-org/gitaly/internal/helper" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( @@ -26,8 +28,8 @@ var ( // The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} +func RegisterService(server *grpc.Server, director StreamDirector, connDownNotifier ConnectionDownNotifier, serviceName string, methodNames ...string) { + streamer := &handler{director, connDownNotifier} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -49,13 +51,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // backends. It should be used as a `grpc.UnknownServiceHandler`. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. 
-func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} +func TransparentHandler(director StreamDirector, connectionDownNotifier ConnectionDownNotifier) grpc.StreamHandler { + streamer := &handler{director, connectionDownNotifier} return streamer.handler } type handler struct { - director StreamDirector + director StreamDirector + connDownNotifier ConnectionDownNotifier } // handler is where the real magic of proxying happens. @@ -76,18 +79,34 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error return err } - defer func() { - if requestFinalizer != nil { - requestFinalizer() - } - }() - clientCtx, clientCancel := context.WithCancel(outgoingCtx) // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) if err != nil { - return err + status, ok := status.FromError(err) + if ok { + if status.Code() == codes.Unavailable { + s.connDownNotifier(backendConn) + + // try again with a new backend connection + _, backendConn, requestFinalizer, err = s.director(serverStream.Context(), fullMethodName, peeker) + if err != nil { + return err + } + + clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) + if err != nil { + return helper.DecorateError(codes.Internal, err) + } + } + } } + + defer func() { + if requestFinalizer != nil { + requestFinalizer() + } + }() // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. 
// Channels do not have to be closed, it is just a control flow mechanism, see // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go index 1d1e02df5..e5516d4a6 100644 --- a/internal/praefect/grpc-proxy/proxy/peeker.go +++ b/internal/praefect/grpc-proxy/proxy/peeker.go @@ -64,9 +64,19 @@ func (p peeker) peek(n uint) ([][]byte, error) { return nil, ErrInvalidPeekCount } - p.consumedStream.frames = make([]*frame, n) peekedFrames := make([][]byte, n) + // if there are already consumed frames, read those + if p.consumedStream.frames != nil && len(p.consumedStream.frames) >= int(n) { + for i := 0; i < int(n); i++ { + peekedFrames[i] = p.consumedStream.frames[i].payload + } + + return peekedFrames, nil + } + + p.consumedStream.frames = make([]*frame, n) + for i := 0; i < len(p.consumedStream.frames); i++ { f := &frame{} if err := p.srcStream.RecvMsg(f); err != nil { diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go index 941a72e8f..1f7c730c3 100644 --- a/internal/praefect/models/node.go +++ b/internal/praefect/models/node.go @@ -6,6 +6,7 @@ type Node struct { Storage string `toml:"storage"` Address string `toml:"address"` Token string `toml:"token"` + Healthy bool } // Repository describes a repository's relative path and its primary and list of secondaries diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index f54a6f457..ff8139211 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -178,7 +178,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { return err } - cc, err := r.coordinator.GetConnection(job.TargetNode.Storage) + cc, err := r.coordinator.GetConnection(job.TargetNode.ID) if err != nil { return err } diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 3a7c0fabe..64974e9a1 100644 --- 
a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -28,7 +28,7 @@ type Server struct { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry) *Server { - grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) + grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector, c.connDownHandler)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler @@ -58,10 +58,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo } } -func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { +func proxyRequiredOpts(director proxy.StreamDirector, connDownHandler proxy.ConnectionDownNotifier) []grpc.ServerOption { return []grpc.ServerOption{ grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownHandler)), } } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index d04f61121..3227c0b22 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -76,7 +76,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { backend, cleanup := newMockDownstream(t, tt.callback) defer cleanup() // clean up mock downstream server resources - coordinator.RegisterNode(nodeStorage.Storage, backend) + coordinator.RegisterNode(nodeStorage.ID, backend) nodeStorage.Address = backend datastore.storageNodes.m[id] = nodeStorage } |