diff options
Diffstat (limited to 'internal/praefect/coordinator.go')
-rw-r--r-- | internal/praefect/coordinator.go | 93 |
1 files changed, 44 insertions, 49 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0c80bd398..64fb992e3 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -4,11 +4,8 @@ import ( "context" "errors" "fmt" - "os" - "os/signal" "sort" "sync" - "syscall" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" @@ -35,7 +32,7 @@ type Coordinator struct { datastore Datastore - nodes map[string]*grpc.ClientConn + nodes map[int]*grpc.ClientConn registry *protoregistry.Registry } @@ -47,7 +44,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*de return &Coordinator{ log: l, datastore: datastore, - nodes: make(map[string]*grpc.ClientConn), + nodes: make(map[int]*grpc.ClientConn), registry: registry, } } @@ -57,15 +54,29 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) } +func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error { + c.failoverMutex.Lock() + defer c.failoverMutex.Unlock() + + for storageNodeID, conn := range c.nodes { + if conn == cc { + if err := c.datastore.Failover(storageNodeID); err != nil { + return err + } + } + } + + return nil +} + // streamDirector determines which downstream servers receive requests func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + c.failoverMutex.Lock() + defer c.failoverMutex.Unlock() // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) - c.failoverMutex.RLock() - defer c.failoverMutex.RUnlock() - mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { return nil, nil, nil, err @@ -77,24 +88,24 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, } var requestFinalizer func() - var storage string + var node models.Node if mi.Scope == protoregistry.ScopeRepository { - storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker) + node, requestFinalizer, err = c.getNodeForRepositoryMessage(mi, m, peeker) if err != nil { return nil, nil, nil, err } } else { - storage, requestFinalizer, err = c.getAnyStorageNode() + node, requestFinalizer, err = c.getAnyStorageNode() if err != nil { return nil, nil, nil, err } } // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, err := c.GetConnection(storage) + cc, err := c.GetConnection(node.ID) if err != nil { - return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage) + return nil, nil, nil, fmt.Errorf("unable to find existing client connection for node_id %d", node.ID) } return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil @@ -102,51 +113,51 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var noopRequestFinalizer = func() {} -func (c *Coordinator) getAnyStorageNode() (string, func(), error) { +func (c *Coordinator) getAnyStorageNode() (models.Node, func(), error) { //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to // proxy requests that are not repository scoped node, err := c.datastore.GetStorageNodes() if err != nil { - return "", nil, err + return models.Node{}, nil, err } if len(node) == 0 { - return "", nil, errors.New("no node storages found") + return models.Node{}, nil, errors.New("no node storages found") } - return node[0].Storage, noopRequestFinalizer, nil + return node[0], noopRequestFinalizer, nil } -func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) { +func (c *Coordinator) getNodeForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (models.Node, func(), error) { targetRepo, err := mi.TargetRepo(m) if err != nil { - return "", nil, err + return models.Node{}, nil, err } primary, err := c.selectPrimary(mi, targetRepo) if err != nil { - return "", nil, err + return models.Node{}, nil, err } targetRepo.StorageName = primary.Storage b, err := proxy.Codec().Marshal(m) if err != nil { - return "", nil, err + return models.Node{}, nil, err } if err = peeker.Modify(b); err != nil { - return "", nil, err + return models.Node{}, nil, err } requestFinalizer := noopRequestFinalizer if mi.Operation == protoregistry.OpMutator { if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil { - return "", nil, err + return models.Node{}, nil, err } } - return primary.Storage, requestFinalizer, nil + return *primary, requestFinalizer, nil } func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) { @@ -202,6 +213,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi } m, err := mi.UnmarshalRequestProto(frame) + if err != nil { return nil, err } @@ -225,7 +237,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func() // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. -func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { +func (c *Coordinator) RegisterNode(nodeID int, listenAddr string) error { conn, err := client.Dial(listenAddr, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), @@ -236,21 +248,23 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { return err } - c.setConn(storageName, conn) + c.setConn(nodeID, conn) + + c.datastore.UpdateStorageNode(nodeID, true) return nil } -func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(nodeID int, conn *grpc.ClientConn) { c.connMutex.Lock() - c.nodes[storageName] = conn + c.nodes[nodeID] = conn c.connMutex.Unlock() } // GetConnection gets the grpc client connection based on an address -func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) { +func (c *Coordinator) GetConnection(nodeID int) (*grpc.ClientConn, error) { c.connMutex.RLock() - cc, ok := c.nodes[storageName] + cc, ok := c.nodes[nodeID] c.connMutex.RUnlock() if !ok { return nil, errors.New("client connection not found") @@ -259,22 +273,3 @@ func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error return cc, nil } - -// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary -func (c *Coordinator) FailoverRotation() { - c.handleSignalAndRotate() -} - -func (c *Coordinator) handleSignalAndRotate() { - failoverChan := make(chan os.Signal, 1) - signal.Notify(failoverChan, syscall.SIGUSR1) - - for { - <-failoverChan - - c.failoverMutex.Lock() - // TODO: update failover logic - c.log.Info("failover happens") - c.failoverMutex.Unlock() - } -} |