diff options
author | John Cai <jcai@gitlab.com> | 2019-08-13 02:30:44 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-08-15 01:27:49 +0300 |
commit | ac48b8442c38ae3f4989922d228d98d596684122 (patch) | |
tree | 2b3157c516779576156a6409e99beb41052f8dbd | |
parent | e97841826b76901e6706fa7c26cfa84687edf6ae (diff) |
Naive, lazy failoverjc-naiive-failover
-rw-r--r-- | internal/praefect/coordinator.go | 18 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 19 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/director.go | 1 | ||||
-rw-r--r-- | internal/praefect/grpc-proxy/proxy/handler.go | 19 | ||||
-rw-r--r-- | internal/praefect/server.go | 6 |
5 files changed, 55 insertions, 8 deletions
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index f8eb816c3..009d4d144 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -27,6 +27,7 @@ import ( // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { + mutex sync.Mutex log *logrus.Logger failoverMutex sync.RWMutex connMutex sync.RWMutex @@ -55,8 +56,25 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) } +func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + for storage, conn := range c.nodes { + if conn == cc { + if err := c.datastore.Failover(storage); err != nil { + return err + } + } + } + + return nil +} + // streamDirector determines which downstream servers receive requests func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) { + c.mutex.Lock() + defer c.mutex.Unlock() // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 9cf7642c8..d87e6e28f 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -84,6 +84,8 @@ type ReplicasDatastore interface { RemoveReplica(relativePath string, storageNodeID int) error GetRepository(relativePath string) (*models.Repository, error) + + Failover(storage string) error } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -417,3 +419,20 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } + +// Failover replaces any repository with storage as its primary with one of its replicas +func (md *MemoryDatastore) Failover(storage string) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + for relativePath, repository := range md.repositories.m { + if repository.Primary.Storage == storage { + if len(repository.Replicas) > 0 { + repository.Primary = repository.Replicas[0] + repository.Replicas = repository.Replicas[1:] + md.repositories.m[relativePath] = repository + } + } + } + return nil +} diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go index 82712765f..63e411bba 100644 --- a/internal/praefect/grpc-proxy/proxy/director.go +++ b/internal/praefect/grpc-proxy/proxy/director.go @@ -22,3 +22,4 @@ import ( // // See the rather rich example. type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error) +type ConnectionDownNotifier func(cc *grpc.ClientConn) error diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go index e527a4a5c..2ae2ec7fc 100644 --- a/internal/praefect/grpc-proxy/proxy/handler.go +++ b/internal/praefect/grpc-proxy/proxy/handler.go @@ -13,6 +13,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( @@ -26,8 +27,8 @@ var ( // The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { - streamer := &handler{director} +func RegisterService(server *grpc.Server, director StreamDirector, connDownNotifier ConnectionDownNotifier, serviceName string, methodNames ...string) { + streamer := &handler{director, connDownNotifier} fakeDesc := &grpc.ServiceDesc{ ServiceName: serviceName, HandlerType: (*interface{})(nil), @@ -49,13 +50,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s // backends. It should be used as a `grpc.UnknownServiceHandler`. // // This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption. -func TransparentHandler(director StreamDirector) grpc.StreamHandler { - streamer := &handler{director} +func TransparentHandler(director StreamDirector, connectionDownNotifier ConnectionDownNotifier) grpc.StreamHandler { + streamer := &handler{director, connectionDownNotifier} return streamer.handler } type handler struct { - director StreamDirector + director StreamDirector + connDownNotifier ConnectionDownNotifier } // handler is where the real magic of proxying happens. @@ -80,6 +82,13 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName) if err != nil { + status, ok := status.FromError(err) + if ok { + if status.Code() == codes.Unavailable { + s.connDownNotifier(backendConn) + } + } + return err } // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 766123584..3857e1ff4 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -28,7 +28,7 @@ type Server struct { // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Logger) *Server { - grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) + grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector, c.connDownHandler)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler @@ -58,10 +58,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo } } -func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { +func proxyRequiredOpts(director proxy.StreamDirector, connDownHandler proxy.ConnectionDownNotifier) []grpc.ServerOption { return []grpc.ServerOption{ grpc.CustomCodec(proxy.Codec()), - grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownHandler)), } } |