Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-08-13 02:30:44 +0300
committerJohn Cai <jcai@gitlab.com>2019-08-23 21:04:01 +0300
commita05503a3f6bd3849d90e099fe658ad8f85fa0d16 (patch)
treec2240f62ac7f433207e9aa37dc7508b03f9dc334
parentc06f35897d1e4c38535706d7965db399eafcd736 (diff)
Lazy failoverjc-lazy-failover
-rw-r--r--changelogs/unreleased/jc-lazy-failover.yml5
-rw-r--r--cmd/praefect/main.go11
-rw-r--r--internal/praefect/coordinator.go93
-rw-r--r--internal/praefect/coordinator_test.go2
-rw-r--r--internal/praefect/datastore.go54
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go2
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go43
-rw-r--r--internal/praefect/grpc-proxy/proxy/peeker.go12
-rw-r--r--internal/praefect/models/node.go1
-rw-r--r--internal/praefect/replicator.go2
-rw-r--r--internal/praefect/server.go6
-rw-r--r--internal/praefect/server_test.go2
12 files changed, 161 insertions, 72 deletions
diff --git a/changelogs/unreleased/jc-lazy-failover.yml b/changelogs/unreleased/jc-lazy-failover.yml
new file mode 100644
index 000000000..fa482a51d
--- /dev/null
+++ b/changelogs/unreleased/jc-lazy-failover.yml
@@ -0,0 +1,5 @@
+---
+title: Lazy failover
+merge_request: 1450
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index cf7d1b8ab..2afeaa7c4 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -114,8 +114,13 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- for _, node := range conf.Nodes {
- if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil {
+ nodes, err := datastore.GetStorageNodes()
+ if err != nil {
+ return fmt.Errorf("failed to get storage nodes from datastore: %v", err)
+ }
+
+ for _, node := range nodes {
+ if err := coordinator.RegisterNode(node.ID, node.Address); err != nil {
return fmt.Errorf("failed to register %s: %s", node.Address, err)
}
@@ -124,8 +129,6 @@ func run(listeners []net.Listener, conf config.Config) error {
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
- go coordinator.FailoverRotation()
-
select {
case s := <-termCh:
logger.WithField("signal", s).Warn("received signal, shutting down gracefully")
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 0c80bd398..64fb992e3 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -4,11 +4,8 @@ import (
"context"
"errors"
"fmt"
- "os"
- "os/signal"
"sort"
"sync"
- "syscall"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -35,7 +32,7 @@ type Coordinator struct {
datastore Datastore
- nodes map[string]*grpc.ClientConn
+ nodes map[int]*grpc.ClientConn
registry *protoregistry.Registry
}
@@ -47,7 +44,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*de
return &Coordinator{
log: l,
datastore: datastore,
- nodes: make(map[string]*grpc.ClientConn),
+ nodes: make(map[int]*grpc.ClientConn),
registry: registry,
}
}
@@ -57,15 +54,29 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
+func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error {
+ c.failoverMutex.Lock()
+ defer c.failoverMutex.Unlock()
+
+ for storageNodeID, conn := range c.nodes {
+ if conn == cc {
+ if err := c.datastore.Failover(storageNodeID); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ c.failoverMutex.Lock()
+ defer c.failoverMutex.Unlock()
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- c.failoverMutex.RLock()
- defer c.failoverMutex.RUnlock()
-
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
return nil, nil, nil, err
@@ -77,24 +88,24 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
}
var requestFinalizer func()
- var storage string
+ var node models.Node
if mi.Scope == protoregistry.ScopeRepository {
- storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker)
+ node, requestFinalizer, err = c.getNodeForRepositoryMessage(mi, m, peeker)
if err != nil {
return nil, nil, nil, err
}
} else {
- storage, requestFinalizer, err = c.getAnyStorageNode()
+ node, requestFinalizer, err = c.getAnyStorageNode()
if err != nil {
return nil, nil, nil, err
}
}
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, err := c.GetConnection(storage)
+ cc, err := c.GetConnection(node.ID)
if err != nil {
- return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+ return nil, nil, nil, fmt.Errorf("unable to find existing client connection for node_id %d", node.ID)
}
return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil
@@ -102,51 +113,51 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var noopRequestFinalizer = func() {}
-func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
+func (c *Coordinator) getAnyStorageNode() (models.Node, func(), error) {
	//TODO: For now we just pick the first storage node for a non repository scoped RPC, but we will need to figure out exactly how to
	// proxy requests that are not repository scoped
node, err := c.datastore.GetStorageNodes()
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
if len(node) == 0 {
- return "", nil, errors.New("no node storages found")
+ return models.Node{}, nil, errors.New("no node storages found")
}
- return node[0].Storage, noopRequestFinalizer, nil
+ return node[0], noopRequestFinalizer, nil
}
-func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) {
+func (c *Coordinator) getNodeForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (models.Node, func(), error) {
targetRepo, err := mi.TargetRepo(m)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
primary, err := c.selectPrimary(mi, targetRepo)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
targetRepo.StorageName = primary.Storage
b, err := proxy.Codec().Marshal(m)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
if err = peeker.Modify(b); err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
requestFinalizer := noopRequestFinalizer
if mi.Operation == protoregistry.OpMutator {
if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
}
- return primary.Storage, requestFinalizer, nil
+ return *primary, requestFinalizer, nil
}
func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) {
@@ -202,6 +213,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
}
m, err := mi.UnmarshalRequestProto(frame)
+
if err != nil {
return nil, err
}
@@ -225,7 +237,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func()
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
+func (c *Coordinator) RegisterNode(nodeID int, listenAddr string) error {
conn, err := client.Dial(listenAddr,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
@@ -236,21 +248,23 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
return err
}
- c.setConn(storageName, conn)
+ c.setConn(nodeID, conn)
+
+ c.datastore.UpdateStorageNode(nodeID, true)
return nil
}
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(nodeID int, conn *grpc.ClientConn) {
c.connMutex.Lock()
- c.nodes[storageName] = conn
+ c.nodes[nodeID] = conn
c.connMutex.Unlock()
}
// GetConnection gets the grpc client connection based on an address
-func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) {
+func (c *Coordinator) GetConnection(nodeID int) (*grpc.ClientConn, error) {
c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
+ cc, ok := c.nodes[nodeID]
c.connMutex.RUnlock()
if !ok {
return nil, errors.New("client connection not found")
@@ -259,22 +273,3 @@ func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error
return cc, nil
}
-
-// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
-func (c *Coordinator) FailoverRotation() {
- c.handleSignalAndRotate()
-}
-
-func (c *Coordinator) handleSignalAndRotate() {
- failoverChan := make(chan os.Signal, 1)
- signal.Notify(failoverChan, syscall.SIGUSR1)
-
- for {
- <-failoverChan
-
- c.failoverMutex.Lock()
- // TODO: update failover logic
- c.log.Info("failover happens")
- c.failoverMutex.Unlock()
- }
-}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index e029a3f48..f9cdcf2bc 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -57,7 +57,7 @@ func TestStreamDirector(t *testing.T) {
cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure())
require.NoError(t, err)
- coordinator.setConn("praefect-internal-1", cc)
+ coordinator.setConn(0, cc)
_, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
require.NoError(t, err)
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index f61a64064..6171926ed 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -75,6 +75,8 @@ type ReplicasDatastore interface {
GetStorageNodes() ([]models.Node, error)
+ GetHealthyStorageNodes() ([]models.Node, error)
+
GetPrimary(relativePath string) (*models.Node, error)
SetPrimary(relativePath string, storageNodeID int) error
@@ -84,6 +86,10 @@ type ReplicasDatastore interface {
RemoveReplica(relativePath string, storageNodeID int) error
GetRepository(relativePath string) (*models.Repository, error)
+
+ UpdateStorageNode(storageNodeID int, healthy bool) error
+
+ Failover(storageNodeID int) error
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -201,6 +207,21 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
return storageNodes, nil
}
+// GetHealthyStorageNodes returns only the storage nodes currently marked healthy
+func (md *MemoryDatastore) GetHealthyStorageNodes() ([]models.Node, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+
+ var storageNodes []models.Node
+ for _, storageNode := range md.storageNodes.m {
+ if storageNode.Healthy {
+ storageNodes = append(storageNodes, storageNode)
+ }
+ }
+
+ return storageNodes, nil
+}
+
// GetPrimary gets the primary storage node for a repository of a repository relative path
func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error) {
md.repositories.RLock()
@@ -413,3 +434,36 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
+
+// Failover promotes a replica to primary for every repository whose current primary is the given storage node
+func (md *MemoryDatastore) Failover(storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ md.UpdateStorageNode(storageNodeID, true)
+
+ for relativePath, repository := range md.repositories.m {
+ if repository.Primary.ID == storageNodeID {
+ if len(repository.Replicas) > 0 {
+ repository.Primary = repository.Replicas[0]
+ repository.Replicas = repository.Replicas[1:]
+ md.repositories.m[relativePath] = repository
+ }
+ }
+ }
+ return nil
+}
+
+func (md *MemoryDatastore) UpdateStorageNode(nodeID int, healthy bool) error {
+ md.storageNodes.Lock()
+ defer md.storageNodes.Unlock()
+
+ storageNode, ok := md.storageNodes.m[nodeID]
+ if !ok {
+ return errors.New("node not found")
+ }
+ storageNode.Healthy = healthy
+ md.storageNodes.m[nodeID] = storageNode
+
+ return nil
+}
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 82712765f..bfbacaac8 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -22,3 +22,5 @@ import (
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error)
+
+type ConnectionDownNotifier func(cc *grpc.ClientConn) error
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 0ed1b3b5e..40d099da4 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -10,9 +10,11 @@ package proxy
import (
"io"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
var (
@@ -26,8 +28,8 @@ var (
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
- streamer := &handler{director}
+func RegisterService(server *grpc.Server, director StreamDirector, connDownNotifier ConnectionDownNotifier, serviceName string, methodNames ...string) {
+ streamer := &handler{director, connDownNotifier}
fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
@@ -49,13 +51,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func TransparentHandler(director StreamDirector) grpc.StreamHandler {
- streamer := &handler{director}
+func TransparentHandler(director StreamDirector, connectionDownNotifier ConnectionDownNotifier) grpc.StreamHandler {
+ streamer := &handler{director, connectionDownNotifier}
return streamer.handler
}
type handler struct {
- director StreamDirector
+ director StreamDirector
+ connDownNotifier ConnectionDownNotifier
}
// handler is where the real magic of proxying happens.
@@ -76,18 +79,34 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return err
}
- defer func() {
- if requestFinalizer != nil {
- requestFinalizer()
- }
- }()
-
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
if err != nil {
- return err
+ status, ok := status.FromError(err)
+ if ok {
+ if status.Code() == codes.Unavailable {
+ s.connDownNotifier(backendConn)
+
+ // try again with a new backend connection
+ _, backendConn, requestFinalizer, err = s.director(serverStream.Context(), fullMethodName, peeker)
+ if err != nil {
+ return err
+ }
+
+ clientStream, err = grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
+ if err != nil {
+ return helper.DecorateError(codes.Internal, err)
+ }
+ }
+ }
}
+
+ defer func() {
+ if requestFinalizer != nil {
+ requestFinalizer()
+ }
+ }()
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
diff --git a/internal/praefect/grpc-proxy/proxy/peeker.go b/internal/praefect/grpc-proxy/proxy/peeker.go
index 1d1e02df5..e5516d4a6 100644
--- a/internal/praefect/grpc-proxy/proxy/peeker.go
+++ b/internal/praefect/grpc-proxy/proxy/peeker.go
@@ -64,9 +64,19 @@ func (p peeker) peek(n uint) ([][]byte, error) {
return nil, ErrInvalidPeekCount
}
- p.consumedStream.frames = make([]*frame, n)
peekedFrames := make([][]byte, n)
+ // if there are already consumed frames, read those
+ if p.consumedStream.frames != nil && len(p.consumedStream.frames) >= int(n) {
+ for i := 0; i < int(n); i++ {
+ peekedFrames[i] = p.consumedStream.frames[i].payload
+ }
+
+ return peekedFrames, nil
+ }
+
+ p.consumedStream.frames = make([]*frame, n)
+
for i := 0; i < len(p.consumedStream.frames); i++ {
f := &frame{}
if err := p.srcStream.RecvMsg(f); err != nil {
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
index 941a72e8f..1f7c730c3 100644
--- a/internal/praefect/models/node.go
+++ b/internal/praefect/models/node.go
@@ -6,6 +6,7 @@ type Node struct {
Storage string `toml:"storage"`
Address string `toml:"address"`
Token string `toml:"token"`
+ Healthy bool
}
// Repository describes a repository's relative path and its primary and list of secondaries
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index f54a6f457..ff8139211 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -178,7 +178,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
return err
}
- cc, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ cc, err := r.coordinator.GetConnection(job.TargetNode.ID)
if err != nil {
return err
}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 3a7c0fabe..64974e9a1 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -28,7 +28,7 @@ type Server struct {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry) *Server {
- grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
+ grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector, c.connDownHandler)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
@@ -58,10 +58,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
}
}
-func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
+func proxyRequiredOpts(director proxy.StreamDirector, connDownHandler proxy.ConnectionDownNotifier) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.CustomCodec(proxy.Codec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownHandler)),
}
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index d04f61121..3227c0b22 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -76,7 +76,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
backend, cleanup := newMockDownstream(t, tt.callback)
defer cleanup() // clean up mock downstream server resources
- coordinator.RegisterNode(nodeStorage.Storage, backend)
+ coordinator.RegisterNode(nodeStorage.ID, backend)
nodeStorage.Address = backend
datastore.storageNodes.m[id] = nodeStorage
}