Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-08-13 02:30:44 +0300
committerJohn Cai <jcai@gitlab.com>2019-08-15 01:27:49 +0300
commitac48b8442c38ae3f4989922d228d98d596684122 (patch)
tree2b3157c516779576156a6409e99beb41052f8dbd
parente97841826b76901e6706fa7c26cfa84687edf6ae (diff)
Naive, lazy failoverjc-naiive-failover
-rw-r--r--internal/praefect/coordinator.go18
-rw-r--r--internal/praefect/datastore.go19
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go1
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go19
-rw-r--r--internal/praefect/server.go6
5 files changed, 55 insertions, 8 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index f8eb816c3..009d4d144 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -27,6 +27,7 @@ import (
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
+ mutex sync.Mutex
log *logrus.Logger
failoverMutex sync.RWMutex
connMutex sync.RWMutex
@@ -55,8 +56,25 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
+func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+
+ for storage, conn := range c.nodes {
+ if conn == cc {
+ if err := c.datastore.Failover(storage); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 9cf7642c8..d87e6e28f 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -84,6 +84,8 @@ type ReplicasDatastore interface {
RemoveReplica(relativePath string, storageNodeID int) error
GetRepository(relativePath string) (*models.Repository, error)
+
+ Failover(storage string) error
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -417,3 +419,20 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
+
+// Failover replaces any repository with storage as its primary with one of its replicas
+func (md *MemoryDatastore) Failover(storage string) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ for relativePath, repository := range md.repositories.m {
+ if repository.Primary.Storage == storage {
+ if len(repository.Replicas) > 0 {
+ repository.Primary = repository.Replicas[0]
+ repository.Replicas = repository.Replicas[1:]
+ md.repositories.m[relativePath] = repository
+ }
+ }
+ }
+ return nil
+}
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 82712765f..63e411bba 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -22,3 +22,4 @@ import (
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error)
+type ConnectionDownNotifier func(cc *grpc.ClientConn) error
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index e527a4a5c..2ae2ec7fc 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -13,6 +13,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
var (
@@ -26,8 +27,8 @@ var (
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
- streamer := &handler{director}
+func RegisterService(server *grpc.Server, director StreamDirector, connDownNotifier ConnectionDownNotifier, serviceName string, methodNames ...string) {
+ streamer := &handler{director, connDownNotifier}
fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
@@ -49,13 +50,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func TransparentHandler(director StreamDirector) grpc.StreamHandler {
- streamer := &handler{director}
+func TransparentHandler(director StreamDirector, connectionDownNotifier ConnectionDownNotifier) grpc.StreamHandler {
+ streamer := &handler{director, connectionDownNotifier}
return streamer.handler
}
type handler struct {
- director StreamDirector
+ director StreamDirector
+ connDownNotifier ConnectionDownNotifier
}
// handler is where the real magic of proxying happens.
@@ -80,6 +82,13 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
if err != nil {
+ status, ok := status.FromError(err)
+ if ok {
+ if status.Code() == codes.Unavailable {
+ s.connDownNotifier(backendConn)
+ }
+ }
+
return err
}
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 766123584..3857e1ff4 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -28,7 +28,7 @@ type Server struct {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Logger) *Server {
- grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
+ grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector, c.connDownHandler)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
@@ -58,10 +58,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
}
}
-func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
+func proxyRequiredOpts(director proxy.StreamDirector, connDownHandler proxy.ConnectionDownNotifier) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.CustomCodec(proxy.Codec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownHandler)),
}
}