Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/coordinator.go')
-rw-r--r--internal/praefect/coordinator.go93
1 files changed, 44 insertions, 49 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 0c80bd398..64fb992e3 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -4,11 +4,8 @@ import (
"context"
"errors"
"fmt"
- "os"
- "os/signal"
"sort"
"sync"
- "syscall"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -35,7 +32,7 @@ type Coordinator struct {
datastore Datastore
- nodes map[string]*grpc.ClientConn
+ nodes map[int]*grpc.ClientConn
registry *protoregistry.Registry
}
@@ -47,7 +44,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*de
return &Coordinator{
log: l,
datastore: datastore,
- nodes: make(map[string]*grpc.ClientConn),
+ nodes: make(map[int]*grpc.ClientConn),
registry: registry,
}
}
@@ -57,15 +54,29 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
+func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error {
+ c.failoverMutex.Lock()
+ defer c.failoverMutex.Unlock()
+
+ for storageNodeID, conn := range c.nodes {
+ if conn == cc {
+ if err := c.datastore.Failover(storageNodeID); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ c.failoverMutex.Lock()
+ defer c.failoverMutex.Unlock()
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- c.failoverMutex.RLock()
- defer c.failoverMutex.RUnlock()
-
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
return nil, nil, nil, err
@@ -77,24 +88,24 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
}
var requestFinalizer func()
- var storage string
+ var node models.Node
if mi.Scope == protoregistry.ScopeRepository {
- storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker)
+ node, requestFinalizer, err = c.getNodeForRepositoryMessage(mi, m, peeker)
if err != nil {
return nil, nil, nil, err
}
} else {
- storage, requestFinalizer, err = c.getAnyStorageNode()
+ node, requestFinalizer, err = c.getAnyStorageNode()
if err != nil {
return nil, nil, nil, err
}
}
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, err := c.GetConnection(storage)
+ cc, err := c.GetConnection(node.ID)
if err != nil {
- return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+ return nil, nil, nil, fmt.Errorf("unable to find existing client connection for node_id %d", node.ID)
}
return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil
@@ -102,51 +113,51 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var noopRequestFinalizer = func() {}
-func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
+func (c *Coordinator) getAnyStorageNode() (models.Node, func(), error) {
//TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
// proxy requests that are not repository scoped
node, err := c.datastore.GetStorageNodes()
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
if len(node) == 0 {
- return "", nil, errors.New("no node storages found")
+ return models.Node{}, nil, errors.New("no node storages found")
}
- return node[0].Storage, noopRequestFinalizer, nil
+ return node[0], noopRequestFinalizer, nil
}
-func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) {
+func (c *Coordinator) getNodeForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (models.Node, func(), error) {
targetRepo, err := mi.TargetRepo(m)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
primary, err := c.selectPrimary(mi, targetRepo)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
targetRepo.StorageName = primary.Storage
b, err := proxy.Codec().Marshal(m)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
if err = peeker.Modify(b); err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
requestFinalizer := noopRequestFinalizer
if mi.Operation == protoregistry.OpMutator {
if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
}
- return primary.Storage, requestFinalizer, nil
+ return *primary, requestFinalizer, nil
}
func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) {
@@ -202,6 +213,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
}
m, err := mi.UnmarshalRequestProto(frame)
+
if err != nil {
return nil, err
}
@@ -225,7 +237,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func()
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
+func (c *Coordinator) RegisterNode(nodeID int, listenAddr string) error {
conn, err := client.Dial(listenAddr,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
@@ -236,21 +248,23 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
return err
}
- c.setConn(storageName, conn)
+ c.setConn(nodeID, conn)
+
+ c.datastore.UpdateStorageNode(nodeID, true)
return nil
}
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(nodeID int, conn *grpc.ClientConn) {
c.connMutex.Lock()
- c.nodes[storageName] = conn
+ c.nodes[nodeID] = conn
c.connMutex.Unlock()
}
// GetConnection gets the grpc client connection based on an address
-func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) {
+func (c *Coordinator) GetConnection(nodeID int) (*grpc.ClientConn, error) {
c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
+ cc, ok := c.nodes[nodeID]
c.connMutex.RUnlock()
if !ok {
return nil, errors.New("client connection not found")
@@ -259,22 +273,3 @@ func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error
return cc, nil
}
-
-// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
-func (c *Coordinator) FailoverRotation() {
- c.handleSignalAndRotate()
-}
-
-func (c *Coordinator) handleSignalAndRotate() {
- failoverChan := make(chan os.Signal, 1)
- signal.Notify(failoverChan, syscall.SIGUSR1)
-
- for {
- <-failoverChan
-
- c.failoverMutex.Lock()
- // TODO: update failover logic
- c.log.Info("failover happens")
- c.failoverMutex.Unlock()
- }
-}