Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-05-28 10:38:53 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-05-28 10:59:29 +0300
commita9b2b302e49b351a4f3a552351177d56942c8c25 (patch)
treeb7a4239f7108fb74ce4e33677b9acd90a5b1dd84
parent97dcd53c020d3d6d123530ac1a41140b950dd607 (diff)
Praefect: removal of unnecessary Datastore wrapper
Datastore is not needed anymore because of introduction of nodes.NewManager. Now it is responsible for managing nodes. Also Queue interface removed as unnecessary and replaced with ReplicationEventQueue. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2613
-rw-r--r--cmd/praefect/main.go14
-rw-r--r--internal/praefect/auth_test.go9
-rw-r--r--internal/praefect/coordinator.go38
-rw-r--r--internal/praefect/coordinator_test.go58
-rw-r--r--internal/praefect/dataloss_check_test.go12
-rw-r--r--internal/praefect/datastore/datastore.go163
-rw-r--r--internal/praefect/helper_test.go45
-rw-r--r--internal/praefect/nodes/manager.go8
-rw-r--r--internal/praefect/nodes/manager_test.go66
-rw-r--r--internal/praefect/replicator.go4
-rw-r--r--internal/praefect/replicator_test.go84
-rw-r--r--internal/praefect/server.go4
-rw-r--r--internal/praefect/server_test.go17
-rw-r--r--internal/praefect/service/info/consistencycheck.go4
-rw-r--r--internal/praefect/service/info/server.go15
15 files changed, 148 insertions, 393 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index c9cac9416..c41ed9820 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -231,14 +231,14 @@ func run(cfgs []starter.Config, conf config.Config) error {
db = dbConn
}
- ds := datastore.Datastore{ReplicasDatastore: datastore.NewInMemory(conf)}
+ var queue datastore.ReplicationEventQueue
if conf.MemoryQueueEnabled {
- ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue(conf)
+ queue = datastore.NewMemoryReplicationEventQueue(conf)
} else {
- ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
+ queue = datastore.NewPostgresReplicationEventQueue(db)
}
- nodeManager, err := nodes.NewManager(logger, conf, db, ds, nodeLatencyHistogram)
+ nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
if err != nil {
return err
}
@@ -261,11 +261,11 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
- coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator = praefect.NewCoordinator(queue, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
repl = praefect.NewReplMgr(
logger,
conf.VirtualStorageNames(),
- ds.ReplicationEventQueue,
+ queue,
nodeManager,
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
@@ -281,7 +281,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
return fmt.Errorf("unable to create a bootstrap: %v", err)
}
- srv.RegisterServices(nodeManager, transactionManager, conf, ds)
+ srv.RegisterServices(nodeManager, transactionManager, conf, queue)
b.StopAction = srv.GracefulStop
for _, cfg := range cfgs {
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 5146d10b2..cc5f35bbb 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -169,12 +169,9 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
}
logEntry := testhelper.DiscardTestEntry(t)
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -182,7 +179,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
registry, err := protoregistry.New(fd)
require.NoError(t, err)
- coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
srv := NewServer(coordinator.StreamDirector, logEntry, registry, conf)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b302cc54b..953918c41 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -7,6 +7,7 @@ import (
"sync"
"github.com/golang/protobuf/proto"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
@@ -79,23 +80,27 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- nodeMgr nodes.Manager
- txMgr *transactions.Manager
- log logrus.FieldLogger
- datastore datastore.Datastore
- registry *protoregistry.Registry
- conf config.Config
+ nodeMgr nodes.Manager
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ registry *protoregistry.Registry
+ conf config.Config
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l logrus.FieldLogger, ds datastore.Datastore, nodeMgr nodes.Manager, txMgr *transactions.Manager, conf config.Config, r *protoregistry.Registry) *Coordinator {
+func NewCoordinator(
+ queue datastore.ReplicationEventQueue,
+ nodeMgr nodes.Manager,
+ txMgr *transactions.Manager,
+ conf config.Config,
+ r *protoregistry.Registry,
+) *Coordinator {
return &Coordinator{
- log: l,
- datastore: ds,
- registry: r,
- nodeMgr: nodeMgr,
- txMgr: txMgr,
- conf: conf,
+ queue: queue,
+ registry: r,
+ nodeMgr: nodeMgr,
+ txMgr: txMgr,
+ conf: conf,
}
}
@@ -212,7 +217,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
- c.log.Debugf("Stream director received method %s", fullMethodName)
+ ctxlogrus.Extract(ctx).Debugf("Stream director received method %s", fullMethodName)
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
@@ -338,9 +343,9 @@ func (c *Coordinator) createReplicaJobs(
go func() {
defer wg.Done()
- _, err := c.datastore.Enqueue(ctx, event)
+ _, err := c.queue.Enqueue(ctx, event)
if err != nil {
- c.log.WithError(err).WithFields(logrus.Fields{
+ ctxlogrus.Extract(ctx).WithError(err).WithFields(logrus.Fields{
logWithReplVirtual: event.Job.VirtualStorage,
logWithReplSource: event.Job.SourceNodeStorage,
logWithReplTarget: event.Job.TargetNodeStorage,
@@ -360,7 +365,6 @@ func (c *Coordinator) ensureCorrelationID(ctx context.Context, targetRepo *gital
var err error
corrID, err = correlation.RandomID()
if err != nil {
- c.log.WithError(err).Error("unable to generate correlation ID")
corrID = generatePseudorandomCorrelationID(targetRepo)
}
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index d49d29a9a..f104e55b1 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -113,8 +113,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
const storageName = "test-storage"
coordinator := NewCoordinator(
- testhelper.DiscardTestEntry(t),
- datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue(conf)},
+ datastore.NewMemoryReplicationEventQueue(conf),
&mockNodeManager{GetShardFunc: func(storage string) (nodes.Shard, error) {
return nodes.Shard{
IsReadOnly: tc.readOnly,
@@ -154,20 +153,13 @@ func TestStreamDirectorMutator(t *testing.T) {
healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
+ primaryNode := &models.Node{Address: primaryAddress, Storage: "praefect-internal-1", DefaultPrimary: true}
+ secondaryNode := &models.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
- Name: "praefect",
- Nodes: []*models.Node{
- &models.Node{
- Address: primaryAddress,
- Storage: "praefect-internal-1",
- DefaultPrimary: true,
- },
- &models.Node{
- Address: secondaryAddress,
- Storage: "praefect-internal-2",
- }},
+ Name: "praefect",
+ Nodes: []*models.Node{primaryNode, secondaryNode},
},
},
}
@@ -180,11 +172,6 @@ func TestStreamDirectorMutator(t *testing.T) {
return queue.Enqueue(ctx, event)
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
@@ -195,12 +182,12 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -234,13 +221,8 @@ func TestStreamDirectorMutator(t *testing.T) {
// this call creates new events in the queue and simulates usual flow of the update operation
streamParams.RequestFinalizer()
- targetNode, err := ds.GetStorageNode("praefect-internal-2")
- require.NoError(t, err)
- sourceNode, err := ds.GetStorageNode("praefect-internal-1")
- require.NoError(t, err)
-
replEventWait.Wait() // wait until event persisted (async operation)
- events, err := ds.ReplicationEventQueue.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
+ events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, events, 1)
@@ -255,8 +237,8 @@ func TestStreamDirectorMutator(t *testing.T) {
Change: datastore.UpdateRepo,
VirtualStorage: conf.VirtualStorages[0].Name,
RelativePath: targetRepo.RelativePath,
- TargetNodeStorage: targetNode.Storage,
- SourceNodeStorage: sourceNode.Storage,
+ TargetNodeStorage: secondaryNode.Storage,
+ SourceNodeStorage: primaryNode.Storage,
},
Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
}
@@ -289,10 +271,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
},
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -305,12 +284,12 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
require.NoError(t, err)
@@ -388,11 +367,6 @@ func TestAbsentCorrelationID(t *testing.T) {
return queue.Enqueue(ctx, event)
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
@@ -403,11 +377,11 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -427,7 +401,7 @@ func TestAbsentCorrelationID(t *testing.T) {
streamParams.RequestFinalizer()
replEventWait.Wait() // wait until event persisted (async operation)
- jobs, err := coordinator.datastore.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
+ jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go
index bc705b632..26daba4b6 100644
--- a/internal/praefect/dataloss_check_test.go
+++ b/internal/praefect/dataloss_check_test.go
@@ -106,15 +106,9 @@ func TestDatalossCheck(t *testing.T) {
require.NoError(t, err)
killJobs(t)
- cc, _, clean := runPraefectServerWithMock(t, cfg,
- datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(cfg),
- ReplicationEventQueue: rq,
- },
- map[string]mock.SimpleServiceServer{
- "not-needed": &mock.UnimplementedSimpleServiceServer{},
- },
- )
+ cc, _, clean := runPraefectServerWithMock(t, cfg, rq, map[string]mock.SimpleServiceServer{
+ "not-needed": &mock.UnimplementedSimpleServiceServer{},
+ })
defer clean()
pbFrom, err := ptypes.TimestampProto(beforeTimerange.CreatedAt)
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 9a4513f0b..c89d974d6 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -8,13 +8,7 @@ package datastore
import (
"database/sql/driver"
"encoding/json"
- "errors"
"fmt"
- "sync"
- "time"
-
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// JobState is an enum that indicates the state of a job
@@ -89,160 +83,3 @@ func (p Params) Value() (driver.Value, error) {
}
return string(data), nil
}
-
-// ReplJob is an instance of a queued replication job. A replication job is
-// meant for updating the repository so that it is synced with the primary
-// copy. Scheduled indicates when a replication job should be performed.
-type ReplJob struct {
- Change ChangeType
- ID uint64 // autoincrement ID
- VirtualStorage string // virtual storage
- TargetNode, SourceNode models.Node // which node to replicate to?
- RelativePath string // source for replication
- State JobState
- Attempts int
- Params Params // additional information required to run the job
- CorrelationID string // from original request
- CreatedAt time.Time // when has the job been created?
-}
-
-// Datastore is a data persistence abstraction for all of Praefect's
-// persistence needs
-type Datastore struct {
- ReplicasDatastore
- ReplicationEventQueue
-}
-
-// ReplicasDatastore manages accessing and setting which secondary replicas
-// backup a repository
-type ReplicasDatastore interface {
- GetPrimary(virtualStorage string) (models.Node, error)
-
- GetSecondaries(virtualStorage string) ([]models.Node, error)
-
- GetReplicas(relativePath string) ([]models.Node, error)
-
- GetStorageNode(nodeStorage string) (models.Node, error)
-
- GetStorageNodes() ([]models.Node, error)
- // VirtualStorages returns a list of virtual storages that are configured for this instance.
- VirtualStorages() []string
-}
-
-// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
-// only intended for early beta requirements and as a reference implementation
-// for the eventual SQL implementation
-type MemoryDatastore struct {
- // storageNodes is read-only after initialization
- // if modification needed there must be synchronization for concurrent access to it
- storageNodes map[string]models.Node
-
- repositories *struct {
- sync.RWMutex
- m map[string]models.Repository
- }
-
- // virtualStorages is read-only after initialization
- // if modification needed there must be synchronization for concurrent access to it
- virtualStorages map[string][]*models.Node
-}
-
-// NewInMemory returns an initialized in-memory datastore
-func NewInMemory(cfg config.Config) *MemoryDatastore {
- m := &MemoryDatastore{
- storageNodes: map[string]models.Node{},
- repositories: &struct {
- sync.RWMutex
- m map[string]models.Repository
- }{
- m: map[string]models.Repository{},
- },
- virtualStorages: map[string][]*models.Node{},
- }
-
- for _, virtualStorage := range cfg.VirtualStorages {
- m.virtualStorages[virtualStorage.Name] = virtualStorage.Nodes
-
- for _, node := range virtualStorage.Nodes {
- // TODO: if there is two nodes with same storage name defined for different virtual storages
- // only one definition will be used: https://gitlab.com/gitlab-org/gitaly/-/issues/2613
- if _, ok := m.storageNodes[node.Storage]; ok {
- continue
- }
- m.storageNodes[node.Storage] = *node
- }
- }
-
- return m
-}
-
-// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it
-var ErrNoPrimaryForStorage = errors.New("no primary for storage")
-
-// GetPrimary returns the primary configured in the config file
-func (md *MemoryDatastore) GetPrimary(virtualStorage string) (models.Node, error) {
- for _, node := range md.virtualStorages[virtualStorage] {
- if node.DefaultPrimary {
- return *node, nil
- }
- }
-
- return models.Node{}, ErrNoPrimaryForStorage
-}
-
-// GetSecondaries gets the secondary nodes associated with a virtual storage
-func (md *MemoryDatastore) GetSecondaries(virtualStorage string) ([]models.Node, error) {
- var secondaries []models.Node
-
- for _, node := range md.virtualStorages[virtualStorage] {
- if !node.DefaultPrimary {
- secondaries = append(secondaries, *node)
- }
- }
-
- return secondaries, nil
-}
-
-// GetReplicas gets the secondaries for a repository based on the relative path
-func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, error) {
- md.repositories.RLock()
- defer md.repositories.RUnlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- return nil, errors.New("repository not found")
- }
-
- // to prevent possible modification of element of the slice
- copied := repository.Clone()
- return copied.Replicas, nil
-}
-
-// GetStorageNode gets all storage nodes
-func (md *MemoryDatastore) GetStorageNode(nodeStorage string) (models.Node, error) {
- node, ok := md.storageNodes[nodeStorage]
- if !ok {
- return models.Node{}, errors.New("node not found")
- }
-
- return node, nil
-}
-
-// GetStorageNodes gets all storage nodes
-func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
- var storageNodes []models.Node
- for _, storageNode := range md.storageNodes {
- storageNodes = append(storageNodes, storageNode)
- }
-
- return storageNodes, nil
-}
-
-// VirtualStorages returns list of virtual storages configured to be supported.
-func (md *MemoryDatastore) VirtualStorages() []string {
- vs := make([]string, 0, len(md.virtualStorages))
- for name := range md.virtualStorages {
- vs = append(vs, name)
- }
- return vs
-}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 54d195483..a8db569ce 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -90,12 +90,12 @@ func assertPrimariesExist(t testing.TB, conf config.Config) {
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
// requires there to be only 1 virtual storage
-func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
r, err := protoregistry.New(mustLoadProtoReg(t))
require.NoError(t, err)
return runPraefectServer(t, conf, buildOptions{
- withDatastore: ds,
+ withQueue: queue,
withBackends: withMockBackends(t, backends),
withAnnotations: r,
})
@@ -119,7 +119,7 @@ func (nullNodeMgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPa
}
type buildOptions struct {
- withDatastore datastore.Datastore
+ withQueue datastore.ReplicationEventQueue
withTxMgr *transactions.Manager
withBackends func([]*config.VirtualStorage) []testhelper.Cleanup
withAnnotations *protoregistry.Registry
@@ -184,37 +184,29 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel
}
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
-
- return runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+ return runPraefectServerWithGitalyWithDatastore(t, conf, defaultQueue(conf))
}
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
// requires exactly 1 virtual storage
-func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, ds datastore.Datastore) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
return runPraefectServer(t, conf, buildOptions{
- withDatastore: ds,
- withTxMgr: transactions.NewManager(),
- withBackends: withRealGitalyShared(t),
+ withQueue: queue,
+ withTxMgr: transactions.NewManager(),
+ withBackends: withRealGitalyShared(t),
})
}
-func defaultDatastore(conf config.Config) datastore.Datastore {
- return datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+func defaultQueue(conf config.Config) datastore.ReplicationEventQueue {
+ return datastore.NewMemoryReplicationEventQueue(conf)
}
func defaultTxMgr() *transactions.Manager {
return transactions.NewManager()
}
-func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, promtest.NewMockHistogramVec())
+func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
return nodeMgr
@@ -225,8 +217,8 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
var cleanups []testhelper.Cleanup
- if opt.withDatastore == (datastore.Datastore{}) {
- opt.withDatastore = defaultDatastore(conf)
+ if opt.withQueue == nil {
+ opt.withQueue = defaultQueue(conf)
}
if opt.withTxMgr == nil {
opt.withTxMgr = defaultTxMgr()
@@ -241,12 +233,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withLogger = log.Default()
}
if opt.withNodeMgr == nil {
- opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withDatastore)
+ opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withQueue)
}
coordinator := NewCoordinator(
- opt.withLogger,
- opt.withDatastore,
+ opt.withQueue,
opt.withNodeMgr,
opt.withTxMgr,
conf,
@@ -257,7 +248,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
replmgr := NewReplMgr(
opt.withLogger,
conf.VirtualStorageNames(),
- opt.withDatastore,
+ opt.withQueue,
opt.withNodeMgr,
WithQueueMetric(&promtest.MockGauge{}),
)
@@ -269,7 +260,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
errQ := make(chan error)
ctx, cancel := testhelper.Context()
- prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withDatastore)
+ prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withQueue)
go func() { errQ <- prf.Serve(listener, false) }()
replmgr.ProcessBacklog(ctx, noopBackoffFunc)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index d9d79ec2f..91f2c1d97 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -79,7 +79,7 @@ type Mgr struct {
// strategies is a map of strategies keyed on virtual storage name
strategies map[string]leaderElectionStrategy
db *sql.DB
- ds datastore.Datastore
+ queue datastore.ReplicationEventQueue
}
// leaderElectionStrategy defines the interface by which primary and
@@ -98,7 +98,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -148,7 +148,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Dat
db: db,
failoverEnabled: c.Failover.Enabled,
strategies: strategies,
- ds: ds,
+ queue: queue,
}, nil
}
@@ -206,7 +206,7 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
var storages []string
if featureflag.IsEnabled(ctx, featureflag.DistributedReads) {
- if storages, err = n.ds.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
+ if storages, err = n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
// this is recoverable error - proceed with primary node
ctxlogrus.Extract(ctx).
WithError(err).
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 047d1c4b6..e2b318a59 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -86,7 +86,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -134,7 +134,7 @@ func TestPrimaryIsSecond(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, mockHistogram)
require.NoError(t, err)
shard, err := nm.GetShard("virtual-storage-0")
@@ -184,7 +184,7 @@ func TestBlockingDial(t *testing.T) {
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
}()
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -229,13 +229,11 @@ func TestNodeManager(t *testing.T) {
Failover: config.Failover{Enabled: false},
}
- ds := datastore.Datastore{}
-
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, ds, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, ds, mockHistogram)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
@@ -377,46 +375,46 @@ func TestMgr_GetSyncedNode(t *testing.T) {
ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.DistributedReads)
- ackEvent := func(ds datastore.Datastore, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
+ ackEvent := func(queue datastore.ReplicationEventQueue, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
event := datastore.ReplicationEvent{Job: job}
- eevent, err := ds.Enqueue(ctx, event)
+ eevent, err := queue.Enqueue(ctx, event)
require.NoError(t, err)
- devents, err := ds.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
+ devents, err := queue.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
require.NoError(t, err)
require.Len(t, devents, 1)
- acks, err := ds.Acknowledge(ctx, state, []uint64{devents[0].ID})
+ acks, err := queue.Acknowledge(ctx, state, []uint64{devents[0].ID})
require.NoError(t, err)
require.Equal(t, []uint64{devents[0].ID}, acks)
return devents[0]
}
- verify := func(scenario func(t *testing.T, nm Manager, ds datastore.Datastore)) func(*testing.T) {
- ds := datastore.Datastore{ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf)}
+ verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
require.NoError(t, err)
nm.Start(time.Duration(0), time.Hour)
- return func(t *testing.T) { scenario(t, nm, ds) }
+ return func(t *testing.T) { scenario(t, nm, queue) }
}
- t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
_, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "")
require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
}))
- t.Run("no replication events", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ t.Run("no replication events", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "no/matter")
require.NoError(t, err)
require.Contains(t, []string{vs0Primary, "unix://" + sockets[1]}, node.GetAddress())
}))
- t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- _, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
@@ -431,8 +429,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vs0Event, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vs0Event, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
@@ -442,7 +440,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
})
require.NoError(t, err)
- vs0Events, err := ds.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
+ vs0Events, err := queue.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
require.NoError(t, err)
require.Len(t, vs0Events, 1)
@@ -451,8 +449,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vs0Event := ackEvent(ds, datastore.ReplicationJob{
+ t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vs0Event := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -464,15 +462,15 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vsEvent0 := ackEvent(ds, datastore.ReplicationJob{
+ t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vsEvent0 := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
VirtualStorage: "virtual-storage-0",
}, datastore.JobStateCompleted)
- vsEvent1, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ vsEvent1, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RelativePath: vsEvent0.Job.RelativePath,
TargetNodeStorage: vsEvent0.Job.TargetNodeStorage,
@@ -487,15 +485,15 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vs0Event := ackEvent(ds, datastore.ReplicationJob{
+ t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vs0Event := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
VirtualStorage: "virtual-storage-0",
}, datastore.JobStateDead)
- ackEvent(ds, datastore.ReplicationJob{
+ ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-1",
@@ -507,22 +505,22 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- ackEvent(ds, datastore.ReplicationJob{
+ t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
VirtualStorage: "virtual-storage-0",
}, datastore.JobStateCompleted)
- ackEvent(ds, datastore.ReplicationJob{
+ ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-1",
VirtualStorage: "virtual-storage-1",
}, datastore.JobStateCompleted)
- vs1Event := ackEvent(ds, datastore.ReplicationJob{
+ vs1Event := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/2",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-1",
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 721edab29..0668a59f8 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -412,7 +412,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState := map[datastore.JobState][]uint64{}
for _, event := range events {
- if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil {
+ if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
logger.WithFields(logrus.Fields{
logWithReplJobID: event.ID,
logWithReplVirtual: event.Job.VirtualStorage,
@@ -462,7 +462,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
-func (r ReplMgr) processReplJob(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
+func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
source, err := shard.GetNode(event.Job.SourceNodeStorage)
if err != nil {
return fmt.Errorf("get source node: %w", err)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 46053e7d1..fbfcb96ab 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -89,10 +89,7 @@ func TestProcessReplicationJob(t *testing.T) {
},
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
// create object pool on the source
objectPoolPath := testhelper.NewTestObjectPoolName(t)
@@ -123,23 +120,28 @@ func TestProcessReplicationJob(t *testing.T) {
})
require.NoError(t, err)
- primary, err := ds.GetPrimary(conf.VirtualStorages[0].Name)
+ entry := testhelper.DiscardTestEntry(t)
+
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
- secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
require.NoError(t, err)
+ require.Len(t, shard.Secondaries, 1)
var events []datastore.ReplicationEvent
- for _, secondary := range secondaries {
+ for _, secondary := range shard.Secondaries {
events = append(events, datastore.ReplicationEvent{
- State: datastore.JobStateReady,
- Attempt: 3,
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
- TargetNodeStorage: secondary.Storage,
- SourceNodeStorage: primary.Storage,
+ TargetNodeStorage: secondary.GetStorage(),
+ SourceNodeStorage: shard.Primary.GetStorage(),
RelativePath: testRepo.GetRelativePath(),
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
+ State: datastore.JobStateReady,
+ Attempt: 3,
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
})
}
require.Len(t, events, 1)
@@ -149,13 +151,8 @@ func TestProcessReplicationJob(t *testing.T) {
})
var replicator defaultReplicator
- entry := testhelper.DiscardTestEntry(t)
replicator.log = entry
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
- require.NoError(t, err)
- nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
-
var mockReplicationGauge promtest.MockGauge
var mockReplicationLatencyHistogramVec promtest.MockHistogramVec
var mockReplicationDelayHistogramVec promtest.MockHistogramVec
@@ -163,7 +160,7 @@ func TestProcessReplicationJob(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
- ds,
+ queue,
nodeMgr,
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
@@ -172,11 +169,7 @@ func TestProcessReplicationJob(t *testing.T) {
replMgr.replicator = replicator
- shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
- require.NoError(t, err)
- require.Len(t, shard.Secondaries, 1)
-
- replMgr.processReplJob(ctx, events[0], shard, shard.Secondaries[0].GetConnection())
+ require.NoError(t, replMgr.processReplicationEvent(ctx, events[0], shard, shard.Secondaries[0].GetConnection()))
relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath)
require.NoError(t, err)
@@ -219,21 +212,18 @@ func TestPropagateReplicationJob(t *testing.T) {
},
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
- replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+ replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr)
prf := NewServer(
coordinator.StreamDirector,
@@ -245,7 +235,7 @@ func TestPropagateReplicationJob(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- prf.RegisterServices(nodeMgr, txMgr, conf, ds)
+ prf.RegisterServices(nodeMgr, txMgr, conf, queue)
go prf.Serve(listener, false)
defer prf.Stop()
@@ -496,11 +486,6 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
return ackIDs, err
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
Change: datastore.UpdateRepo,
@@ -509,23 +494,23 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
SourceNodeStorage: primary.Storage,
VirtualStorage: "praefect",
}
- event1, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
+ event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
require.NoError(t, err)
require.Equal(t, uint64(1), event1.ID)
// this job checks flow for replication event that fails
failJob := okJob
failJob.RelativePath = "invalid path to fail the job"
- event2, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
+ event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
require.NoError(t, err)
require.Equal(t, uint64(2), event2.ID)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+ replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -599,11 +584,6 @@ func TestProcessBacklog_Success(t *testing.T) {
return ackIDs, err
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
// Update replication job
eventType1 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
@@ -615,10 +595,10 @@ func TestProcessBacklog_Success(t *testing.T) {
},
}
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
+ _, err = queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
+ _, err = queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
@@ -639,7 +619,7 @@ func TestProcessBacklog_Success(t *testing.T) {
},
}
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType2)
+ _, err = queueInterceptor.Enqueue(ctx, eventType2)
require.NoError(t, err)
// Rename replication job
@@ -655,15 +635,15 @@ func TestProcessBacklog_Success(t *testing.T) {
}
require.NoError(t, err)
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType3)
+ _, err = queueInterceptor.Enqueue(ctx, eventType3)
require.NoError(t, err)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+ replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -725,7 +705,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
- datastore.Datastore{datastore.NewInMemory(conf), queue},
+ queue,
&mockNodeManager{
GetShardFunc: func(vs string) (nodes.Shard, error) {
require.Equal(t, virtualStorage, vs)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c56eda44c..229d30200 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -125,10 +125,10 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
}
// RegisterServices will register any services praefect needs to handle rpcs on its own
-func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, ds datastore.Datastore) {
+func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, ds))
+ gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, queue))
gitalypb.RegisterRefTransactionServer(srv.s, transaction.NewServer(tm))
healthpb.RegisterHealthServer(srv.s, health.NewServer())
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 2ebbb29e2..9feaf3d7a 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -49,7 +49,7 @@ func TestServerRouteServerAccessor(t *testing.T) {
}
)
- cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
+ cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
defer cleanup()
cli := mock.NewSimpleServiceClient(cc)
@@ -130,7 +130,7 @@ func TestGitalyServerInfo(t *testing.T) {
conf.VirtualStorages[0].Nodes[1].Storage: &mockSvc{},
}
- cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
+ cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
defer cleanup()
client := gitalypb.NewServerServiceClient(cc)
@@ -443,10 +443,6 @@ func TestRepoRemoval(t *testing.T) {
// TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports
// graceful shutdown, we can remove this code that waits for jobs to be complete
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
jobsDoneCh := make(chan struct{}, 2)
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
@@ -457,7 +453,7 @@ func TestRepoRemoval(t *testing.T) {
return queue.Acknowledge(ctx, state, ids)
})
- cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+ cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, queueInterceptor)
defer cleanup()
ctx, cancel := testhelper.Context()
@@ -575,12 +571,7 @@ func TestRepoRename(t *testing.T) {
return queue.Acknowledge(ctx, state, ids)
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: evq,
- }
-
- cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+ cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, evq)
defer cleanup()
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
index d336144e9..24510db79 100644
--- a/internal/praefect/service/info/consistencycheck.go
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -169,7 +169,7 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
return nil
}
-func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp *gitalypb.ConsistencyCheckResponse) error {
+func scheduleReplication(ctx context.Context, csr checksumResult, q datastore.ReplicationEventQueue, resp *gitalypb.ConsistencyCheckResponse) error {
event, err := q.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
@@ -190,7 +190,7 @@ func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp
return nil
}
-func ensureConsistency(ctx context.Context, disableReconcile bool, checksumResultQ <-chan checksumResult, q Queue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+func ensureConsistency(ctx context.Context, disableReconcile bool, checksumResultQ <-chan checksumResult, q datastore.ReplicationEventQueue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
for {
var csr checksumResult
select {
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index f3920fefa..f46c2fd03 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -3,7 +3,6 @@ package info
import (
"context"
"errors"
- "time"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -12,26 +11,16 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-// Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service
-type Queue interface {
- Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error)
- CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
-}
-
-// compile time assertion that Queue is satisfied by
-// datastore.ReplicationEventQueue
-var _ Queue = (datastore.ReplicationEventQueue)(nil)
-
// Server is a InfoService server
type Server struct {
gitalypb.UnimplementedPraefectInfoServiceServer
nodeMgr nodes.Manager
conf config.Config
- queue Queue
+ queue datastore.ReplicationEventQueue
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue Queue) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue) gitalypb.PraefectInfoServiceServer {
s := &Server{
nodeMgr: nodeMgr,
conf: conf,