gitlab.com/gitlab-org/gitaly.git
author    John Cai <jcai@gitlab.com>  2020-02-05 00:37:50 +0300
committer John Cai <jcai@gitlab.com>  2020-02-14 00:18:11 +0300
commit    e154cba9d68f11da8c721b4997dd4e6eff532a73 (patch)
tree      5c3b2525421e5e3355732d7240a047acd11e97f7
parent    e97d8fd8a43b5d826eb0ee180a405fe207acd2be (diff)
Use node manager

Hooks up the node manager to praefect's coordinator.
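For orientation, these are the interfaces this commit routes all traffic through, abridged from internal/praefect/nodes/manager.go as it appears in this diff. The Node interface body is inferred from the methods call sites use (GetStorage, GetAddress, GetConnection), so treat it as a sketch rather than the exact declaration:

package nodes

import "google.golang.org/grpc"

// Manager hands out shards for virtual storages.
type Manager interface {
	GetShard(virtualStorageName string) (Shard, error)
}

// Shard exposes the current primary and secondaries of one virtual storage.
type Shard interface {
	GetPrimary() (Node, error)
	GetSecondaries() ([]Node, error)
}

// Node is a single Gitaly backend with a ready-to-use client connection.
type Node interface {
	GetStorage() string
	GetAddress() string
	GetConnection() *grpc.ClientConn
}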
-rw-r--r--  changelogs/unreleased/jc-use-node-manager-ff.yml                                                 5
-rw-r--r--  cmd/praefect/main.go                                                                            27
-rw-r--r--  internal/praefect/auth_test.go                                                                  16
-rw-r--r--  internal/praefect/conn/client_connections.go                                                    69
-rw-r--r--  internal/praefect/conn/client_connections_test.go                                               26
-rw-r--r--  internal/praefect/coordinator.go                                                               159
-rw-r--r--  internal/praefect/coordinator_test.go                                                           13
-rw-r--r--  internal/praefect/datastore/datastore.go                                                        10
-rw-r--r--  internal/praefect/datastore/datastore_test.go                                                    2
-rw-r--r--  internal/praefect/helper_test.go                                                                36
-rw-r--r--  internal/praefect/nodes/manager.go (renamed from internal/praefect/node_manager.go)             89
-rw-r--r--  internal/praefect/nodes/manager_test.go (renamed from internal/praefect/node_manager_test.go)   72
-rw-r--r--  internal/praefect/replicator.go                                                                124
-rw-r--r--  internal/praefect/replicator_test.go                                                           212
-rw-r--r--  internal/praefect/server.go                                                                     26
-rw-r--r--  internal/praefect/server_test.go                                                                16
-rw-r--r--  internal/praefect/service/server/disk_stats.go                                                  37
-rw-r--r--  internal/praefect/service/server/info.go                                                        44
-rw-r--r--  internal/praefect/service/server/server.go                                                      12
-rw-r--r--  internal/service/repository/replicate.go                                                         9
-rw-r--r--  internal/service/repository/server_test.go                                                       2
-rw-r--r--  internal/testhelper/test_hook.go                                                                 5
-rw-r--r--  internal/testhelper/testserver.go                                                               16
23 files changed, 517 insertions(+), 510 deletions(-)
diff --git a/changelogs/unreleased/jc-use-node-manager-ff.yml b/changelogs/unreleased/jc-use-node-manager-ff.yml
new file mode 100644
index 000000000..69cdee0a3
--- /dev/null
+++ b/changelogs/unreleased/jc-use-node-manager-ff.yml
@@ -0,0 +1,5 @@
+---
+title: Wire up coordinator to use node manager
+merge_request: 1806
+author:
+type: changed
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index e74e09c05..31b219b70 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -42,9 +42,9 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/labkit/monitoring"
@@ -129,20 +129,11 @@ func configure() (config.Config, error) {
}
func run(cfgs []starter.Config, conf config.Config) error {
- clientConnections := conn.NewClientConnections()
-
- for _, virtualStorage := range conf.VirtualStorages {
- for _, node := range virtualStorage.Nodes {
- if _, err := clientConnections.GetConnection(node.Storage); err == nil {
- continue
- }
- if err := clientConnections.RegisterNode(node.Storage, node.Address, node.Token); err != nil {
- return fmt.Errorf("failed to register %s: %s", node.Address, err)
- }
-
- logger.WithField("node_address", node.Address).Info("registered gitaly node")
- }
+ nodeManager, err := nodes.NewManager(logger, conf)
+ if err != nil {
+ return err
}
+ nodeManager.Start(1*time.Second, 3*time.Second)
latencyMetric, err := metrics.RegisterReplicationLatency(conf.Prometheus)
if err != nil {
@@ -157,15 +148,15 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
ds = datastore.NewInMemory(conf)
- coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
+ coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr(
"default",
logger,
ds,
- clientConnections,
+ nodeManager,
praefect.WithLatencyMetric(latencyMetric),
praefect.WithQueueMetric(queueMetric))
- srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
+ srv = praefect.NewServer(coordinator, repl, nil, logger, nodeManager, conf)
serverErrors = make(chan error, 1)
)
@@ -192,7 +183,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
go func() { serverErrors <- b.Wait() }()
go func() {
- serverErrors <- repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Minute))
+ serverErrors <- repl.ProcessBacklog(ctx, praefect.ExpBackoffFunc(1*time.Second, 5*time.Second))
}()
go coordinator.FailoverRotation()
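The net effect in run(): instead of registering each node's connection by hand, praefect builds one node manager that dials every configured node up front and health-checks them in the background. A minimal sketch of the new wiring, condensed from the + lines above:

nodeManager, err := nodes.NewManager(logger, conf)
if err != nil {
	return err
}
// First argument: bootstrap interval (three checks one second apart,
// matching healthcheckThreshold in nodes/manager.go below).
// Second argument: steady-state monitor interval. Both loops run only
// when failover is enabled in the config.
nodeManager.Start(1*time.Second, 3*time.Second)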
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index f9130bfd1..6654fee19 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -10,12 +10,11 @@ import (
"github.com/stretchr/testify/require"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"gitlab.com/gitlab-org/gitaly/internal/config/auth"
- "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"google.golang.org/grpc"
@@ -175,6 +174,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
Storage: "praefect-internal-0",
DefaultPrimary: true,
Address: backend,
+ Token: backendToken,
},
},
},
@@ -187,17 +187,17 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
panic(err)
}
- logEntry := log.Default()
+ logEntry := testhelper.DiscardTestEntry(t)
ds := datastore.NewInMemory(conf)
- clientConnections := conn.NewClientConnections()
- clientConnections.RegisterNode("praefect-internal-0", backend, backendToken)
+ nodeMgr, err := nodes.NewManager(logEntry, conf)
+ require.NoError(t, err)
- coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd)
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, fd)
- replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections)
+ replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, nodeMgr)
- srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf)
+ srv := NewServer(coordinator, replMgr, nil, logEntry, nodeMgr, conf)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
diff --git a/internal/praefect/conn/client_connections.go b/internal/praefect/conn/client_connections.go
deleted file mode 100644
index d2e36f352..000000000
--- a/internal/praefect/conn/client_connections.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package conn
-
-import (
- "errors"
- "sync"
-
- gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
- "gitlab.com/gitlab-org/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "google.golang.org/grpc"
-)
-
-// ErrConnectionNotFound indicates the connection for a given storage has not yet been registered
-var ErrConnectionNotFound = errors.New("client connection not found")
-
-// ErrAlreadyRegistered indicates the client connection for a given storage has already been registered
-var ErrAlreadyRegistered = errors.New("client connection already registered")
-
-// ClientConnections contains ready to use grpc client connections
-type ClientConnections struct {
- connMutex sync.RWMutex
- nodes map[string]*grpc.ClientConn
-}
-
-// NewClientConnections creates a new ClientConnections struct
-func NewClientConnections() *ClientConnections {
- return &ClientConnections{
- nodes: make(map[string]*grpc.ClientConn),
- }
-}
-
-// RegisterNode will direct traffic to the supplied downstream connection when the storage location
-// is encountered.
-func (c *ClientConnections) RegisterNode(storageName, listenAddr, token string) error {
- conn, err := client.Dial(listenAddr,
- []grpc.DialOption{
- grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(token)),
- },
- )
- if err != nil {
- return err
- }
-
- return c.setConn(storageName, conn)
-}
-
-func (c *ClientConnections) setConn(storageName string, conn *grpc.ClientConn) error {
- c.connMutex.Lock()
- if _, ok := c.nodes[storageName]; ok {
- return ErrAlreadyRegistered
- }
- c.nodes[storageName] = conn
- c.connMutex.Unlock()
-
- return nil
-}
-
-// GetConnection gets the grpc client connection based on an address
-func (c *ClientConnections) GetConnection(storageName string) (*grpc.ClientConn, error) {
- c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
- c.connMutex.RUnlock()
- if !ok {
- return nil, ErrConnectionNotFound
- }
-
- return cc, nil
-}
diff --git a/internal/praefect/conn/client_connections_test.go b/internal/praefect/conn/client_connections_test.go
deleted file mode 100644
index 2c5f77a54..000000000
--- a/internal/praefect/conn/client_connections_test.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package conn
-
-import (
- "fmt"
- "testing"
-
- "github.com/stretchr/testify/require"
-)
-
-func TestRegisterNode(t *testing.T) {
- storageName := "default"
- tcpAddress := "address1"
- clientConn := NewClientConnections()
-
- _, err := clientConn.GetConnection(storageName)
- require.Equal(t, ErrConnectionNotFound, err)
-
- require.NoError(t, clientConn.RegisterNode(storageName, fmt.Sprintf("tcp://%s", tcpAddress), "token"))
-
- conn, err := clientConn.GetConnection(storageName)
- require.NoError(t, err)
- require.Equal(t, tcpAddress, conn.Target())
-
- err = clientConn.RegisterNode(storageName, "tcp://some-other-address", "token")
- require.Equal(t, ErrAlreadyRegistered, err)
-}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 3ed5ad57c..45b0dd841 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -2,8 +2,6 @@ package praefect
import (
"context"
- "errors"
- "fmt"
"os"
"os/signal"
"sync"
@@ -13,10 +11,9 @@ import (
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc/codes"
@@ -31,7 +28,7 @@ func isDestructive(methodName string) bool {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- connections *conn.ClientConnections
+ nodeMgr nodes.Manager
log *logrus.Entry
failoverMutex sync.RWMutex
@@ -42,16 +39,16 @@ type Coordinator struct {
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, nodeMgr nodes.Manager, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
return &Coordinator{
- log: l,
- datastore: ds,
- registry: registry,
- connections: clientConnections,
- conf: conf,
+ log: l,
+ datastore: ds,
+ registry: registry,
+ nodeMgr: nodeMgr,
+ conf: conf,
}
}
@@ -60,127 +57,121 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
-// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
- // For phase 1, we need to route messages based on the storage location
- // to the appropriate Gitaly node.
- c.log.Debugf("Stream director received method %s", fullMethodName)
-
- c.failoverMutex.RLock()
- defer c.failoverMutex.RUnlock()
+func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi protoregistry.MethodInfo, peeker proxy.StreamModifier, fullMethodName string, m proto.Message) (*proxy.StreamParameters, error) {
+ targetRepo, err := mi.TargetRepo(m)
+ if err != nil {
+ if err == protoregistry.ErrTargetRepoMissing {
+ return nil, status.Errorf(codes.InvalidArgument, err.Error())
+ }
+ return nil, err
+ }
- mi, err := c.registry.LookupMethod(fullMethodName)
+ shard, err := c.nodeMgr.GetShard(targetRepo.GetStorageName())
if err != nil {
return nil, err
}
- m, err := protoMessageFromPeeker(mi, peeker)
+ primary, err := shard.GetPrimary()
if err != nil {
return nil, err
}
- var requestFinalizer func()
- var storage string
+ if err = c.rewriteStorageForRepositoryMessage(mi, m, peeker, primary.GetStorage()); err != nil {
+ if err == protoregistry.ErrTargetRepoMissing {
+ return nil, status.Errorf(codes.InvalidArgument, err.Error())
+ }
- if mi.Scope == protoregistry.ScopeRepository {
- var getRepoErr error
- storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
+ return nil, err
+ }
- if getRepoErr == protoregistry.ErrTargetRepoMissing {
- return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error())
- }
+ var requestFinalizer func()
- if getRepoErr != nil {
- return nil, getRepoErr
+ if mi.Operation == protoregistry.OpMutator {
+ change := datastore.UpdateRepo
+ if isDestructive(fullMethodName) {
+ change = datastore.DeleteRepo
}
- if storage == "" {
- return nil, status.Error(codes.InvalidArgument, "storage not found")
- }
- } else {
- storage, requestFinalizer, err = c.getAnyStorageNode()
+ secondaries, err := shard.GetSecondaries()
if err != nil {
return nil, err
}
- }
- // We only need the primary node, as there's only one primary storage
- // location per praefect at this time
- cc, err := c.connections.GetConnection(storage)
- if err != nil {
- return nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+
+ if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
+ return nil, err
+ }
}
- return proxy.NewStreamParameters(ctx, cc, requestFinalizer, nil), nil
+ return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil
}
-var noopRequestFinalizer = func() {}
+// streamDirector determines which downstream servers receive requests
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
+ // For phase 1, we need to route messages based on the storage location
+ // to the appropriate Gitaly node.
+ c.log.Debugf("Stream director received method %s", fullMethodName)
+
+ c.failoverMutex.RLock()
+ defer c.failoverMutex.RUnlock()
-func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
- //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
- // proxy requests that are not repository scoped
- node, err := c.datastore.GetStorageNodes()
+ mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
- return "", nil, err
+ return nil, err
}
- if len(node) == 0 {
- return "", nil, errors.New("no node storages found")
+
+ m, err := protoMessageFromPeeker(mi, peeker)
+ if err != nil {
+ return nil, err
}
- return node[0].Storage, noopRequestFinalizer, nil
-}
+ if mi.Scope == protoregistry.ScopeRepository {
+ return c.directRepositoryScopedMessage(ctx, mi, peeker, fullMethodName, m)
+ }
-func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, method string) (string, func(), error) {
- targetRepo, err := mi.TargetRepo(m)
+ // TODO: remove the need to handle non repository scoped RPCs. The only remaining one is FindRemoteRepository.
+ // https://gitlab.com/gitlab-org/gitaly/issues/2442. Once this issue is resolved, we can explicitly require that
+ // any RPC that gets proxied through praefect must be repository scoped.
+ shard, err := c.nodeMgr.GetShard(c.conf.VirtualStorages[0].Name)
if err != nil {
- return "", nil, err
+ return nil, err
}
- primary, err := c.datastore.GetPrimary(targetRepo.GetStorageName())
+ primary, err := shard.GetPrimary()
if err != nil {
- return "", nil, err
+ return nil, err
}
- secondaries, err := c.datastore.GetSecondaries(targetRepo.GetStorageName())
+ return proxy.NewStreamParameters(ctx, primary.GetConnection(), func() {}, nil), nil
+}
+
+func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, primaryStorage string) error {
+ targetRepo, err := mi.TargetRepo(m)
if err != nil {
- return "", nil, err
+ return err
}
// rewrite storage name
- targetRepo.StorageName = primary.Storage
+ targetRepo.StorageName = primaryStorage
additionalRepo, ok, err := mi.AdditionalRepo(m)
if err != nil {
- return "", nil, err
+ return err
}
if ok {
- additionalRepo.StorageName = primary.Storage
+ additionalRepo.StorageName = primaryStorage
}
b, err := proxy.Codec().Marshal(m)
if err != nil {
- return "", nil, err
+ return err
}
if err = peeker.Modify(b); err != nil {
- return "", nil, err
- }
-
- requestFinalizer := noopRequestFinalizer
-
- // TODO: move the logic of creating replication jobs to the streamDirector method
- if mi.Operation == protoregistry.OpMutator {
- change := datastore.UpdateRepo
- if isDestructive(method) {
- change = datastore.DeleteRepo
- }
-
- if requestFinalizer, err = c.createReplicaJobs(targetRepo, primary, secondaries, change); err != nil {
- return "", nil, err
- }
+ return err
}
- return primary.Storage, requestFinalizer, nil
+ return nil
}
func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModifier) (proto.Message, error) {
@@ -197,8 +188,12 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary models.Node, secondaries []models.Node, change datastore.ChangeType) (func(), error) {
- jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary, secondaries, change)
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, primary nodes.Node, secondaries []nodes.Node, change datastore.ChangeType) (func(), error) {
+ var secondaryStorages []string
+ for _, secondary := range secondaries {
+ secondaryStorages = append(secondaryStorages, secondary.GetStorage())
+ }
+ jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change)
if err != nil {
return nil, err
}
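Assembled from the hunks above, the repository-scoped path through the stream director now reads roughly like this (error handling elided; change is datastore.UpdateRepo, or datastore.DeleteRepo for destructive methods):

shard, _ := c.nodeMgr.GetShard(targetRepo.GetStorageName())
primary, _ := shard.GetPrimary()

// Rewrite the request's storage name(s) to the primary's storage before
// proxying the stream over the primary's existing connection.
_ = c.rewriteStorageForRepositoryMessage(mi, m, peeker, primary.GetStorage())

requestFinalizer := func() {}
if mi.Operation == protoregistry.OpMutator {
	secondaries, _ := shard.GetSecondaries()
	// Queue one replication job per secondary once the mutator finishes.
	requestFinalizer, _ = c.createReplicaJobs(targetRepo, primary, secondaries, change)
}
return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil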
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 4283779a1..dccab79f0 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1,18 +1,16 @@
package praefect
import (
- "fmt"
"io/ioutil"
"testing"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -57,10 +55,13 @@ func TestStreamDirector(t *testing.T) {
defer cancel()
address := "gitaly-primary.example.com"
- clientConnections := conn.NewClientConnections()
- clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token")
- coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf)
+ entry := testhelper.DiscardTestEntry(t)
+
+ nodeMgr, err := nodes.NewManager(entry, conf)
+ require.NoError(t, err)
+
+ coordinator := NewCoordinator(entry, ds, nodeMgr, conf)
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index bc677aa33..5896e7797 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -108,7 +108,7 @@ type ReplJobsDatastore interface {
// CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error)
// UpdateReplJobState updates the state of an existing replication job
UpdateReplJobState(jobID uint64, newState JobState) error
@@ -302,7 +302,7 @@ var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditi
// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary models.Node, secondaries []models.Node, change ChangeType) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primaryStorage string, secondaryStorages []string, change ChangeType) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -312,15 +312,15 @@ func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string, primary mo
var jobIDs []uint64
- for _, secondary := range secondaries {
+ for _, secondaryStorage := range secondaryStorages {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.records[nextID] = jobRecord{
change: change,
- targetNodeStorage: secondary.Storage,
+ targetNodeStorage: secondaryStorage,
state: JobStatePending,
relativePath: relativePath,
- sourceNodeStorage: primary.Storage,
+ sourceNodeStorage: primaryStorage,
}
jobIDs = append(jobIDs, nextID)
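With the new signature, callers pass plain storage names instead of models.Node values. A hypothetical call (the repository path and storage names here are invented for illustration):

jobIDs, err := ds.CreateReplicaReplJobs(
	"@hashed/ab/cd/repo.git",        // relative path of the repository
	"praefect-internal-0",           // primary storage name
	[]string{"praefect-internal-1"}, // one job per secondary storage
	datastore.UpdateRepo,
)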
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 81e03e2cf..0ee189123 100644
--- a/internal/praefect/datastore/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -44,7 +44,7 @@ var operations = []struct {
{
desc: "insert replication job",
opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1, []models.Node{stor2}, UpdateRepo)
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo)
require.NoError(t, err)
},
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 953bb88fe..16094ac6c 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -14,11 +14,11 @@ import (
internalauth "gitlab.com/gitlab-org/gitaly/internal/config/auth"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
@@ -28,6 +28,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
func waitUntil(t *testing.T, ch <-chan struct{}, timeout time.Duration) {
@@ -70,10 +72,10 @@ func testConfig(backends int) config.Config {
// setupServer wires all praefect dependencies together via dependency
// injection
-func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) {
+func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) {
var (
ds = datastore.NewInMemory(conf)
- coordinator = NewCoordinator(l, ds, clientCC, conf, fds...)
+ coordinator = NewCoordinator(l, ds, nodeMgr, conf, fds...)
)
var defaultNode *models.Node
@@ -88,14 +90,14 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
defaultNode.Storage,
l,
ds,
- clientCC,
+ nodeMgr,
)
server := NewServer(
coordinator,
replmgr,
nil,
l,
- clientCC,
+ nodeMgr,
conf,
)
@@ -108,8 +110,6 @@ func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnecti
// configured storage node.
// requires there to be only 1 virtual storage
func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[string]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
- clientCC := conn.NewClientConnections()
-
require.Len(t, conf.VirtualStorages, 1)
require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes),
"mock server count doesn't match config nodes")
@@ -123,12 +123,15 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
backendAddr, cleanup := newMockDownstream(t, node.Token, backend)
cleanups = append(cleanups, cleanup)
- clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
node.Address = backendAddr
conf.VirtualStorages[0].Nodes[i] = node
}
- _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf)
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ _, prf := setupServer(t, conf, nodeMgr, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
listener, port := listenAvailPort(t)
t.Logf("praefect listening on port %d", port)
@@ -169,14 +172,12 @@ func noopBackoffFunc() (backoff, backoffReset) {
// requires exactly 1 virtual storage
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
require.Len(t, conf.VirtualStorages, 1)
- clientCC := conn.NewClientConnections()
var cleanups []testhelper.Cleanup
for i, node := range conf.VirtualStorages[0].Nodes {
_, backendAddr, cleanup := runInternalGitalyServer(t, node.Token)
cleanups = append(cleanups, cleanup)
- clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
node.Address = backendAddr
conf.VirtualStorages[0].Nodes[i] = node
}
@@ -184,13 +185,17 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
ds := datastore.NewInMemory(conf)
logEntry := log.Default()
- coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf)
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ coordinator := NewCoordinator(logEntry, ds, nodeMgr, conf, protoregistry.GitalyProtoFileDescriptors...)
replmgr := NewReplMgr(
- "",
+ conf.VirtualStorages[0].Name,
logEntry,
ds,
- clientCC,
+ nodeMgr,
WithQueueMetric(&promtest.MockGauge{}),
WithLatencyMetric(&promtest.MockHistogram{}),
)
@@ -199,7 +204,7 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
replmgr,
nil,
logEntry,
- clientCC,
+ nodeMgr,
conf,
)
@@ -249,6 +254,7 @@ func runInternalGitalyServer(t *testing.T, token string) (*grpc.Server, string,
gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer())
gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(rubyServer))
+ healthpb.RegisterHealthServer(server, health.NewServer())
errQ := make(chan error)
diff --git a/internal/praefect/node_manager.go b/internal/praefect/nodes/manager.go
index 50880e89f..3b090372d 100644
--- a/internal/praefect/node_manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -1,7 +1,6 @@
-package praefect
+package nodes
import (
- "bytes"
"context"
"errors"
"sync"
@@ -23,8 +22,8 @@ type Shard interface {
GetSecondaries() ([]Node, error)
}
-// NodeManager is responsible for returning shards for virtual storages
-type NodeManager interface {
+// Manager is responsible for returning shards for virtual storages
+type Manager interface {
GetShard(virtualStorageName string) (Shard, error)
}
@@ -63,11 +62,13 @@ func (s *shard) GetSecondaries() ([]Node, error) {
return secondaries, nil
}
-// NodeMgr is a concrete type that adheres to the NodeManager interface
-type NodeMgr struct {
- shards map[string]*shard
- log *logrus.Entry
+// Mgr is a concrete type that adheres to the Manager interface
+type Mgr struct {
+ shards map[string]*shard
+ // staticShards never changes based on node health. It is a static set of shards that comes from the config's
+ // VirtualStorages
failoverEnabled bool
+ log *logrus.Entry
}
// ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
@@ -75,23 +76,23 @@ type NodeMgr struct {
var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
// NewNodeManager creates a new NodeMgr based on virtual storage configs
-func NewNodeManager(log *logrus.Entry, c config.Config) (*NodeMgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, dialOpts ...grpc.DialOption) (*Mgr, error) {
shards := make(map[string]*shard)
-
for _, virtualStorage := range c.VirtualStorages {
var secondaries []*nodeStatus
var primary *nodeStatus
for _, node := range virtualStorage.Nodes {
conn, err := client.Dial(node.Address,
- []grpc.DialOption{
- grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
- grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
- },
+ append(
+ []grpc.DialOption{
+ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(node.Token)),
+ }, dialOpts...),
)
if err != nil {
return nil, err
}
- ns := newConnectionStatus(*node, conn)
+ ns := newConnectionStatus(*node, conn, log)
if node.DefaultPrimary {
primary = ns
@@ -107,7 +108,7 @@ func NewNodeManager(log *logrus.Entry, c config.Config) (*NodeMgr, error) {
}
}
- return &NodeMgr{
+ return &Mgr{
shards: shards,
log: log,
failoverEnabled: c.FailoverEnabled,
@@ -118,36 +119,32 @@ func NewNodeManager(log *logrus.Entry, c config.Config) (*NodeMgr, error) {
// for deeming a node "healthy"
const healthcheckThreshold = 3
-func (n *NodeMgr) bootstrap(d time.Duration) error {
+func (n *Mgr) bootstrap(d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
for i := 0; i < healthcheckThreshold; i++ {
<-timer.C
- if err := n.checkShards(); err != nil {
- return err
- }
+ n.checkShards()
timer.Reset(d)
}
return nil
}
-func (n *NodeMgr) monitor(d time.Duration) {
+func (n *Mgr) monitor(d time.Duration) {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
<-ticker.C
- if err := n.checkShards(); err != nil {
- n.log.WithError(err).Error("error when checking shards")
- }
+ n.checkShards()
}
}
// Start will bootstrap the node manager by calling healthcheck on the nodes as well as kicking off
// the monitoring process. Start must be called before NodeMgr can be used.
-func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) {
+func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
if n.failoverEnabled {
n.bootstrap(bootstrapInterval)
go n.monitor(monitorInterval)
@@ -155,7 +152,7 @@ func (n *NodeMgr) Start(bootstrapInterval, monitorInterval time.Duration) {
}
// GetShard retrieves a shard for a virtual storage name
-func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) {
+func (n *Mgr) GetShard(virtualStorageName string) (Shard, error) {
shard, ok := n.shards[virtualStorageName]
if !ok {
return nil, errors.New("virtual storage does not exist")
@@ -170,28 +167,11 @@ func (n *NodeMgr) GetShard(virtualStorageName string) (Shard, error) {
return shard, nil
}
-type errCollection []error
-
-func (e errCollection) Error() string {
- sb := bytes.NewBufferString("")
- for _, err := range e {
- sb.WriteString(err.Error())
- sb.WriteString("\n")
- }
-
- return sb.String()
-}
-
-func (n *NodeMgr) checkShards() error {
- var errs errCollection
+func (n *Mgr) checkShards() {
for _, shard := range n.shards {
- if err := shard.primary.check(); err != nil {
- errs = append(errs, err)
- }
+ shard.primary.check()
for _, secondary := range shard.secondaries {
- if err := secondary.check(); err != nil {
- errs = append(errs, err)
- }
+ secondary.check()
}
if shard.primary.isHealthy() {
@@ -217,19 +197,14 @@ func (n *NodeMgr) checkShards() error {
shard.primary = newPrimary
shard.m.Unlock()
}
-
- if len(errs) > 0 {
- return errs
- }
-
- return nil
}
-func newConnectionStatus(node models.Node, cc *grpc.ClientConn) *nodeStatus {
+func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry) *nodeStatus {
return &nodeStatus{
Node: node,
ClientConn: cc,
statuses: make([]healthpb.HealthCheckResponse_ServingStatus, 0),
+ log: l,
}
}
@@ -237,6 +212,7 @@ type nodeStatus struct {
models.Node
*grpc.ClientConn
statuses []healthpb.HealthCheckResponse_ServingStatus
+ log *logrus.Entry
}
// GetStorage gets the storage name of a node
@@ -273,13 +249,14 @@ func (n *nodeStatus) isHealthy() bool {
return true
}
-func (n *nodeStatus) check() error {
+func (n *nodeStatus) check() {
client := healthpb.NewHealthClient(n.ClientConn)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
- resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: "TestService"})
+ resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
if err != nil {
+ n.log.WithError(err).WithField("address", n.Address).Warn("error when pinging healthcheck")
resp = &healthpb.HealthCheckResponse{
Status: healthpb.HealthCheckResponse_UNKNOWN,
}
@@ -289,6 +266,4 @@ func (n *nodeStatus) check() error {
if len(n.statuses) > healthcheckThreshold {
n.statuses = n.statuses[1:]
}
-
- return err
}
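Note the probe now asks for service "", which the gRPC health-checking protocol reserves for the server's overall health; the old "TestService" name matched nothing a real Gitaly registers. The same probe as a standalone sketch:

package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func probe(cc *grpc.ClientConn) healthpb.HealthCheckResponse_ServingStatus {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// Service "" means: report the server's overall serving status.
	resp, err := healthpb.NewHealthClient(cc).Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
	if err != nil {
		return healthpb.HealthCheckResponse_UNKNOWN
	}
	return resp.Status
}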
diff --git a/internal/praefect/node_manager_test.go b/internal/praefect/nodes/manager_test.go
index 45682a8d4..69a24c10c 100644
--- a/internal/praefect/node_manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -1,42 +1,51 @@
-package praefect
+package nodes
import (
- "net"
"testing"
"time"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"google.golang.org/grpc"
- "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
func TestNodeStatus(t *testing.T) {
- cc, healthSvr, cleanup := newHealthServer(t, testhelper.GetTemporaryGitalySocketFileName())
- defer cleanup()
+ socket := testhelper.GetTemporaryGitalySocketFileName()
+ svr, healthSvr := testhelper.NewServerWithHealth(t, socket)
+ defer svr.Stop()
- cs := newConnectionStatus(models.Node{}, cc)
+ cc, err := grpc.Dial(
+ "unix://"+socket,
+ grpc.WithInsecure(),
+ )
+
+ require.NoError(t, err)
+
+ cs := newConnectionStatus(models.Node{}, cc, testhelper.DiscardTestEntry(t))
require.False(t, cs.isHealthy())
for i := 0; i < healthcheckThreshold; i++ {
- require.NoError(t, cs.check())
+ cs.check()
}
require.True(t, cs.isHealthy())
- healthSvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+ healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
- require.NoError(t, cs.check())
+ cs.check()
require.False(t, cs.isHealthy())
}
func TestNodeManager(t *testing.T) {
- internalSocket0 := testhelper.GetTemporaryGitalySocketFileName()
- internalSocket1 := testhelper.GetTemporaryGitalySocketFileName()
+ internalSocket0, internalSocket1 := testhelper.GetTemporaryGitalySocketFileName(), testhelper.GetTemporaryGitalySocketFileName()
+ srv0, healthSrv0 := testhelper.NewServerWithHealth(t, internalSocket0)
+ defer srv0.Stop()
+
+ srv1, healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1)
+ defer srv1.Stop()
virtualStorages := []*config.VirtualStorage{
{
@@ -64,16 +73,10 @@ func TestNodeManager(t *testing.T) {
FailoverEnabled: false,
}
- _, srv0, cancel0 := newHealthServer(t, internalSocket0)
- defer cancel0()
-
- _, _, cancel1 := newHealthServer(t, internalSocket1)
- defer cancel1()
-
- nm, err := NewNodeManager(log.Default(), confWithFailover)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover)
require.NoError(t, err)
- nmWithoutFailover, err := NewNodeManager(log.Default(), confWithoutFailover)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
@@ -109,7 +112,7 @@ func TestNodeManager(t *testing.T) {
require.Equal(t, virtualStorages[0].Nodes[1].Storage, secondaries[0].GetStorage())
require.Equal(t, virtualStorages[0].Nodes[1].Address, secondaries[0].GetAddress())
- srv0.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_UNKNOWN)
+ healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
nm.checkShards()
// since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy
@@ -149,34 +152,9 @@ func TestNodeManager(t *testing.T) {
require.Equal(t, virtualStorages[0].Nodes[0].Storage, secondaries[0].GetStorage())
require.Equal(t, virtualStorages[0].Nodes[0].Address, secondaries[0].GetAddress())
- cancel1()
+ healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
nm.checkShards()
_, err = nm.GetShard("virtual-storage-0")
require.Error(t, err, "should return error since no nodes are healthy")
}
-
-func newHealthServer(t testing.TB, socketName string) (*grpc.ClientConn, *health.Server, func()) {
- srv := testhelper.NewTestGrpcServer(t, nil, nil)
- healthSrvr := health.NewServer()
- grpc_health_v1.RegisterHealthServer(srv, healthSrvr)
- healthSrvr.SetServingStatus("TestService", grpc_health_v1.HealthCheckResponse_SERVING)
-
- lis, err := net.Listen("unix", socketName)
- require.NoError(t, err)
-
- go srv.Serve(lis)
-
- cleanup := func() {
- srv.Stop()
- }
-
- cc, err := grpc.Dial(
- "unix://"+socketName,
- grpc.WithInsecure(),
- )
-
- require.NoError(t, err)
-
- return cc, healthSrvr, cleanup
-}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index f0e7abc5f..49af0be3d 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -8,9 +8,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -148,12 +148,12 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
type ReplMgr struct {
log *logrus.Entry
datastore datastore.Datastore
- clientConnections *conn.ClientConnections
- targetNode string // which replica is this replicator responsible for?
+ nodeManager nodes.Manager
+ virtualStorage string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
replQueueMetric metrics.Gauge
replLatencyMetric metrics.Histogram
-
+ replJobTimeout time.Duration
// whitelist contains the project names of the repos we wish to replicate
whitelist map[string]struct{}
}
@@ -177,14 +177,14 @@ func WithLatencyMetric(h metrics.Histogram) func(*ReplMgr) {
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(virtualStorage string, log *logrus.Entry, datastore datastore.Datastore, nodeMgr nodes.Manager, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
datastore: datastore,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
- targetNode: targetNode,
- clientConnections: c,
+ virtualStorage: virtualStorage,
+ nodeManager: nodeMgr,
replLatencyMetric: prometheus.NewHistogram(prometheus.HistogramOpts{}),
replQueueMetric: prometheus.NewGauge(prometheus.GaugeOpts{}),
}
@@ -248,65 +248,80 @@ const (
maxAttempts = 3
)
+func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) {
+ shard, err := r.nodeManager.GetShard(r.virtualStorage)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ primary, err = shard.GetPrimary()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ secondaries, err = shard.GetSecondaries()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return primary, secondaries, nil
+}
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
backoff, reset := b()
for {
- nodes, err := r.datastore.GetStorageNodes()
- if err != nil {
- r.log.WithError(err).Error("error when getting storage nodes")
- return err
- }
-
var totalJobs int
- for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, node.Storage, 10)
- if err != nil {
- r.log.WithField("storage", node.Storage).WithError(err).Error("error when retrieving jobs for replication")
- continue
- }
+ primary, secondaries, err := r.getPrimaryAndSecondaries()
+ if err == nil {
+ for _, secondary := range secondaries {
+ jobs, err := r.datastore.GetJobs(datastore.JobStateReady|datastore.JobStateFailed, secondary.GetStorage(), 10)
+ if err != nil {
+ return err
+ }
- totalJobs += len(jobs)
+ totalJobs += len(jobs)
- reposReplicated := make(map[string]struct{})
- for _, job := range jobs {
- if job.Attempts >= maxAttempts {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ reposReplicated := make(map[string]struct{})
+ for _, job := range jobs {
+ if job.Attempts >= maxAttempts {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
}
- continue
- }
- if _, ok := reposReplicated[job.RelativePath]; ok {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ if _, ok := reposReplicated[job.RelativePath]; ok {
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to cancelled")
+ }
+ continue
}
- continue
- }
- if err := r.processReplJob(ctx, job); err != nil {
- r.log.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- "from_storage": job.SourceNode.Storage,
- "to_storage": job.TargetNode.Storage,
- }).WithError(err).Error("replication job failed")
-
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to failed")
+ if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil {
+ r.log.WithFields(logrus.Fields{
+ logWithReplJobID: job.ID,
+ "from_storage": job.SourceNode.Storage,
+ "to_storage": job.TargetNode.Storage,
+ }).WithError(err).Error("replication job failed")
+ if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil {
+ r.log.WithError(err).Error("error when updating replication job status to failed")
+ }
continue
}
- }
- reposReplicated[job.RelativePath] = struct{}{}
+ reposReplicated[job.RelativePath] = struct{}{}
+ }
}
+ } else {
+ r.log.WithError(err).WithField("virtual_storage", r.virtualStorage).Error("error when getting primary and secondaries")
}
if totalJobs == 0 {
select {
case <-time.After(backoff()):
continue
-
case <-ctx.Done():
return ctx.Err()
}
@@ -320,7 +335,8 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
// is a crutch in this situation. Ideally, we need to update state somewhere
// with information regarding the replication failure. See follow up issue:
// https://gitlab.com/gitlab-org/gitaly/issues/2138
-func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) error {
+
+func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
l := r.log.
WithField(logWithReplJobID, job.ID).
WithField(logWithReplSource, job.SourceNode).
@@ -336,19 +352,17 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) erro
return err
}
- targetCC, err := r.clientConnections.GetConnection(job.TargetNode.Storage)
- if err != nil {
- l.WithError(err).Error("unable to obtain client connection for secondary node in replication job")
- return err
- }
+ var replCtx context.Context
+ var cancel func()
- sourceCC, err := r.clientConnections.GetConnection(job.SourceNode.Storage)
- if err != nil {
- l.WithError(err).Error("unable to obtain client connection for primary node in replication job")
- return err
+ if r.replJobTimeout > 0 {
+ replCtx, cancel = context.WithTimeout(ctx, r.replJobTimeout)
+ } else {
+ replCtx, cancel = context.WithCancel(ctx)
}
+ defer cancel()
- injectedCtx, err := helper.InjectGitalyServers(ctx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
+ injectedCtx, err := helper.InjectGitalyServers(replCtx, job.SourceNode.Storage, job.SourceNode.Address, job.SourceNode.Token)
if err != nil {
l.WithError(err).Error("unable to inject Gitaly servers into context for replication job")
return err
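The new replJobTimeout field bounds each replication job with a derived context; without a configured timeout the job only inherits the parent's cancellation. Isolated, with the types spelled out, the pattern is:

var replCtx context.Context
var cancel context.CancelFunc
if r.replJobTimeout > 0 {
	// Job is cancelled automatically after r.replJobTimeout elapses.
	replCtx, cancel = context.WithTimeout(ctx, r.replJobTimeout)
} else {
	replCtx, cancel = context.WithCancel(ctx)
}
defer cancel() // always release the derived context's resources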
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 30bf4ae08..24b10b051 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -13,18 +13,21 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git/objectpool"
- gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
+ objectpoolservice "gitlab.com/gitlab-org/gitaly/internal/service/objectpool"
+ "gitlab.com/gitlab-org/gitaly/internal/service/remote"
+ "gitlab.com/gitlab-org/gitaly/internal/service/repository"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/reflection"
)
func TestProcessReplicationJob(t *testing.T) {
@@ -115,7 +118,11 @@ func TestProcessReplicationJob(t *testing.T) {
secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name)
require.NoError(t, err)
- _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, secondaries, datastore.UpdateRepo)
+ var secondaryStorages []string
+ for _, secondary := range secondaries {
+ secondaryStorages = append(secondaryStorages, secondary.Storage)
+ }
+ _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo)
require.NoError(t, err)
jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, backupStorageName, 1)
@@ -127,19 +134,28 @@ func TestProcessReplicationJob(t *testing.T) {
})
var replicator defaultReplicator
- replicator.log = gitaly_log.Default()
+ entry := testhelper.DiscardTestEntry(t)
+ replicator.log = entry
- clientCC := conn.NewClientConnections()
- require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token))
- require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token))
+ nodeMgr, err := nodes.NewManager(entry, config)
+ require.NoError(t, err)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
var mockReplicationGauge promtest.MockGauge
var mockReplicationHistogram promtest.MockHistogram
- replMgr := NewReplMgr("", gitaly_log.Default(), ds, clientCC, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge))
+ replMgr := NewReplMgr("", testhelper.DiscardTestEntry(t), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge))
replMgr.replicator = replicator
- replMgr.processReplJob(ctx, jobs[0])
+ shard, err := nodeMgr.GetShard(config.VirtualStorages[0].Name)
+ require.NoError(t, err)
+ primaryNode, err := shard.GetPrimary()
+ require.NoError(t, err)
+ secondaryNodes, err := shard.GetSecondaries()
+ require.NoError(t, err)
+ require.Len(t, secondaryNodes, 1)
+
+ replMgr.processReplJob(ctx, jobs[0], primaryNode.GetConnection(), secondaryNodes[0].GetConnection())
relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath)
require.NoError(t, err)
@@ -175,7 +191,8 @@ func TestConfirmReplication(t *testing.T) {
require.NoError(t, err)
var replicator defaultReplicator
- replicator.log = gitaly_log.Default()
+ entry := testhelper.DiscardTestEntry(t)
+ replicator.log = entry
equal, err := replicator.confirmChecksums(ctx, gitalypb.NewRepositoryServiceClient(conn), gitalypb.NewRepositoryServiceClient(conn), testRepoA, testRepoB)
require.NoError(t, err)
@@ -190,9 +207,16 @@ func TestConfirmReplication(t *testing.T) {
require.False(t, equal)
}
-func TestProcessBacklog(t *testing.T) {
- srv, srvSocketPath := runFullGitalyServer(t)
- defer srv.Stop()
+func TestProcessBacklog_FailedJobs(t *testing.T) {
+ primarySvr, primarySocket := newReplicationService(t)
+ defer primarySvr.Stop()
+
+ backupSvr, backupSocket := newReplicationService(t)
+ backupSvr.Stop()
+
+ internalListener, err := net.Listen("unix", gitaly_config.GitalyInternalSocketPath())
+ require.NoError(t, err)
+ go backupSvr.Serve(internalListener)
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -204,32 +228,15 @@ func TestProcessBacklog(t *testing.T) {
defer os.RemoveAll(backupDir)
- oldStorages := gitaly_config.Config.Storages
- defer func() {
- gitaly_config.Config.Storages = oldStorages
- }()
-
- gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
- Name: backupStorageName,
- Path: backupDir,
- },
- gitaly_config.Storage{
- Name: "default",
- Path: testhelper.GitlabTestStoragePath(),
- },
- )
-
primary := models.Node{
Storage: "default",
- Address: srvSocketPath,
- Token: gitaly_config.Config.Auth.Token,
+ Address: "unix://" + primarySocket,
DefaultPrimary: true,
}
secondary := models.Node{
Storage: backupStorageName,
- Address: srvSocketPath,
- Token: gitaly_config.Config.Auth.Token,
+ Address: "unix://" + backupSocket,
}
config := config.Config{
@@ -244,24 +251,41 @@ func TestProcessBacklog(t *testing.T) {
},
}
+ ctx, cancel := testhelper.Context()
+ defer func(oldStorages []gitaly_config.Storage) {
+ gitaly_config.Config.Storages = oldStorages
+ cancel()
+ }(gitaly_config.Config.Storages)
+
+ gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
+ Name: backupStorageName,
+ Path: backupDir,
+ },
+ gitaly_config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
+ )
+
ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, []models.Node{secondary}, datastore.UpdateRepo)
+ ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
require.NoError(t, err)
require.Len(t, ids, 1)
+ entry := testhelper.DiscardTestEntry(t)
+
require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
- clientCC := conn.NewClientConnections()
- require.NoError(t, clientCC.RegisterNode("default", srvSocketPath, gitaly_config.Config.Auth.Token))
+ nodeMgr, err := nodes.NewManager(entry, config)
+ require.NoError(t, err)
- replMgr := NewReplMgr(backupStorageName, gitaly_log.Default(), ds, clientCC)
- ctx, cancel := testhelper.Context()
- defer cancel()
+ replMgr := NewReplMgr("default", entry, ds, nodeMgr)
+ replMgr.replJobTimeout = 100 * time.Millisecond
go replMgr.ProcessBacklog(ctx, noopBackoffFunc)
timeLimit := time.NewTimer(5 * time.Second)
- ticker := time.NewTicker(10 * time.Millisecond)
+ ticker := time.NewTicker(1 * time.Second)
// the job will fail to process because the client connection for "backup" is not registered. It should fail maxAttempts times
// and get cancelled.
@@ -273,33 +297,108 @@ TestJobGetsCancelled:
require.NoError(t, err)
if len(replJobs) == 1 {
//success
+ timeLimit.Stop()
break TestJobGetsCancelled
}
case <-timeLimit.C:
- t.Fatal("time limit expired for job to complete")
+ t.Fatal("time limit expired for job to be deemed dead")
}
}
+}
+
+func TestProcessBacklog_Success(t *testing.T) {
+ primarySvr, primarySocket := newReplicationService(t)
+ defer primarySvr.Stop()
+
+ backupSvr, backupSocket := newReplicationService(t)
+ defer backupSvr.Stop()
+
+ internalListener, err := net.Listen("unix", gitaly_config.GitalyInternalSocketPath())
+ require.NoError(t, err)
+ go backupSvr.Serve(internalListener)
+
+ testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ backupStorageName := "backup"
+
+ backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName)
+ require.NoError(t, err)
+
+ defer os.RemoveAll(backupDir)
+
+ primary := models.Node{
+ Storage: "default",
+ Address: "unix://" + primarySocket,
+ DefaultPrimary: true,
+ }
- require.NoError(t, clientCC.RegisterNode("backup", srvSocketPath, gitaly_config.Config.Auth.Token))
- ids, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary, []models.Node{secondary}, datastore.UpdateRepo)
+ secondary := models.Node{
+ Storage: backupStorageName,
+ Address: "unix://" + backupSocket,
+ }
+
+ config := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ &primary,
+ &secondary,
+ },
+ },
+ },
+ }
+
+ ctx, cancel := testhelper.Context()
+ defer func(oldStorages []gitaly_config.Storage) {
+ gitaly_config.Config.Storages = oldStorages
+ cancel()
+ }(gitaly_config.Config.Storages)
+
+ gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
+ Name: backupStorageName,
+ Path: backupDir,
+ },
+ gitaly_config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
+ )
+
+ ds := datastore.NewInMemory(config)
+ ids, err := ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo)
require.NoError(t, err)
require.Len(t, ids, 1)
+
+ entry := testhelper.DiscardTestEntry(t)
+
require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
- timeLimit.Reset(5 * time.Second)
- // Once the node is registered, and we try the job again it should succeed
+ nodeMgr, err := nodes.NewManager(entry, config)
+ require.NoError(t, err)
+
+ replMgr := NewReplMgr("default", entry, ds, nodeMgr)
+ replMgr.replJobTimeout = 5 * time.Second
+
+ go replMgr.ProcessBacklog(ctx, noopBackoffFunc)
+
+ timeLimit := time.NewTimer(5 * time.Second)
+ ticker := time.NewTicker(1 * time.Second)
+
+ // Once the listener is serving, retrying the job should succeed
TestJobSucceeds:
for {
select {
case <-ticker.C:
- replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady, "backup", 10)
+ replJobs, err := ds.GetJobs(datastore.JobStateFailed|datastore.JobStateInProgress|datastore.JobStateReady|datastore.JobStateDead, "backup", 10)
require.NoError(t, err)
if len(replJobs) == 0 {
//success
break TestJobSucceeds
}
case <-timeLimit.C:
- t.Error("time limit expired for job to complete")
+ t.Fatal("time limit expired for job to complete")
}
}
}
@@ -326,6 +425,7 @@ func TestBackoff(t *testing.T) {
func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
server := serverPkg.NewInsecure(RubyServer)
+
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
listener, err := net.Listen("unix", serverSocketPath)
@@ -342,6 +442,26 @@ func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
return server, "unix://" + serverSocketPath
}
+// newReplicationService creates and starts a grpc server with the Repository, Remote and ObjectPool services
+// registered, which are the only ones needed for replication
+func newReplicationService(tb testing.TB) (*grpc.Server, string) {
+ socketName := testhelper.GetTemporaryGitalySocketFileName()
+
+ svr := testhelper.NewTestGrpcServer(tb, nil, nil)
+
+ gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(&rubyserver.Server{}))
+ gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer())
+ gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(&rubyserver.Server{}))
+ reflection.Register(svr)
+
+ listener, err := net.Listen("unix", socketName)
+ require.NoError(tb, err)
+
+ go svr.Serve(listener)
+
+ return svr, socketName
+}
+
func newRepositoryClient(t *testing.T, serverSocketPath string) (gitalypb.RepositoryServiceClient, *grpc.ClientConn) {
connOpts := []grpc.DialOption{
grpc.WithInsecure(),
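
Both backlog tests above share the same wait loop: poll the datastore on a ticker until the jobs reach the expected state, or give up when a timer fires. A minimal, self-contained sketch of that pattern (waitFor and check are illustrative names, not part of the test suite):

package sketch

import (
	"errors"
	"time"
)

// waitFor invokes check on every tick until it reports success or the
// overall time limit expires, mirroring the select loops in the tests above.
func waitFor(check func() bool, interval, limit time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	timeLimit := time.NewTimer(limit)
	defer timeLimit.Stop()

	for {
		select {
		case <-ticker.C:
			if check() {
				return nil
			}
		case <-timeLimit.C:
			return errors.New("time limit expired")
		}
	}
}
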
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ceb1b8a76..863c30a3b 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -17,8 +17,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/sentryhandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/internal/server/auth"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -31,11 +31,11 @@ import (
// Server is a praefect server
type Server struct {
- clientConnections *conn.ClientConnections
- repl ReplMgr
- s *grpc.Server
- conf config.Config
- l *logrus.Entry
+ nodeManager nodes.Manager
+ repl ReplMgr
+ s *grpc.Server
+ conf config.Config
+ l *logrus.Entry
}
func (srv *Server) warnDupeAddrs(c config.Config) {
@@ -60,7 +60,7 @@ func (srv *Server) warnDupeAddrs(c config.Config) {
// NewServer returns an initialized praefect gRPC proxy server configured
// with the provided gRPC server options
-func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, clientConnections *conn.ClientConnections, conf config.Config) *Server {
+func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry, nodeManager nodes.Manager, conf config.Config) *Server {
ctxTagOpts := []grpc_ctxtags.Option{
grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor),
}
@@ -98,11 +98,11 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
}...)
s := &Server{
- s: grpc.NewServer(grpcOpts...),
- repl: repl,
- clientConnections: clientConnections,
- conf: conf,
- l: l,
+ s: grpc.NewServer(grpcOpts...),
+ repl: repl,
+ nodeManager: nodeManager,
+ conf: conf,
+ l: l,
}
s.warnDupeAddrs(conf)
@@ -125,7 +125,7 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
// RegisterServices will register any services praefect needs to handle rpcs on its own
func (srv *Server) RegisterServices() {
// ServerServiceServer is necessary for the ServerInfo RPC
- gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.clientConnections))
+ gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(srv.conf, srv.nodeManager))
healthpb.RegisterHealthServer(srv.s, health.NewServer())
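
The server now receives a nodes.Manager instead of the removed conn.ClientConnections registry. The commit does not reproduce the manager's definition in this file, but from its call sites in this diff (GetShard, GetPrimary, GetSecondaries, GetConnection, GetStorage) the surface looks roughly like the sketch below; the authoritative definitions live in internal/praefect/nodes/manager.go and may differ in detail:

package nodes

import "google.golang.org/grpc"

// Node is one gitaly backend: a storage name plus a ready-to-use connection.
type Node interface {
	GetStorage() string
	GetConnection() *grpc.ClientConn
}

// Shard groups the nodes of one virtual storage into a primary and secondaries.
type Shard interface {
	GetPrimary() (Node, error)
	GetSecondaries() ([]Node, error)
}

// Manager hands out the current shard for a virtual storage, tracking node
// health internally so callers no longer register connections by hand.
type Manager interface {
	GetShard(virtualStorageName string) (Shard, error)
}
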
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index e201bcbcb..23fb06f9c 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -16,11 +16,10 @@ import (
gconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/version"
@@ -130,9 +129,11 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
},
}
- clientCC := conn.NewClientConnections()
- clientCC.RegisterNode(conf.VirtualStorages[0].Nodes[0].Storage, conf.VirtualStorages[0].Nodes[0].Address, conf.VirtualStorages[0].Nodes[0].Token)
- _, srv := setupServer(t, conf, clientCC, log.Default(), protoregistry.GitalyProtoFileDescriptors)
+ entry := testhelper.DiscardTestEntry(t)
+ nodeMgr, err := nodes.NewManager(entry, conf)
+ require.NoError(t, err)
+
+ _, srv := setupServer(t, conf, nodeMgr, entry, protoregistry.GitalyProtoFileDescriptors)
listener, port := listenAvailPort(t)
go func() {
@@ -154,7 +155,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
func TestGitalyDiskStatistics(t *testing.T) {
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
- &config.VirtualStorage{
+ {
Nodes: []*models.Node{
{
Storage: "praefect-internal-1",
@@ -168,6 +169,7 @@ func TestGitalyDiskStatistics(t *testing.T) {
},
},
}
+
cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
defer cleanup()
@@ -178,7 +180,7 @@ func TestGitalyDiskStatistics(t *testing.T) {
metadata, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
require.NoError(t, err)
- require.Len(t, metadata.GetStorageStatuses(), len(conf.Nodes))
+ require.Len(t, metadata.GetStorageStatuses(), len(conf.VirtualStorages[0].Nodes))
for _, storageStatus := range metadata.GetStorageStatuses() {
require.NotNil(t, storageStatus, "none of the storage statuses should be nil")
diff --git a/internal/praefect/service/server/disk_stats.go b/internal/praefect/service/server/disk_stats.go
index 745817826..700bdfb23 100644
--- a/internal/praefect/service/server/disk_stats.go
+++ b/internal/praefect/service/server/disk_stats.go
@@ -4,41 +4,38 @@ import (
"context"
"fmt"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "golang.org/x/sync/errgroup"
)
// DiskStatistics sends DiskStatisticsRequest to all of a praefect server's internal gitaly nodes and aggregates the
// results into a response
func (s *Server) DiskStatistics(ctx context.Context, _ *gitalypb.DiskStatisticsRequest) (*gitalypb.DiskStatisticsResponse, error) {
- storageStatuses := make([][]*gitalypb.DiskStatisticsResponse_StorageStatus, len(s.conf.Nodes))
+ var storageStatuses [][]*gitalypb.DiskStatisticsResponse_StorageStatus
- g, ctx := errgroup.WithContext(ctx)
+ for _, virtualStorage := range s.conf.VirtualStorages {
+ shard, err := s.nodeMgr.GetShard(virtualStorage.Name)
+ if err != nil {
+ return nil, err
+ }
- for i, node := range s.conf.Nodes {
- i := i // necessary since it will be used in a goroutine below
- node := node
- cc, err := s.clientCC.GetConnection(node.Storage)
+ primary, err := shard.GetPrimary()
+ if err != nil {
+ return nil, err
+ }
+ secondaries, err := shard.GetSecondaries()
if err != nil {
- return nil, helper.ErrInternalf("error getting client connection for %s: %v", node.Storage, err)
+ return nil, err
}
- g.Go(func() error {
- client := gitalypb.NewServerServiceClient(cc)
+ for _, node := range append(secondaries, primary) {
+ client := gitalypb.NewServerServiceClient(node.GetConnection())
resp, err := client.DiskStatistics(ctx, &gitalypb.DiskStatisticsRequest{})
if err != nil {
- return fmt.Errorf("error when requesting disk statistics from internal storage %v", node.Storage)
+ return nil, fmt.Errorf("error when requesting disk statistics from internal storage %v", node.GetStorage())
}
- storageStatuses[i] = resp.GetStorageStatuses()
-
- return nil
- })
- }
-
- if err := g.Wait(); err != nil {
- return nil, helper.ErrInternal(err)
+ storageStatuses = append(storageStatuses, resp.GetStorageStatuses())
+ }
}
var response gitalypb.DiskStatisticsResponse
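
The rewrite trades the errgroup fan-out for sequential per-node RPCs, which is simpler but serializes the DiskStatistics calls. One Go subtlety in the new loop: `for _, node := range append(secondaries, primary)` assumes the append result is a private slice. If secondaries ever arrives with spare capacity, append writes the primary into the shared backing array, visible to other holders of that slice. A standalone illustration of the aliasing:

package main

import "fmt"

func main() {
	backing := make([]string, 2, 3) // length 2, spare capacity 1
	backing[0], backing[1] = "gitaly-1", "gitaly-2"
	secondaries := backing[:2]

	combined := append(secondaries, "primary") // reuses backing's array
	fmt.Println(combined)    // [gitaly-1 gitaly-2 primary]
	fmt.Println(backing[:3]) // [gitaly-1 gitaly-2 primary] (aliased)
}
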
diff --git a/internal/praefect/service/server/info.go b/internal/praefect/service/server/info.go
index f960a9ea5..64360f171 100644
--- a/internal/praefect/service/server/info.go
+++ b/internal/praefect/service/server/info.go
@@ -6,7 +6,7 @@ import (
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"golang.org/x/sync/errgroup"
)
@@ -15,20 +15,26 @@ import (
// a response
func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
var once sync.Once
- nodesChecked := make(map[string]struct{})
- var nodes []*models.Node
+ var nodes []nodes.Node
for _, virtualStorage := range s.conf.VirtualStorages {
- for _, node := range virtualStorage.Nodes {
- if _, ok := nodesChecked[node.Storage]; ok {
- continue
- }
+ shard, err := s.nodeMgr.GetShard(virtualStorage.Name)
+ if err != nil {
+ return nil, err
+ }
- nodesChecked[node.Storage] = struct{}{}
- nodes = append(nodes, node)
+ primary, err := shard.GetPrimary()
+ if err != nil {
+ return nil, err
}
- }
+ secondaries, err := shard.GetSecondaries()
+ if err != nil {
+ return nil, err
+ }
+
+ nodes = append(append(nodes, primary), secondaries...)
+ }
var gitVersion, serverVersion string
g, ctx := errgroup.WithContext(ctx)
@@ -38,26 +44,20 @@ func (s *Server) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest)
for i, node := range nodes {
i := i
node := node
- cc, err := s.clientCC.GetConnection(node.Storage)
- if err != nil {
- grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting client connection")
- continue
- }
+
g.Go(func() error {
- client := gitalypb.NewServerServiceClient(cc)
+ client := gitalypb.NewServerServiceClient(node.GetConnection())
resp, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
if err != nil {
- grpc_logrus.Extract(ctx).WithField("storage", node.Storage).WithError(err).Error("error getting sever info")
+ grpc_logrus.Extract(ctx).WithField("storage", node.GetStorage()).WithError(err).Error("error getting server info")
return nil
}
storageStatuses[i] = resp.GetStorageStatuses()
- if node.DefaultPrimary {
- once.Do(func() {
- gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
- })
- }
+ once.Do(func() {
+ gitVersion, serverVersion = resp.GetGitVersion(), resp.GetServerVersion()
+ })
return nil
})
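
The fan-out keeps the classic `i := i; node := node` rebinding: each goroutine must capture its own copies, because (before Go 1.22's per-iteration loop variables) all closures would otherwise share the loop variables, which hold their final values by the time the goroutines run. Note also that with the DefaultPrimary guard removed, once.Do now records the git and server versions from whichever node answers first. A minimal demonstration of the capture pitfall:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	results := make([]int, 3)
	for i := 0; i < 3; i++ {
		i := i // rebind so each goroutine sees this iteration's value
		wg.Add(1)
		go func() {
			defer wg.Done()
			results[i] = i * i
		}()
	}
	wg.Wait()
	fmt.Println(results) // [0 1 4]; without the rebinding, every goroutine would read i == 3
}
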
diff --git a/internal/praefect/service/server/server.go b/internal/praefect/service/server/server.go
index 30e04f529..1788307ed 100644
--- a/internal/praefect/service/server/server.go
+++ b/internal/praefect/service/server/server.go
@@ -2,21 +2,21 @@ package server
import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
// Server is a ServerService server
type Server struct {
- clientCC *conn.ClientConnections
- conf config.Config
+ nodeMgr nodes.Manager
+ conf config.Config
}
// NewServer creates a new instance of a grpc ServerServiceServer
-func NewServer(conf config.Config, clientConnections *conn.ClientConnections) gitalypb.ServerServiceServer {
+func NewServer(conf config.Config, nodeMgr nodes.Manager) gitalypb.ServerServiceServer {
s := &Server{
- clientCC: clientConnections,
- conf: conf,
+ nodeMgr: nodeMgr,
+ conf: conf,
}
return s
diff --git a/internal/service/repository/replicate.go b/internal/service/repository/replicate.go
index 9d0d23300..0199dff6f 100644
--- a/internal/service/repository/replicate.go
+++ b/internal/service/repository/replicate.go
@@ -221,9 +221,7 @@ func (s *server) syncInfoAttributes(ctx context.Context, in *gitalypb.ReplicateR
// newRemoteClient creates a new RemoteClient that talks to the same gitaly server
func (s *server) newRemoteClient() (gitalypb.RemoteServiceClient, error) {
- cc, err := s.getOrCreateConnection(map[string]string{
- "address": fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()),
- })
+ cc, err := s.getOrCreateConnection(fmt.Sprintf("unix:%s", config.GitalyInternalSocketPath()), "")
if err != nil {
return nil, err
}
@@ -247,11 +245,10 @@ func (s *server) getConnectionByStorage(ctx context.Context, storageName string)
return nil, err
}
- return s.getOrCreateConnection(gitalyServerInfo)
+ return s.getOrCreateConnection(gitalyServerInfo["address"], gitalyServerInfo["token"])
}
-func (s *server) getOrCreateConnection(server map[string]string) (*grpc.ClientConn, error) {
- address, token := server["address"], server["token"]
+func (s *server) getOrCreateConnection(address, token string) (*grpc.ClientConn, error) {
if address == "" {
return nil, errors.New("address is empty")
}
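
getOrCreateConnection now takes the address and token as explicit parameters rather than fishing them out of a map, which lets newRemoteClient skip building a map just to pass one address. The concurrent-access test below suggests a mutex-guarded cache keyed by address; a sketch under that assumption (grpc.Dial stands in for gitaly's real client dialing, and the token handling is elided):

package sketch

import (
	"sync"

	"google.golang.org/grpc"
)

type connCache struct {
	mu    sync.Mutex
	conns map[string]*grpc.ClientConn
}

// getOrCreate returns a cached connection for address, dialing at most once
// per address. The token would feed per-RPC credentials; omitted here.
func (c *connCache) getOrCreate(address, token string) (*grpc.ClientConn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if cc, ok := c.conns[address]; ok {
		return cc, nil
	}
	cc, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	if c.conns == nil {
		c.conns = make(map[string]*grpc.ClientConn)
	}
	c.conns[address] = cc
	return cc, nil
}
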
diff --git a/internal/service/repository/server_test.go b/internal/service/repository/server_test.go
index d113a9da4..6ef9b0939 100644
--- a/internal/service/repository/server_test.go
+++ b/internal/service/repository/server_test.go
@@ -48,7 +48,7 @@ func TestGetConnectionsConcurrentAccess(t *testing.T) {
go func() {
var err error
- cc, err = s.getOrCreateConnection(map[string]string{"address": address})
+ cc, err = s.getOrCreateConnection(address, "")
require.NoError(t, err)
wg.Done()
}()
diff --git a/internal/testhelper/test_hook.go b/internal/testhelper/test_hook.go
index 7da7baffe..8d0b0264c 100644
--- a/internal/testhelper/test_hook.go
+++ b/internal/testhelper/test_hook.go
@@ -17,3 +17,8 @@ func DiscardTestLogger(tb testing.TB) *log.Logger {
return logger
}
+
+// DiscardTestEntry creates a logrus entry that discards everything.
+func DiscardTestEntry(tb testing.TB) *log.Entry {
+ return log.NewEntry(DiscardTestLogger(tb))
+}
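
DiscardTestEntry wraps the existing DiscardTestLogger so callers that expect a *logrus.Entry, such as nodes.NewManager and NewReplMgr above, can log into the void during tests. A hypothetical use:

package testhelper_example

import (
	"testing"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)

func TestDiscardedLogging(t *testing.T) {
	entry := testhelper.DiscardTestEntry(t)
	require.NotNil(t, entry)

	// Nothing below reaches stdout or the test log.
	entry.WithField("storage", "default").Info("discarded")
}
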
diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go
index 352a47194..7cc7a7ffb 100644
--- a/internal/testhelper/testserver.go
+++ b/internal/testhelper/testserver.go
@@ -27,6 +27,8 @@ import (
praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v2"
)
@@ -351,3 +353,18 @@ type HTTPSettings struct {
User string `yaml:"user"`
Password string `yaml:"password"`
}
+
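+// NewServerWithHealth starts a grpc server on the given socket with a health service registered and set to SERVING, returning both so tests can change the reported status.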
+func NewServerWithHealth(t testing.TB, socketName string) (*grpc.Server, *health.Server) {
+ srv := NewTestGrpcServer(t, nil, nil)
+ healthSrvr := health.NewServer()
+ grpc_health_v1.RegisterHealthServer(srv, healthSrvr)
+ healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+
+ lis, err := net.Listen("unix", socketName)
+ require.NoError(t, err)
+
+ go srv.Serve(lis)
+
+ return srv, healthSrvr
+}
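
The returned health.Server is the point of the helper: a test can stand in for a gitaly node and then flip its advertised health, which is what drives the node manager's health checks. A hypothetical use:

package testhelper_example

import (
	"testing"

	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
	"google.golang.org/grpc/health/grpc_health_v1"
)

func TestNodeGoesDown(t *testing.T) {
	socket := testhelper.GetTemporaryGitalySocketFileName()
	srv, healthSrv := testhelper.NewServerWithHealth(t, socket)
	defer srv.Stop()

	// Simulate the node failing its health checks; a health-checking client
	// such as the praefect node manager would now consider it down.
	healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
}
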