| author | John Cai <jcai@gitlab.com> | 2020-02-05 00:37:50 +0300 |
| --- | --- | --- |
| committer | John Cai <jcai@gitlab.com> | 2020-02-14 00:18:11 +0300 |
| commit | e154cba9d68f11da8c721b4997dd4e6eff532a73 (patch) | |
| tree | 5c3b2525421e5e3355732d7240a047acd11e97f7 | |
| parent | e97d8fd8a43b5d826eb0ee180a405fe207acd2be (diff) | |
Use node manager
Hooks up the node manager to Praefect's coordinator.
23 files changed, 517 insertions, 510 deletions
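The heart of the change is in cmd/praefect/main.go: the hand-rolled conn.ClientConnections registry (deleted below) is replaced by a nodes.Manager that dials every configured Gitaly node up front and is then shared by the coordinator, the replication manager, and the server. A condensed sketch of the new wiring, reassembled from the hunks below (Prometheus metric options, listener startup, and the `var (...)` grouping are omitted):

```go
// Condensed from run() in cmd/praefect/main.go as changed by this commit.
nodeManager, err := nodes.NewManager(logger, conf)
if err != nil {
	return err
}
// Bootstrap health checks at 1-second intervals, then monitor every 3s.
nodeManager.Start(1*time.Second, 3*time.Second)

ds := datastore.NewInMemory(conf)
coordinator := praefect.NewCoordinator(logger, ds, nodeManager, conf,
	protoregistry.GitalyProtoFileDescriptors...)
repl := praefect.NewReplMgr("default", logger, ds, nodeManager)
srv := praefect.NewServer(coordinator, repl, nil, logger, nodeManager, conf)
```

Two side effects of this wiring are visible throughout the diff: the replication backlog's exponential backoff is now capped at 5 seconds instead of 5 minutes, and tests must supply each node's Token in the config, since NewManager dials straight from the configured nodes rather than relying on per-test RegisterNode calls.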
diff --git a/changelogs/unreleased/jc-use-node-manager-ff.yml b/changelogs/unreleased/jc-use-node-manager-ff.yml new file mode 100644 index 000000000..69cdee0a3 --- /dev/null +++ b/changelogs/unreleased/jc-use-node-manager-ff.yml @@ -0,0 +1,5 @@ +--- +title: Wire up coordinator to use node manager +merge_request: 1806 +author: +type: changed diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index e74e09c05..31b219b70 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -42,9 +42,9 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/monitoring" @@ -129,20 +129,11 @@ func configure() (config.Config, error) { } func run(cfgs []starter.Config, conf config.Config) error { - clientConnections := conn.NewClientConnections() - - for _, virtualStorage := range conf.VirtualStorages { - for _, node := range virtualStorage.Nodes { - if _, err := clientConnections.GetConnection(node.Storage); err == nil { - continue - } - if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil { - return fmt.Errorf("failed to register %s: %s", node.Address, err) - } - - logger.WithField("node_address", node.Address).Info("registered gitaly node") - } + nodeManager, err := nodes.NewManager(logger, conf) + if err != nil { + return err } + nodeManager.Start(1*time.Second, 3*time.Second) latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus) if err != nil { @@ -157,15 +148,15 @@ func run(cfgs []starter.Config, conf config.Config) error { var ( // top level server dependencies ds = datastore.NewInMemory(conf) - coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, protoregistry.GitalyProtoFileDescriptors...) 
repl = praefect.NewReplMgr( "default", logger, ds, - clientConnections, + nodeManager, praefect.WithLatencyMetric(latencyMetric), praefect.WithQueueMetric(queueMetric)) - srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) + srv = praefect.NewServer(coordinator, repl, nil, logger, nodeManager, conf) serverErrors = make(chan error, 1) ) @@ -192,7 +183,7 @@ func run(cfgs []starter.Config, conf config.Config) error { go func() { serverErrors <- b.Wait() }() go func() { - serverErrors <- repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Minute)) + serverErrors <- repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second)) }() go coordinator.FailoverRotation() diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index f9130bfd1..6654fee19 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -10,12 +10,11 @@ import ( "github.com/stretchr/testify/require" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/internal/config/auth" - "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "google.golang.org/grpc" @@ -175,6 +174,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func Storage: "praefect-internal-0", DefaultPrimary: true, Address: backend, + Token: backendToken, }, }, }, @@ -187,17 +187,17 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func panic(err) } - logEntry := log.Default() + logEntry := testhelper.DiscardTestEntry(t) ds := datastore.NewInMemory(conf) - clientConnections := conn.NewClientConnections() - clientConnections.RegisterNode("praefect-internal-0", backend, backendToken) + nodeMgr, err := nodes.NewManager(logEntry, conf) + require.NoError(t, err) - coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd) + coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, fd) - replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections) + replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, nodeMgr) - srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf) + srv := NewServer(coordinator, replMgr, nil, logEntry, nodeMgr, conf) serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() diff --git a/internal/praefect/conn/client_connections.go b/internal/praefect/conn/client_connections.go deleted file mode 100644 index d2e36f352..000000000 --- a/internal/praefect/conn/client_connections.go +++ /dev/null @@ -1,69 +0,0 @@ -package conn - -import ( - "errors" - "sync" - - gitalyauth "gitlab.com/gitlab-org/gitaly/auth" - "gitlab.com/gitlab-org/gitaly/client" - "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" - "google.golang.org/grpc" -) - -// ErrConnectionNotFound indicates the connection for a given storage has not yet been registered -var ErrConnectionNotFound = errors.New("client connection not found") - -// ErrAlreadyRegistered indicates the client connection for a given storage has already been registered -var ErrAlreadyRegistered = errors.New("client 
connection already registered") - -// ClientConnections contains ready to use grpc client connections -type ClientConnections struct { - connMutex sync.RWMutex - nodes map[string]*grpc.ClientConn -} - -// NewClientConnections creates a new ClientConnections struct -func NewClientConnections() *ClientConnections { - return &ClientConnections{ - nodes: make(map[string]*grpc.ClientConn), - } -} - -// RegisterNode will direct traffic to the supplied downstream connection when the storage location -// is encountered. -func (c *ClientConnections) RegisterNode(storageName, listenAddr, token string) error { - conn, err := client.Dial(listenAddr, - []grpc.DialOption{ - grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)), - }, - ) - if err != nil { - return err - } - - return c.setConn(storageName, conn) -} - -func (c *ClientConnections) setConn(storageName string, conn *grpc.ClientConn) error { - c.connMutex.Lock() - if _, ok := c.nodes[storageName]; ok { - return ErrAlreadyRegistered - } - c.nodes[storageName] = conn - c.connMutex.Unlock() - - return nil -} - -// GetConnection gets the grpc client connection based on an address -func (c *ClientConnections) GetConnection(storageName string) (*grpc.ClientConn, error) { - c.connMutex.RLock() - cc, ok := c.nodes[storageName] - c.connMutex.RUnlock() - if !ok { - return nil, ErrConnectionNotFound - } - - return cc, nil -} diff --git a/internal/praefect/conn/client_connections_test.go b/internal/praefect/conn/client_connections_test.go deleted file mode 100644 index 2c5f77a54..000000000 --- a/internal/praefect/conn/client_connections_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package conn - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestRegisterNode(t *testing.T) { - storageName := "default" - tcpAddress := "address1" - clientConn := NewClientConnections() - - _, err := clientConn.GetConnection(storageName) - require.Equal(t, ErrConnectionNotFound, err) - - require.NoError(t, clientConn.RegisterNode(storageName, fmt.Sprintf("tcp://%s", tcpAddress), "token")) - - conn, err := clientConn.GetConnection(storageName) - require.NoError(t, err) - require.Equal(t, tcpAddress, conn.Target()) - - err = clientConn.RegisterNode(storageName, "tcp://some-other-address", "token") - require.Equal(t, ErrAlreadyRegistered, err) -} diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 3ed5ad57c..45b0dd841 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -2,8 +2,6 @@ package praefect import ( "context" - "errors" - "fmt" "os" "os/signal" "sync" @@ -13,10 +11,9 @@ import ( "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/codes" @@ -31,7 +28,7 @@ func isDestructive(methodName string) bool { // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. 
type Coordinator struct { - connections *conn.ClientConnections + nodeMgr nodes.Manager log *logrus.Entry failoverMutex sync.RWMutex @@ -42,16 +39,16 @@ type Coordinator struct { } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, nodeMgr nodes.Manager, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - datastore: ds, - registry: registry, - connections: clientConnections, - conf: conf, + log: l, + datastore: ds, + registry: registry, + nodeMgr: nodeMgr, + conf: conf, } } @@ -60,127 +57,121 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) } -// streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { - // For phase 1, we need to route messages based on the storage location - // to the appropriate Gitaly node. - c.log.Debugf("Stream director received method %s", fullMethodName) - - c.failoverMutex.RLock() - defer c.failoverMutex.RUnlock() +func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi protoregistry.MethodInfo, peeker proxy.StreamModifier, fullMethodName string, m proto.Message) (*proxy.StreamParameters, error) { + targetRepo, err := mi.TargetRepo(m) + if err != nil { + if err == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + return nil, err + } - mi, err := c.registry.LookupMethod(fullMethodName) + shard, err := c.nodeMgr.GetShard(targetRepo.GetStorageName()) if err != nil { return nil, err } - m, err := protoMessageFromPeeker(mi, peeker) + primary, err := shard.GetPrimary() if err != nil { return nil, err } - var requestFinalizer func() - var storage string + if err = c.rewriteStorageForRepositoryMessage(mi, m, peeker, primary.GetStorage()); err != nil { + if err == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } - if mi.Scope == protoregistry.ScopeRepository { - var getRepoErr error - storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + return nil, err + } - if getRepoErr == protoregistry.ErrTargetRepoMissing { - return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) - } + var requestFinalizer func() - if getRepoErr != nil { - return nil, getRepoErr + if mi.Operation == protoregistry.OpMutator { + change := datastore.UpdateRepo + if isDestructive(fullMethodName) { + change = datastore.DeleteRepo } - if storage == "" { - return nil, status.Error(codes.InvalidArgument, "storage not found") - } - } else { - storage, requestFinalizer, err = c.getAnyStorageNode() + secondaries, err := shard.GetSecondaries() if err != nil { return nil, err } - } - // We only need the primary node, as there's only one primary storage - // location per praefect at this time - cc, err := c.connections.GetConnection(storage) - if err != nil { - return nil, fmt.Errorf("unable to find existing client connection for %s", storage) + + if requestFinalizer, err = 
c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil { + return nil, err + } } - return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil + return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil } -var noopRequestFinalizer = func() {} +// streamDirector determines which downstream servers receive requests +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) { + // For phase 1, we need to route messages based on the storage location + // to the appropriate Gitaly node. + c.log.Debugf("Stream director received method %s", fullMethodName) + + c.failoverMutex.RLock() + defer c.failoverMutex.RUnlock() -func (c *Coordinator) getAnyStorageNode() (string, func(), error) { - //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to - // proxy requests that are not repository scoped - node, err := c.datastore.GetStorageNodes() + mi, err := c.registry.LookupMethod(fullMethodName) if err != nil { - return "", nil, err + return nil, err } - if len(node) == 0 { - return "", nil, errors.New("no node storages found") + + m, err := protoMessageFromPeeker(mi, peeker) + if err != nil { + return nil, err } - return node[0].Storage, noopRequestFinalizer, nil -} + if mi.Scope == protoregistry.ScopeRepository { + return c.directRepositoryScopedMessage(ctx, mi, peeker, fullMethodName, m) + } -func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, method string) (string, func(), error) { - targetRepo, err := mi.TargetRepo(m) + // TODO: remove the need to handle non repository scoped RPCs. The only remaining one is FindRemoteRepository. + // https://gitlab.com/gitlab-org/gitaly/issues/2442. One this issue is resolved, we can explicitly require that + // any RPC that gets proxied through praefect must be repository scoped. 
+ shard, err := c.nodeMgr.GetShard(c.conf.VirtualStorages[0].Name) if err != nil { - return "", nil, err + return nil, err } - primary, err := c.datastore.GetPrimary(targetRepo.GetStorageName()) + primary, err := shard.GetPrimary() if err != nil { - return "", nil, err + return nil, err } - secondaries, err := c.datastore.GetSecondaries(targetRepo.GetStorageName()) + return proxy.NewStreamParameters(ctx, primary.GetConnection(), func() {}, nil), nil +} + +func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, primaryStorage string) error { + targetRepo, err := mi.TargetRepo(m) if err != nil { - return "", nil, err + return err } // rewrite storage name - targetRepo.StorageName = primary.Storage + targetRepo.StorageName = primaryStorage additionalRepo, ok, err := mi.AdditionalRepo(m) if err != nil { - return "", nil, err + return err } if ok { - additionalRepo.StorageName = primary.Storage + additionalRepo.StorageName = primaryStorage } b, err := proxy.Codec().Marshal(m) if err != nil { - return "", nil, err + return err } if err = peeker.Modify(b); err != nil { - return "", nil, err - } - - requestFinalizer := noopRequestFinalizer - - // TODO: move the logic of creating replication jobs to the streamDirector method - if mi.Operation == protoregistry.OpMutator { - change := datastore.UpdateRepo - if isDestructive(method) { - change = datastore.DeleteRepo - } - - if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil { - return "", nil, err - } + return err } - return primary.Storage, requestFinalizer, nil + return nil } func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) { @@ -197,8 +188,12 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi return m, nil } -func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary models.Node, secondaries []models.Node, change datastore.ChangeType) (func(), error) { - jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary, secondaries, change) +func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) { + var secondaryStorages []string + for _, secondary := range secondaries { + secondaryStorages = append(secondaryStorages, secondary.GetStorage()) + } + jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change) if err != nil { return nil, err } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 4283779a1..dccab79f0 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1,18 +1,16 @@ package praefect import ( - "fmt" "io/ioutil" "testing" "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -57,10 +55,13 @@ 
func TestStreamDirector(t *testing.T) { defer cancel() address := "gitaly-primary.example.com" - clientConnections := conn.NewClientConnections() - clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token") - coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf) + entry := testhelper.DiscardTestEntry(t) + + nodeMgr, err := nodes.NewManager(entry, conf) + require.NoError(t, err) + + coordinator := NewCoordinator(entry, ds, nodeMgr, conf) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index bc677aa33..5896e7797 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -108,7 +108,7 @@ type ReplJobsDatastore interface { // CreateReplicaReplJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) + CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) // UpdateReplJobState updates the state of an existing replication job UpdateReplJobState(jobID uint64, newState JobState) error @@ -302,7 +302,7 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi // CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. 
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() @@ -312,15 +312,15 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary mo var jobIDs []uint64 - for _, secondary := range secondaries { + for _, secondaryStorage := range secondaryStorages { nextID := uint64(len(md.jobs.records) + 1) md.jobs.records[nextID] = jobRecord{ change: change, - targetNodeStorage: secondary.Storage, + targetNodeStorage: secondaryStorage, state: JobStatePending, relativePath: relativePath, - sourceNodeStorage: primary.Storage, + sourceNodeStorage: primaryStorage, } jobIDs = append(jobIDs, nextID) diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 81e03e2cf..0ee189123 100644 --- a/internal/praefect/datastore/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -44,7 +44,7 @@ var operations = []struct { { desc: "insert replication job", opFn: func(t *testing.T, ds Datastore) { - _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1, []models.Node{stor2}, UpdateRepo) + _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo) require.NoError(t, err) }, }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 953bb88fe..16094ac6c 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -14,11 +14,11 @@ import ( internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth" "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/server/auth" @@ -28,6 +28,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" ) func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) { @@ -70,10 +72,10 @@ func testConfig(backends int) config.Config { // setupServer wires all praefect dependencies together via dependency // injection -func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { +func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { var ( ds = datastore.NewInMemory(conf) - coordinator = NewCoordinator(l, ds, clientCC, conf, fds...) + coordinator = NewCoordinator(l, ds, nodeMgr, conf, fds...) 
) var defaultNode *models.Node @@ -88,14 +90,14 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti defaultNode.Storage, l, ds, - clientCC, + nodeMgr, ) server := NewServer( coordinator, replmgr, nil, l, - clientCC, + nodeMgr, conf, ) @@ -108,8 +110,6 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti // configured storage node. // requires there to be only 1 virtual storage func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) { - clientCC := conn.NewClientConnections() - require.Len(t, conf.VirtualStorages, 1) require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes), "mock server count doesn't match config nodes") @@ -123,12 +123,15 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st backendAddr, cleanup := newMockDownstream(t, node.Token, backend) cleanups = append(cleanups, cleanup) - clientCC.RegisterNode(node.Storage, backendAddr, node.Token) node.Address = backendAddr conf.VirtualStorages[0].Nodes[i] = node } - _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) + + _, prf := setupServer(t, conf, nodeMgr, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) listener, port := listenAvailPort(t) t.Logf("praefect listening on port %d", port) @@ -169,14 +172,12 @@ func noopBackoffFunc() (backoff, backoffReset) { // requires exactly 1 virtual storage func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { require.Len(t, conf.VirtualStorages, 1) - clientCC := conn.NewClientConnections() var cleanups []testhelper.Cleanup for i, node := range conf.VirtualStorages[0].Nodes { _, backendAddr, cleanup := runInternalGitalyServer(t, node.Token) cleanups = append(cleanups, cleanup) - clientCC.RegisterNode(node.Storage, backendAddr, node.Token) node.Address = backendAddr conf.VirtualStorages[0].Nodes[i] = node } @@ -184,13 +185,17 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client ds := datastore.NewInMemory(conf) logEntry := log.Default() - coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) + + coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, protoregistry.GitalyProtoFileDescriptors...) 
replmgr := NewReplMgr( - "", + conf.VirtualStorages[0].Name, logEntry, ds, - clientCC, + nodeMgr, WithQueueMetric(&promtest.MockGauge{}), WithLatencyMetric(&promtest.MockHistogram{}), ) @@ -199,7 +204,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client replmgr, nil, logEntry, - clientCC, + nodeMgr, conf, ) @@ -249,6 +254,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string, gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer()) gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer)) + healthpb.RegisterHealthServer(server, health.NewServer()) errQ := make(chan error) diff --git a/internal/praefect/node_manager.go b/internal/praefect/nodes/manager.go index 50880e89f..3b090372d 100644 --- a/internal/praefect/node_manager.go +++ b/internal/praefect/nodes/manager.go @@ -1,7 +1,6 @@ -package praefect +package nodes import ( - "bytes" "context" "errors" "sync" @@ -23,8 +22,8 @@ type Shard interface { GetSecondaries() ([]Node, error) } -// NodeManager is responsible for returning shards for virtual storages -type NodeManager interface { +// Manager is responsible for returning shards for virtual storages +type Manager interface { GetShard(virtualStorageName string) (Shard, error) } @@ -63,11 +62,13 @@ func (s *shard) GetSecondaries() ([]Node, error) { return secondaries, nil } -// NodeMgr is a concrete type that adheres to the NodeManager interface -type NodeMgr struct { - shards map[string]*shard - log *logrus.Entry +// Mgr is a concrete type that adheres to the Manager interface +type Mgr struct { + shards map[string]*shard + // staticShards never changes based on node health. It is a static set of shards that comes from the config's + // VirtualStorages failoverEnabled bool + log *logrus.Entry } // ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence @@ -75,23 +76,23 @@ type NodeMgr struct { var ErrPrimaryNotHealthy = errors.New("primary is not healthy") // NewNodeManager creates a new NodeMgr based on virtual storage configs -func NewNodeManager(log *logrus.Entry, c config.Config) (*NodeMgr, error) { +func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) (*Mgr, error) { shards := make(map[string]*shard) - for _, virtualStorage := range c.VirtualStorages { var secondaries []*nodeStatus var primary *nodeStatus for _, node := range virtualStorage.Nodes { conn, err := client.Dial(node.Address, - []grpc.DialOption{ - grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), - grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)), - }, + append( + []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)), + }, dialOpts...), ) if err != nil { return nil, err } - ns := newConnectionStatus(*node, conn) + ns := newConnectionStatus(*node, conn, log) if node.DefaultPrimary { primary = ns @@ -107,7 +108,7 @@ func NewNodeManager(log *logrus.Entry, c config.Config) (*NodeMgr, error) { } } - return &NodeMgr{ + return &Mgr{ shards: shards, log: log, failoverEnabled: c.FailoverEnabled, @@ -118,36 +119,32 @@ func NewNodeManager(log *logrus.Entry, c config.Config) (*NodeMgr, error) { // for deeming a node "healthy" const healthcheckThreshold = 3 -func (n *NodeMgr) bootstrap(d time.Duration) error { +func (n *Mgr) bootstrap(d time.Duration) error { timer := time.NewTimer(d) defer timer.Stop() for i := 0; i < healthcheckThreshold; 
i++ { <-timer.C - if err := n.checkShards(); err != nil { - return err - } + n.checkShards() timer.Reset(d) } return nil } -func (n *NodeMgr) monitor(d time.Duration) { +func (n *Mgr) monitor(d time.Duration) { ticker := time.NewTicker(d) defer ticker.Stop() for { <-ticker.C - if err := n.checkShards(); err != nil { - n.log.WithError(err).Error("error when checking shards") - } + n.checkShards() } } // Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off // the monitoring process. Start must be called before NodeMgr can be used. -func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) { +func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) { if n.failoverEnabled { n.bootstrap(bootstrapInterval) go n.monitor(monitorInterval) @@ -155,7 +152,7 @@ func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) { } // GetShard retrieves a shard for a virtual storage name -func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) { +func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) { shard, ok := n.shards[virtualStorageName] if !ok { return nil, errors.New("virtual storage does not exist") @@ -170,28 +167,11 @@ func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) { return shard, nil } -type errCollection []error - -func (e errCollection) Error() string { - sb := bytes.NewBufferString("") - for _, err := range e { - sb.WriteString(err.Error()) - sb.WriteString("\n") - } - - return sb.String() -} - -func (n *NodeMgr) checkShards() error { - var errs errCollection +func (n *Mgr) checkShards() { for _, shard := range n.shards { - if err := shard.primary.check(); err != nil { - errs = append(errs, err) - } + shard.primary.check() for _, secondary := range shard.secondaries { - if err := secondary.check(); err != nil { - errs = append(errs, err) - } + secondary.check() } if shard.primary.isHealthy() { @@ -217,19 +197,14 @@ func (n *NodeMgr) checkShards() error { shard.primary = newPrimary shard.m.Unlock() } - - if len(errs) > 0 { - return errs - } - - return nil } -func newConnectionStatus(node models.Node, cc *grpc.ClientConn) *nodeStatus { +func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry) *nodeStatus { return &nodeStatus{ Node: node, ClientConn: cc, statuses: make([]healthpb.HealthCheckResponse_ServingStatus, 0), + log: l, } } @@ -237,6 +212,7 @@ type nodeStatus struct { models.Node *grpc.ClientConn statuses []healthpb.HealthCheckResponse_ServingStatus + log *logrus.Entry } // GetStorage gets the storage name of a node @@ -273,13 +249,14 @@ func (n *nodeStatus) isHealthy() bool { return true } -func (n *nodeStatus) check() error { +func (n *nodeStatus) check() { client := healthpb.NewHealthClient(n.ClientConn) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: "TestService"}) + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) if err != nil { + n.log.WithError(err).WithField("address", n.Address).Warn("error when pinging healthcheck") resp = &healthpb.HealthCheckResponse{ Status: healthpb.HealthCheckResponse_UNKNOWN, } @@ -289,6 +266,4 @@ func (n *nodeStatus) check() error { if len(n.statuses) > healthcheckThreshold { n.statuses = n.statuses[1:] } - - return err } diff --git a/internal/praefect/node_manager_test.go b/internal/praefect/nodes/manager_test.go index 45682a8d4..69a24c10c 100644 --- 
a/internal/praefect/node_manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -1,42 +1,51 @@ -package praefect +package nodes import ( - "net" "testing" "time" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "google.golang.org/grpc" - "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" ) func TestNodeStatus(t *testing.T) { - cc, healthSvr, cleanup := newHealthServer(t, testhelper.GetTemporaryGitalySocketFileName()) - defer cleanup() + socket := testhelper.GetTemporaryGitalySocketFileName() + svr, healthSvr := testhelper.NewServerWithHealth(t, socket) + defer svr.Stop() - cs := newConnectionStatus(models.Node{}, cc) + cc, err := grpc.Dial( + "unix://"+socket, + grpc.WithInsecure(), + ) + + require.NoError(t, err) + + cs := newConnectionStatus(models.Node{}, cc, testhelper.DiscardTestEntry(t)) require.False(t, cs.isHealthy()) for i := 0; i < healthcheckThreshold; i++ { - require.NoError(t, cs.check()) + cs.check() } require.True(t, cs.isHealthy()) - healthSvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) - require.NoError(t, cs.check()) + cs.check() require.False(t, cs.isHealthy()) } func TestNodeManager(t *testing.T) { - internalSocket0 := testhelper.GetTemporaryGitalySocketFileName() - internalSocket1 := testhelper.GetTemporaryGitalySocketFileName() + internalSocket0, internalSocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName() + srv0, healthSrv0 := testhelper.NewServerWithHealth(t, internalSocket0) + defer srv0.Stop() + + srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1) + defer srv1.Stop() virtualStorages := []*config.VirtualStorage{ { @@ -64,16 +73,10 @@ func TestNodeManager(t *testing.T) { FailoverEnabled: false, } - _, srv0, cancel0 := newHealthServer(t, internalSocket0) - defer cancel0() - - _, _, cancel1 := newHealthServer(t, internalSocket1) - defer cancel1() - - nm, err := NewNodeManager(log.Default(), confWithFailover) + nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover) require.NoError(t, err) - nmWithoutFailover, err := NewNodeManager(log.Default(), confWithoutFailover) + nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover) require.NoError(t, err) nm.Start(1*time.Millisecond, 5*time.Second) @@ -109,7 +112,7 @@ func TestNodeManager(t *testing.T) { require.Equal(t, virtualStorages[0].Nodes[1].Storage, secondaries[0].GetStorage()) require.Equal(t, virtualStorages[0].Nodes[1].Address, secondaries[0].GetAddress()) - srv0.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_UNKNOWN) + healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) nm.checkShards() // since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy @@ -149,34 +152,9 @@ func TestNodeManager(t *testing.T) { require.Equal(t, virtualStorages[0].Nodes[0].Storage, secondaries[0].GetStorage()) require.Equal(t, virtualStorages[0].Nodes[0].Address, secondaries[0].GetAddress()) - cancel1() + healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) nm.checkShards() _, err = nm.GetShard("virtual-storage-0") require.Error(t, err, "should return error 
since no nodes are healthy") } - -func newHealthServer(t testing.TB, socketName string) (*grpc.ClientConn, *health.Server, func()) { - srv := testhelper.NewTestGrpcServer(t, nil, nil) - healthSrvr := health.NewServer() - grpc_health_v1.RegisterHealthServer(srv, healthSrvr) - healthSrvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING) - - lis, err := net.Listen("unix", socketName) - require.NoError(t, err) - - go srv.Serve(lis) - - cleanup := func() { - srv.Stop() - } - - cc, err := grpc.Dial( - "unix://"+socketName, - grpc.WithInsecure(), - ) - - require.NoError(t, err) - - return cc, healthSrvr, cleanup -} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index f0e7abc5f..49af0be3d 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -8,9 +8,9 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -148,12 +148,12 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, type ReplMgr struct { log *logrus.Entry datastore datastore.Datastore - clientConnections *conn.ClientConnections - targetNode string // which replica is this replicator responsible for? + nodeManager nodes.Manager + virtualStorage string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic replQueueMetric metrics.Gauge replLatencyMetric metrics.Histogram - + replJobTimeout time.Duration // whitelist contains the project names of the repos we wish to replicate whitelist map[string]struct{} } @@ -177,14 +177,14 @@ func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) { // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(virtualStorage string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, datastore: datastore, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, - targetNode: targetNode, - clientConnections: c, + virtualStorage: virtualStorage, + nodeManager: nodeMgr, replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}), replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}), } @@ -248,65 +248,80 @@ const ( maxAttempts = 3 ) +func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) { + shard, err := r.nodeManager.GetShard(r.virtualStorage) + if err != nil { + return nil, nil, err + } + + primary, err = shard.GetPrimary() + if err != nil { + return nil, nil, err + } + + secondaries, err = shard.GetSecondaries() + if err != nil { + return nil, nil, err + } + + return primary, secondaries, nil +} + // ProcessBacklog will process queued jobs. It will block while processing jobs. 
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { backoff, reset := b() for { - nodes, err := r.datastore.GetStorageNodes() - if err != nil { - r.log.WithError(err).Error("error when getting storage nodes") - return err - } - var totalJobs int - for _, node := range nodes { - jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, node.Storage, 10) - if err != nil { - r.log.WithField("storage", node.Storage).WithError(err).Error("error when retrieving jobs for replication") - continue - } + primary, secondaries, err := r.getPrimaryAndSecondaries() + if err == nil { + for _, secondary := range secondaries { + jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, secondary.GetStorage(), 10) + if err != nil { + return err + } - totalJobs += len(jobs) + totalJobs += len(jobs) - reposReplicated := make(map[string]struct{}) - for _, job := range jobs { - if job.Attempts >= maxAttempts { - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil { - r.log.WithError(err).Error("error when updating replication job status to cancelled") + reposReplicated := make(map[string]struct{}) + for _, job := range jobs { + if job.Attempts >= maxAttempts { + if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil { + r.log.WithError(err).Error("error when updating replication job status to cancelled") + } + continue } - continue - } - if _, ok := reposReplicated[job.RelativePath]; ok { - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil { - r.log.WithError(err).Error("error when updating replication job status to cancelled") + if _, ok := reposReplicated[job.RelativePath]; ok { + if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil { + r.log.WithError(err).Error("error when updating replication job status to cancelled") + } + continue } - continue - } - if err := r.processReplJob(ctx, job); err != nil { - r.log.WithFields(logrus.Fields{ - logWithReplJobID: job.ID, - "from_storage": job.SourceNode.Storage, - "to_storage": job.TargetNode.Storage, - }).WithError(err).Error("replication job failed") - - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil { - r.log.WithError(err).Error("error when updating replication job status to failed") + if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil { + r.log.WithFields(logrus.Fields{ + logWithReplJobID: job.ID, + "from_storage": job.SourceNode.Storage, + "to_storage": job.TargetNode.Storage, + }).WithError(err).Error("replication job failed") + if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil { + r.log.WithError(err).Error("error when updating replication job status to failed") + } continue } - } - reposReplicated[job.RelativePath] = struct{}{} + reposReplicated[job.RelativePath] = struct{}{} + } } + } else { + r.log.WithError(err).WithField("virtual_storage", r.virtualStorage).Error("error when getting primary and secondaries") } if totalJobs == 0 { select { case <-time.After(backoff()): continue - case <-ctx.Done(): return ctx.Err() } @@ -320,7 +335,8 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { // is a crutch in this situation. Ideally, we need to update state somewhere // with information regarding the replication failure. 
See follow up issue: // https://gitlab.com/gitlab-org/gitaly/issues/2138 -func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) error { + +func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error { l := r.log. WithField(logWithReplJobID, job.ID). WithField(logWithReplSource, job.SourceNode). @@ -336,19 +352,17 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) erro return err } - targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage) - if err != nil { - l.WithError(err).Error("unable to obtain client connection for secondary node in replication job") - return err - } + var replCtx context.Context + var cancel func() - sourceCC, err := r.clientConnections.GetConnection(job.SourceNode.Storage) - if err != nil { - l.WithError(err).Error("unable to obtain client connection for primary node in replication job") - return err + if r.replJobTimeout > 0 { + replCtx, cancel = context.WithTimeout(ctx, r.replJobTimeout) + } else { + replCtx, cancel = context.WithCancel(ctx) } + defer cancel() - injectedCtx, err := helper.InjectGitalyServers(ctx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token) + injectedCtx, err := helper.InjectGitalyServers(replCtx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token) if err != nil { l.WithError(err).Error("unable to inject Gitaly servers into context for replication job") return err diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 30bf4ae08..24b10b051 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -13,18 +13,21 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/git/objectpool" - gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" + objectpoolservice "gitlab.com/gitlab-org/gitaly/internal/service/objectpool" + "gitlab.com/gitlab-org/gitaly/internal/service/remote" + "gitlab.com/gitlab-org/gitaly/internal/service/repository" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/reflection" ) func TestProcessReplicationJob(t *testing.T) { @@ -115,7 +118,11 @@ func TestProcessReplicationJob(t *testing.T) { secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name) require.NoError(t, err) - _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, secondaries, datastore.UpdateRepo) + var secondaryStorages []string + for _, secondary := range secondaries { + secondaryStorages = append(secondaryStorages, secondary.Storage) + } + _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo) require.NoError(t, err) jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1) @@ -127,19 +134,28 @@ func 
TestProcessReplicationJob(t *testing.T) { }) var replicator defaultReplicator - replicator.log = gitaly_log.Default() + entry := testhelper.DiscardTestEntry(t) + replicator.log = entry - clientCC := conn.NewClientConnections() - require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token)) - require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token)) + nodeMgr, err := nodes.NewManager(entry, config) + require.NoError(t, err) + nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) var mockReplicationGauge promtest.MockGauge var mockReplicationHistogram promtest.MockHistogram - replMgr := NewReplMgr("", gitaly_log.Default(), ds, clientCC, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge)) + replMgr := NewReplMgr("", testhelper.DiscardTestEntry(t), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge)) replMgr.replicator = replicator - replMgr.processReplJob(ctx, jobs[0]) + shard, err := nodeMgr.GetShard(config.VirtualStorages[0].Name) + require.NoError(t, err) + primaryNode, err := shard.GetPrimary() + require.NoError(t, err) + secondaryNodes, err := shard.GetSecondaries() + require.NoError(t, err) + require.Len(t, secondaryNodes, 1) + + replMgr.processReplJob(ctx, jobs[0], primaryNode.GetConnection(), secondaryNodes[0].GetConnection()) relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath) require.NoError(t, err) @@ -175,7 +191,8 @@ func TestConfirmReplication(t *testing.T) { require.NoError(t, err) var replicator defaultReplicator - replicator.log = gitaly_log.Default() + entry := testhelper.DiscardTestEntry(t) + replicator.log = entry equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB) require.NoError(t, err) @@ -190,9 +207,16 @@ func TestConfirmReplication(t *testing.T) { require.False(t, equal) } -func TestProcessBacklog(t *testing.T) { - srv, srvSocketPath := runFullGitalyServer(t) - defer srv.Stop() +func TestProcessBacklog_FailedJobs(t *testing.T) { + primarySvr, primarySocket := newReplicationService(t) + defer primarySvr.Stop() + + backupSvr, backupSocket := newReplicationService(t) + backupSvr.Stop() + + internalListener, err := net.Listen("unix", gitaly_config.GitalyInternalSocketPath()) + require.NoError(t, err) + go backupSvr.Serve(internalListener) testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -204,32 +228,15 @@ func TestProcessBacklog(t *testing.T) { defer os.RemoveAll(backupDir) - oldStorages := gitaly_config.Config.Storages - defer func() { - gitaly_config.Config.Storages = oldStorages - }() - - gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{ - Name: backupStorageName, - Path: backupDir, - }, - gitaly_config.Storage{ - Name: "default", - Path: testhelper.GitlabTestStoragePath(), - }, - ) - primary := models.Node{ Storage: "default", - Address: srvSocketPath, - Token: gitaly_config.Config.Auth.Token, + Address: "unix://" + primarySocket, DefaultPrimary: true, } secondary := models.Node{ Storage: backupStorageName, - Address: srvSocketPath, - Token: gitaly_config.Config.Auth.Token, + Address: "unix://" + backupSocket, } config := config.Config{ @@ -244,24 +251,41 @@ func TestProcessBacklog(t *testing.T) { }, } + ctx, cancel := testhelper.Context() + defer func(oldStorages []gitaly_config.Storage) { + 
gitaly_config.Config.Storages = oldStorages + cancel() + }(gitaly_config.Config.Storages) + + gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{ + Name: backupStorageName, + Path: backupDir, + }, + gitaly_config.Storage{ + Name: "default", + Path: testhelper.GitlabTestStoragePath(), + }, + ) + ds := datastore.NewInMemory(config) - ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, []models.Node{secondary}, datastore.UpdateRepo) + ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) require.NoError(t, err) require.Len(t, ids, 1) + entry := testhelper.DiscardTestEntry(t) + require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady)) - clientCC := conn.NewClientConnections() - require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token)) + nodeMgr, err := nodes.NewManager(entry, config) + require.NoError(t, err) - replMgr := NewReplMgr(backupStorageName, gitaly_log.Default(), ds, clientCC) - ctx, cancel := testhelper.Context() - defer cancel() + replMgr := NewReplMgr("default", entry, ds, nodeMgr) + replMgr.replJobTimeout = 100 * time.Millisecond go replMgr.ProcessBacklog(ctx, noopBackoffFunc) timeLimit := time.NewTimer(5 * time.Second) - ticker := time.NewTicker(10 * time.Millisecond) + ticker := time.NewTicker(1 * time.Second) // the job will fail to process because the client connection for "backup" is not registered. It should fail maxAttempts times // and get cancelled. @@ -273,33 +297,108 @@ TestJobGetsCancelled: require.NoError(t, err) if len(replJobs) == 1 { //success + timeLimit.Stop() break TestJobGetsCancelled } case <-timeLimit.C: - t.Fatal("time limit expired for job to complete") + t.Fatal("time limit expired for job to be deemed dead") } } +} + +func TestProcessBacklog_Success(t *testing.T) { + primarySvr, primarySocket := newReplicationService(t) + defer primarySvr.Stop() + + backupSvr, backupSocket := newReplicationService(t) + defer backupSvr.Stop() + + internalListener, err := net.Listen("unix", gitaly_config.GitalyInternalSocketPath()) + require.NoError(t, err) + go backupSvr.Serve(internalListener) + + testRepo, _, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + backupStorageName := "backup" + + backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName) + require.NoError(t, err) + + defer os.RemoveAll(backupDir) + + primary := models.Node{ + Storage: "default", + Address: "unix://" + primarySocket, + DefaultPrimary: true, + } - require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token)) - ids, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, []models.Node{secondary}, datastore.UpdateRepo) + secondary := models.Node{ + Storage: backupStorageName, + Address: "unix://" + backupSocket, + } + + config := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + &primary, + &secondary, + }, + }, + }, + } + + ctx, cancel := testhelper.Context() + defer func(oldStorages []gitaly_config.Storage) { + gitaly_config.Config.Storages = oldStorages + cancel() + }(gitaly_config.Config.Storages) + + gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{ + Name: backupStorageName, + Path: backupDir, + }, + gitaly_config.Storage{ + Name: "default", + Path: testhelper.GitlabTestStoragePath(), + }, + ) + + ds 
:= datastore.NewInMemory(config) + ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo) require.NoError(t, err) require.Len(t, ids, 1) + + entry := testhelper.DiscardTestEntry(t) + require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady)) - timeLimit.Reset(5 * time.Second) - // Once the node is registered, and we try the job again it should succeed + nodeMgr, err := nodes.NewManager(entry, config) + require.NoError(t, err) + + replMgr := NewReplMgr("default", entry, ds, nodeMgr) + replMgr.replJobTimeout = 5 * time.Second + + go replMgr.ProcessBacklog(ctx, noopBackoffFunc) + + timeLimit := time.NewTimer(5 * time.Second) + ticker := time.NewTicker(1 * time.Second) + + // Once the listener is being served, and we try the job again it should succeed TestJobSucceeds: for { select { case <-ticker.C: - replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady, "backup", 10) + replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady|datastore.JobStateDead, "backup", 10) require.NoError(t, err) if len(replJobs) == 0 { //success break TestJobSucceeds } case <-timeLimit.C: - t.Error("time limit expired for job to complete") + t.Fatal("time limit expired for job to complete") } } } @@ -326,6 +425,7 @@ func TestBackoff(t *testing.T) { func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { server := serverPkg.NewInsecure(RubyServer) + serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() listener, err := net.Listen("unix", serverSocketPath) @@ -342,6 +442,26 @@ func runFullGitalyServer(t *testing.T) (*grpc.Server, string) { return server, "unix://" + serverSocketPath } +// newReplicationService is a grpc service that has the Repository, Remote and ObjectPool services, which +// are the only ones needed for replication +func newReplicationService(tb testing.TB) (*grpc.Server, string) { + socketName := testhelper.GetTemporaryGitalySocketFileName() + + svr := testhelper.NewTestGrpcServer(tb, nil, nil) + + gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(&rubyserver.Server{})) + gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer()) + gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(&rubyserver.Server{})) + reflection.Register(svr) + + listener, err := net.Listen("unix", socketName) + require.NoError(tb, err) + + go svr.Serve(listener) + + return svr, socketName +} + func newRepositoryClient(t *testing.T, serverSocketPath string) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) { connOpts := []grpc.DialOption{ grpc.WithInsecure(), diff --git a/internal/praefect/server.go b/internal/praefect/server.go index ceb1b8a76..863c30a3b 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -17,8 +17,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/service/server" "gitlab.com/gitlab-org/gitaly/internal/server/auth" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -31,11 +31,11 @@ import ( // Server is a praefect server type Server struct { - 
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index e201bcbcb..23fb06f9c 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -16,11 +16,10 @@ import (
 	gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
 	"gitlab.com/gitlab-org/gitaly/internal/git"
 	"gitlab.com/gitlab-org/gitaly/internal/helper"
-	"gitlab.com/gitlab-org/gitaly/internal/log"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
 	"gitlab.com/gitlab-org/gitaly/internal/version"
@@ -130,9 +129,11 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
 		},
 	}
 
-	clientCC := conn.NewClientConnections()
-	clientCC.RegisterNode(conf.VirtualStorages[0].Nodes[0].Storage, conf.VirtualStorages[0].Nodes[0].Address, conf.VirtualStorages[0].Nodes[0].Token)
-	_, srv := setupServer(t, conf, clientCC, log.Default(), protoregistry.GitalyProtoFileDescriptors)
+	entry := testhelper.DiscardTestEntry(t)
+	nodeMgr, err := nodes.NewManager(entry, conf)
+	require.NoError(t, err)
+
+	_, srv := setupServer(t, conf, nodeMgr, entry, protoregistry.GitalyProtoFileDescriptors)
 
 	listener, port := listenAvailPort(t)
 	go func() {
@@ -154,7 +155,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
 func TestGitalyDiskStatistics(t *testing.T) {
 	conf := config.Config{
 		VirtualStorages: []*config.VirtualStorage{
-			&config.VirtualStorage{
+			{
 				Nodes: []*models.Node{
 					{
 						Storage: "praefect-internal-1",
@@ -168,6 +169,7 @@ func TestGitalyDiskStatistics(t *testing.T) {
 			},
 		},
 	}
+
 	cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
 	defer cleanup()
 
@@ -178,7 +180,7 @@ func TestGitalyDiskStatistics(t *testing.T) {
 	metadata, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
 	require.NoError(t, err)
 
-	require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes))
+	require.Len(t, metadata.GetStorageStatuses(), len(conf.VirtualStorages[0].Nodes))
 
 	for _, storageStatus := range metadata.GetStorageStatuses() {
 		require.NotNil(t, storageStatus, "none of the storage statuses should be nil")
diff --git a/internal/praefect/service/server/disk_stats.go b/internal/praefect/service/server/disk_stats.go
index 745817826..700bdfb23 100644
--- a/internal/praefect/service/server/disk_stats.go
+++ b/internal/praefect/service/server/disk_stats.go
@@ -4,41 +4,38 @@ import (
 	"context"
 	"fmt"
 
-	"gitlab.com/gitlab-org/gitaly/internal/helper"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
-	"golang.org/x/sync/errgroup"
 )
 
 // DiskStatistics sends DiskStatisticsRequest to all of a praefect server's internal gitaly nodes and aggregates the
 // results into a response
 func (s *Server) DiskStatistics(ctx context.Context, _ *gitalypb.DiskStatisticsRequest) (*gitalypb.DiskStatisticsResponse, error) {
-	storageStatuses := make([][]*gitalypb.DiskStatisticsResponse_StorageStatus, len(s.conf.Nodes))
+	var storageStatuses [][]*gitalypb.DiskStatisticsResponse_StorageStatus
 
-	g, ctx := errgroup.WithContext(ctx)
+	for _, virtualStorage := range s.conf.VirtualStorages {
+		shard, err := s.nodeMgr.GetShard(virtualStorage.Name)
+		if err != nil {
+			return nil, err
+		}
 
-	for i, node := range s.conf.Nodes {
-		i := i // necessary since it will be used in a goroutine below
-		node := node
-		cc, err := s.clientCC.GetConnection(node.Storage)
+		primary, err := shard.GetPrimary()
+		if err != nil {
+			return nil, err
+		}
+
+		secondaries, err := shard.GetSecondaries()
 		if err != nil {
-			return nil, helper.ErrInternalf("error getting client connection for %s: %v", node.Storage, err)
+			return nil, err
 		}
 
-		g.Go(func() error {
-			client := gitalypb.NewServerServiceClient(cc)
+		for _, node := range append(secondaries, primary) {
+			client := gitalypb.NewServerServiceClient(node.GetConnection())
 			resp, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
 			if err != nil {
-				return fmt.Errorf("error when requesting disk statistics from internal storage %v", node.Storage)
+				return nil, fmt.Errorf("error when requesting disk statistics from internal storage %v", node.GetStorage())
 			}
 
-			storageStatuses[i] = resp.GetStorageStatuses()
-
-			return nil
-		})
-	}
-
-	if err := g.Wait(); err != nil {
-		return nil, helper.ErrInternal(err)
+			storageStatuses = append(storageStatuses, resp.GetStorageStatuses())
+		}
 	}
 
 	var response gitalypb.DiskStatisticsResponse
diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go
index f960a9ea5..64360f171 100644
--- a/internal/praefect/service/server/info.go
+++ b/internal/praefect/service/server/info.go
@@ -6,7 +6,7 @@ import (
 	grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
 	"gitlab.com/gitlab-org/gitaly/internal/helper"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 	"golang.org/x/sync/errgroup"
 )
@@ -15,20 +15,26 @@ import (
 // a response
 func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
 	var once sync.Once
-	nodesChecked := make(map[string]struct{})
-	var nodes []*models.Node
+	var nodes []nodes.Node
 
 	for _, virtualStorage := range s.conf.VirtualStorages {
-		for _, node := range virtualStorage.Nodes {
-			if _, ok := nodesChecked[node.Storage]; ok {
-				continue
-			}
+		shard, err := s.nodeMgr.GetShard(virtualStorage.Name)
+		if err != nil {
+			return nil, err
+		}
 
-			nodesChecked[node.Storage] = struct{}{}
-			nodes = append(nodes, node)
+		primary, err := shard.GetPrimary()
+		if err != nil {
+			return nil, err
 		}
-	}
+
+		secondaries, err := shard.GetSecondaries()
+		if err != nil {
+			return nil, err
+		}
+
+		nodes = append(append(nodes, primary), secondaries...)
+	}
 
 	var gitVersion, serverVersion string
 
 	g, ctx := errgroup.WithContext(ctx)
@@ -38,26 +44,20 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest)
 	for i, node := range nodes {
 		i := i
 		node := node
-		cc, err := s.clientCC.GetConnection(node.Storage)
-		if err != nil {
-			grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting client connection")
-			continue
-		}
+
 		g.Go(func() error {
-			client := gitalypb.NewServerServiceClient(cc)
+			client := gitalypb.NewServerServiceClient(node.GetConnection())
 			resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
 			if err != nil {
-				grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting sever info")
+				grpc_logrus.Extract(ctx).WithField("storage", node.GetStorage()).WithError(err).Error("error getting server info")
 				return nil
 			}
 
 			storageStatuses[i] = resp.GetStorageStatuses()
 
-			if node.DefaultPrimary {
-				once.Do(func() {
-					gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
-				})
-			}
+			once.Do(func() {
+				gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
+			})
 
 			return nil
 		})
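After this rewrite, DiskStatistics and ServerInfo walk every shard of every virtual storage and concatenate the per-node storage statuses, so a caller sees one flattened response for the whole cluster. A sketch of consuming the aggregated RPC, assuming an already-dialed *grpc.ClientConn to praefect (praefectConn is a placeholder) and that DiskStatisticsResponse_StorageStatus carries storage_name, available and used:

package main

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
	"google.golang.org/grpc"
)

func printDiskStats(ctx context.Context, praefectConn *grpc.ClientConn) error {
	client := gitalypb.NewServerServiceClient(praefectConn)

	resp, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
	if err != nil {
		return err
	}

	// One StorageStatus per storage on each gitaly node behind praefect.
	for _, status := range resp.GetStorageStatuses() {
		fmt.Printf("%s: available=%d used=%d\n",
			status.GetStorageName(), status.GetAvailable(), status.GetUsed())
	}

	return nil
}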
diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go
index 30e04f529..1788307ed 100644
--- a/internal/praefect/service/server/server.go
+++ b/internal/praefect/service/server/server.go
@@ -2,21 +2,21 @@ package server
 
 import (
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
-	"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 )
 
 // Server is a ServerService server
 type Server struct {
-	clientCC *conn.ClientConnections
-	conf     config.Config
+	nodeMgr nodes.Manager
+	conf    config.Config
 }
 
 // NewServer creates a new instance of a grpc ServerServiceServer
-func NewServer(conf config.Config, clientConnections *conn.ClientConnections) gitalypb.ServerServiceServer {
+func NewServer(conf config.Config, nodeMgr nodes.Manager) gitalypb.ServerServiceServer {
 	s := &Server{
-		clientCC: clientConnections,
-		conf:     conf,
+		nodeMgr: nodeMgr,
+		conf:    conf,
 	}
 
 	return s
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
index 9d0d23300..0199dff6f 100644
--- a/internal/service/repository/replicate.go
+++ b/internal/service/repository/replicate.go
@@ -221,9 +221,7 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR
 
 // newRemoteClient creates a new RemoteClient that talks to the same gitaly server
 func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) {
-	cc, err := s.getOrCreateConnection(map[string]string{
-		"address": fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()),
-	})
+	cc, err := s.getOrCreateConnection(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), "")
 	if err != nil {
 		return nil, err
 	}
@@ -247,11 +245,10 @@ func (s *server) getConnectionByStorage(ctx context.Context, storageName string)
 		return nil, err
 	}
 
-	return s.getOrCreateConnection(gitalyServerInfo)
+	return s.getOrCreateConnection(gitalyServerInfo["address"], gitalyServerInfo["token"])
 }
 
-func (s *server) getOrCreateConnection(server map[string]string) (*grpc.ClientConn, error) {
-	address, token := server["address"], server["token"]
+func (s *server) getOrCreateConnection(address, token string) (*grpc.ClientConn, error) {
 	if address == "" {
 		return nil, errors.New("address is empty")
 	}
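getOrCreateConnection now takes the address and token as plain arguments rather than a map, so both callers above read without the intermediate map construction. The caching shape behind such a helper is roughly the following sketch; connCache and its fields are illustrative rather than the actual server struct, and the auth option is an assumption based on the gitalyauth package:

package repository

import (
	"errors"
	"sync"

	gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
	"gitlab.com/gitlab-org/gitaly/client"
	"google.golang.org/grpc"
)

type connCache struct {
	sync.Mutex
	conns map[string]*grpc.ClientConn // keyed by address
}

func (c *connCache) getOrCreate(address, token string) (*grpc.ClientConn, error) {
	if address == "" {
		return nil, errors.New("address is empty")
	}

	c.Lock()
	defer c.Unlock()

	if cc, ok := c.conns[address]; ok {
		return cc, nil
	}

	connOpts := []grpc.DialOption{grpc.WithInsecure()}
	if token != "" {
		// Assumed: per-RPC credentials built by the gitalyauth package.
		connOpts = append(connOpts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(token)))
	}

	cc, err := client.Dial(address, connOpts)
	if err != nil {
		return nil, err
	}

	c.conns[address] = cc
	return cc, nil
}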
diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go
index d113a9da4..6ef9b0939 100644
--- a/internal/service/repository/server_test.go
+++ b/internal/service/repository/server_test.go
@@ -48,7 +48,7 @@ func TestGetConnectionsConcurrentAccess(t *testing.T) {
 
 	go func() {
 		var err error
-		cc, err = s.getOrCreateConnection(map[string]string{"address": address})
+		cc, err = s.getOrCreateConnection(address, "")
 		require.NoError(t, err)
 		wg.Done()
 	}()
diff --git a/internal/testhelper/test_hook.go b/internal/testhelper/test_hook.go
index 7da7baffe..8d0b0264c 100644
--- a/internal/testhelper/test_hook.go
+++ b/internal/testhelper/test_hook.go
@@ -17,3 +17,8 @@ func DiscardTestLogger(tb testing.TB) *log.Logger {
 
 	return logger
 }
+
+// DiscardTestEntry creates a logrus entry that discards everything.
+func DiscardTestEntry(tb testing.TB) *log.Entry {
+	return log.NewEntry(DiscardTestLogger(tb))
+}
diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go
index 352a47194..7cc7a7ffb 100644
--- a/internal/testhelper/testserver.go
+++ b/internal/testhelper/testserver.go
@@ -27,6 +27,8 @@ import (
 	praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/health"
+	"google.golang.org/grpc/health/grpc_health_v1"
 	healthpb "google.golang.org/grpc/health/grpc_health_v1"
 	"gopkg.in/yaml.v2"
 )
@@ -351,3 +353,17 @@ type HTTPSettings struct {
 	User     string `yaml:"user"`
 	Password string `yaml:"password"`
 }
+
+func NewServerWithHealth(t testing.TB, socketName string) (*grpc.Server, *health.Server) {
+	srv := NewTestGrpcServer(t, nil, nil)
+	healthSrvr := health.NewServer()
+	grpc_health_v1.RegisterHealthServer(srv, healthSrvr)
+	healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+
+	lis, err := net.Listen("unix", socketName)
+	require.NoError(t, err)
+
+	go srv.Serve(lis)
+
+	return srv, healthSrvr
+}
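To see how the two new test helpers compose: a test can stand up a throwaway backend with NewServerWithHealth, hand its socket to a praefect config, and flip the serving status to simulate a node failing its health checks. The test name and flow below are illustrative only, not part of this change:

package testhelper_test

import (
	"testing"

	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
	"google.golang.org/grpc/health/grpc_health_v1"
)

func TestNodeHealthFlip(t *testing.T) {
	socket := testhelper.GetTemporaryGitalySocketFileName()

	srv, healthSrvr := testhelper.NewServerWithHealth(t, socket)
	defer srv.Stop()

	// The helper starts out SERVING; flipping the status lets a test
	// observe how a health-checking component (e.g. a nodes.Manager) reacts.
	healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
}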