Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2020-05-28 17:43:08 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2020-05-28 17:43:08 +0300
commit7542065e6d5673da87a7889bfbf575a4effeae7c (patch)
treeb7a4239f7108fb74ce4e33677b9acd90a5b1dd84
parent97dcd53c020d3d6d123530ac1a41140b950dd607 (diff)
parenta9b2b302e49b351a4f3a552351177d56942c8c25 (diff)
Merge branch 'ps-removal-memory-datastore' into 'master'
Praefect: removal of unnecessary Datastore wrapper See merge request gitlab-org/gitaly!2222
-rw-r--r--cmd/praefect/main.go14
-rw-r--r--internal/praefect/auth_test.go9
-rw-r--r--internal/praefect/coordinator.go38
-rw-r--r--internal/praefect/coordinator_test.go58
-rw-r--r--internal/praefect/dataloss_check_test.go12
-rw-r--r--internal/praefect/datastore/datastore.go163
-rw-r--r--internal/praefect/helper_test.go45
-rw-r--r--internal/praefect/nodes/manager.go8
-rw-r--r--internal/praefect/nodes/manager_test.go66
-rw-r--r--internal/praefect/replicator.go4
-rw-r--r--internal/praefect/replicator_test.go84
-rw-r--r--internal/praefect/server.go4
-rw-r--r--internal/praefect/server_test.go17
-rw-r--r--internal/praefect/service/info/consistencycheck.go4
-rw-r--r--internal/praefect/service/info/server.go15
15 files changed, 148 insertions, 393 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index c9cac9416..c41ed9820 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -231,14 +231,14 @@ func run(cfgs []starter.Config, conf config.Config) error {
db = dbConn
}
- ds := datastore.Datastore{ReplicasDatastore: datastore.NewInMemory(conf)}
+ var queue datastore.ReplicationEventQueue
if conf.MemoryQueueEnabled {
- ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue(conf)
+ queue = datastore.NewMemoryReplicationEventQueue(conf)
} else {
- ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
+ queue = datastore.NewPostgresReplicationEventQueue(db)
}
- nodeManager, err := nodes.NewManager(logger, conf, db, ds, nodeLatencyHistogram)
+ nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
if err != nil {
return err
}
@@ -261,11 +261,11 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
- coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator = praefect.NewCoordinator(queue, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
repl = praefect.NewReplMgr(
logger,
conf.VirtualStorageNames(),
- ds.ReplicationEventQueue,
+ queue,
nodeManager,
praefect.WithDelayMetric(delayMetric),
praefect.WithLatencyMetric(latencyMetric),
@@ -281,7 +281,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
return fmt.Errorf("unable to create a bootstrap: %v", err)
}
- srv.RegisterServices(nodeManager, transactionManager, conf, ds)
+ srv.RegisterServices(nodeManager, transactionManager, conf, queue)
b.StopAction = srv.GracefulStop
for _, cfg := range cfgs {
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 5146d10b2..cc5f35bbb 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -169,12 +169,9 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
}
logEntry := testhelper.DiscardTestEntry(t)
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
@@ -182,7 +179,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
registry, err := protoregistry.New(fd)
require.NoError(t, err)
- coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
srv := NewServer(coordinator.StreamDirector, logEntry, registry, conf)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b302cc54b..953918c41 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -7,6 +7,7 @@ import (
"sync"
"github.com/golang/protobuf/proto"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
@@ -79,23 +80,27 @@ type grpcCall struct {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- nodeMgr nodes.Manager
- txMgr *transactions.Manager
- log logrus.FieldLogger
- datastore datastore.Datastore
- registry *protoregistry.Registry
- conf config.Config
+ nodeMgr nodes.Manager
+ txMgr *transactions.Manager
+ queue datastore.ReplicationEventQueue
+ registry *protoregistry.Registry
+ conf config.Config
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l logrus.FieldLogger, ds datastore.Datastore, nodeMgr nodes.Manager, txMgr *transactions.Manager, conf config.Config, r *protoregistry.Registry) *Coordinator {
+func NewCoordinator(
+ queue datastore.ReplicationEventQueue,
+ nodeMgr nodes.Manager,
+ txMgr *transactions.Manager,
+ conf config.Config,
+ r *protoregistry.Registry,
+) *Coordinator {
return &Coordinator{
- log: l,
- datastore: ds,
- registry: r,
- nodeMgr: nodeMgr,
- txMgr: txMgr,
- conf: conf,
+ queue: queue,
+ registry: r,
+ nodeMgr: nodeMgr,
+ txMgr: txMgr,
+ conf: conf,
}
}
@@ -212,7 +217,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
- c.log.Debugf("Stream director received method %s", fullMethodName)
+ ctxlogrus.Extract(ctx).Debugf("Stream director received method %s", fullMethodName)
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
@@ -338,9 +343,9 @@ func (c *Coordinator) createReplicaJobs(
go func() {
defer wg.Done()
- _, err := c.datastore.Enqueue(ctx, event)
+ _, err := c.queue.Enqueue(ctx, event)
if err != nil {
- c.log.WithError(err).WithFields(logrus.Fields{
+ ctxlogrus.Extract(ctx).WithError(err).WithFields(logrus.Fields{
logWithReplVirtual: event.Job.VirtualStorage,
logWithReplSource: event.Job.SourceNodeStorage,
logWithReplTarget: event.Job.TargetNodeStorage,
@@ -360,7 +365,6 @@ func (c *Coordinator) ensureCorrelationID(ctx context.Context, targetRepo *gital
var err error
corrID, err = correlation.RandomID()
if err != nil {
- c.log.WithError(err).Error("unable to generate correlation ID")
corrID = generatePseudorandomCorrelationID(targetRepo)
}
}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index d49d29a9a..f104e55b1 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -113,8 +113,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
const storageName = "test-storage"
coordinator := NewCoordinator(
- testhelper.DiscardTestEntry(t),
- datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue(conf)},
+ datastore.NewMemoryReplicationEventQueue(conf),
&mockNodeManager{GetShardFunc: func(storage string) (nodes.Shard, error) {
return nodes.Shard{
IsReadOnly: tc.readOnly,
@@ -154,20 +153,13 @@ func TestStreamDirectorMutator(t *testing.T) {
healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
+ primaryNode := &models.Node{Address: primaryAddress, Storage: "praefect-internal-1", DefaultPrimary: true}
+ secondaryNode := &models.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
- Name: "praefect",
- Nodes: []*models.Node{
- &models.Node{
- Address: primaryAddress,
- Storage: "praefect-internal-1",
- DefaultPrimary: true,
- },
- &models.Node{
- Address: secondaryAddress,
- Storage: "praefect-internal-2",
- }},
+ Name: "praefect",
+ Nodes: []*models.Node{primaryNode, secondaryNode},
},
},
}
@@ -180,11 +172,6 @@ func TestStreamDirectorMutator(t *testing.T) {
return queue.Enqueue(ctx, event)
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
@@ -195,12 +182,12 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -234,13 +221,8 @@ func TestStreamDirectorMutator(t *testing.T) {
// this call creates new events in the queue and simulates usual flow of the update operation
streamParams.RequestFinalizer()
- targetNode, err := ds.GetStorageNode("praefect-internal-2")
- require.NoError(t, err)
- sourceNode, err := ds.GetStorageNode("praefect-internal-1")
- require.NoError(t, err)
-
replEventWait.Wait() // wait until event persisted (async operation)
- events, err := ds.ReplicationEventQueue.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
+ events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
require.NoError(t, err)
require.Len(t, events, 1)
@@ -255,8 +237,8 @@ func TestStreamDirectorMutator(t *testing.T) {
Change: datastore.UpdateRepo,
VirtualStorage: conf.VirtualStorages[0].Name,
RelativePath: targetRepo.RelativePath,
- TargetNodeStorage: targetNode.Storage,
- SourceNodeStorage: sourceNode.Storage,
+ TargetNodeStorage: secondaryNode.Storage,
+ SourceNodeStorage: primaryNode.Storage,
},
Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
}
@@ -289,10 +271,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
},
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -305,12 +284,12 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
require.NoError(t, err)
@@ -388,11 +367,6 @@ func TestAbsentCorrelationID(t *testing.T) {
return queue.Enqueue(ctx, event)
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
targetRepo := gitalypb.Repository{
StorageName: "praefect",
RelativePath: "/path/to/hashed/storage",
@@ -403,11 +377,11 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
Origin: &targetRepo,
@@ -427,7 +401,7 @@ func TestAbsentCorrelationID(t *testing.T) {
streamParams.RequestFinalizer()
replEventWait.Wait() // wait until event persisted (async operation)
- jobs, err := coordinator.datastore.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
+ jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go
index bc705b632..26daba4b6 100644
--- a/internal/praefect/dataloss_check_test.go
+++ b/internal/praefect/dataloss_check_test.go
@@ -106,15 +106,9 @@ func TestDatalossCheck(t *testing.T) {
require.NoError(t, err)
killJobs(t)
- cc, _, clean := runPraefectServerWithMock(t, cfg,
- datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(cfg),
- ReplicationEventQueue: rq,
- },
- map[string]mock.SimpleServiceServer{
- "not-needed": &mock.UnimplementedSimpleServiceServer{},
- },
- )
+ cc, _, clean := runPraefectServerWithMock(t, cfg, rq, map[string]mock.SimpleServiceServer{
+ "not-needed": &mock.UnimplementedSimpleServiceServer{},
+ })
defer clean()
pbFrom, err := ptypes.TimestampProto(beforeTimerange.CreatedAt)
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 9a4513f0b..c89d974d6 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -8,13 +8,7 @@ package datastore
import (
"database/sql/driver"
"encoding/json"
- "errors"
"fmt"
- "sync"
- "time"
-
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// JobState is an enum that indicates the state of a job
@@ -89,160 +83,3 @@ func (p Params) Value() (driver.Value, error) {
}
return string(data), nil
}
-
-// ReplJob is an instance of a queued replication job. A replication job is
-// meant for updating the repository so that it is synced with the primary
-// copy. Scheduled indicates when a replication job should be performed.
-type ReplJob struct {
- Change ChangeType
- ID uint64 // autoincrement ID
- VirtualStorage string // virtual storage
- TargetNode, SourceNode models.Node // which node to replicate to?
- RelativePath string // source for replication
- State JobState
- Attempts int
- Params Params // additional information required to run the job
- CorrelationID string // from original request
- CreatedAt time.Time // when has the job been created?
-}
-
-// Datastore is a data persistence abstraction for all of Praefect's
-// persistence needs
-type Datastore struct {
- ReplicasDatastore
- ReplicationEventQueue
-}
-
-// ReplicasDatastore manages accessing and setting which secondary replicas
-// backup a repository
-type ReplicasDatastore interface {
- GetPrimary(virtualStorage string) (models.Node, error)
-
- GetSecondaries(virtualStorage string) ([]models.Node, error)
-
- GetReplicas(relativePath string) ([]models.Node, error)
-
- GetStorageNode(nodeStorage string) (models.Node, error)
-
- GetStorageNodes() ([]models.Node, error)
- // VirtualStorages returns a list of virtual storages that are configured for this instance.
- VirtualStorages() []string
-}
-
-// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
-// only intended for early beta requirements and as a reference implementation
-// for the eventual SQL implementation
-type MemoryDatastore struct {
- // storageNodes is read-only after initialization
- // if modification needed there must be synchronization for concurrent access to it
- storageNodes map[string]models.Node
-
- repositories *struct {
- sync.RWMutex
- m map[string]models.Repository
- }
-
- // virtualStorages is read-only after initialization
- // if modification needed there must be synchronization for concurrent access to it
- virtualStorages map[string][]*models.Node
-}
-
-// NewInMemory returns an initialized in-memory datastore
-func NewInMemory(cfg config.Config) *MemoryDatastore {
- m := &MemoryDatastore{
- storageNodes: map[string]models.Node{},
- repositories: &struct {
- sync.RWMutex
- m map[string]models.Repository
- }{
- m: map[string]models.Repository{},
- },
- virtualStorages: map[string][]*models.Node{},
- }
-
- for _, virtualStorage := range cfg.VirtualStorages {
- m.virtualStorages[virtualStorage.Name] = virtualStorage.Nodes
-
- for _, node := range virtualStorage.Nodes {
- // TODO: if there is two nodes with same storage name defined for different virtual storages
- // only one definition will be used: https://gitlab.com/gitlab-org/gitaly/-/issues/2613
- if _, ok := m.storageNodes[node.Storage]; ok {
- continue
- }
- m.storageNodes[node.Storage] = *node
- }
- }
-
- return m
-}
-
-// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it
-var ErrNoPrimaryForStorage = errors.New("no primary for storage")
-
-// GetPrimary returns the primary configured in the config file
-func (md *MemoryDatastore) GetPrimary(virtualStorage string) (models.Node, error) {
- for _, node := range md.virtualStorages[virtualStorage] {
- if node.DefaultPrimary {
- return *node, nil
- }
- }
-
- return models.Node{}, ErrNoPrimaryForStorage
-}
-
-// GetSecondaries gets the secondary nodes associated with a virtual storage
-func (md *MemoryDatastore) GetSecondaries(virtualStorage string) ([]models.Node, error) {
- var secondaries []models.Node
-
- for _, node := range md.virtualStorages[virtualStorage] {
- if !node.DefaultPrimary {
- secondaries = append(secondaries, *node)
- }
- }
-
- return secondaries, nil
-}
-
-// GetReplicas gets the secondaries for a repository based on the relative path
-func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, error) {
- md.repositories.RLock()
- defer md.repositories.RUnlock()
-
- repository, ok := md.repositories.m[relativePath]
- if !ok {
- return nil, errors.New("repository not found")
- }
-
- // to prevent possible modification of element of the slice
- copied := repository.Clone()
- return copied.Replicas, nil
-}
-
-// GetStorageNode gets all storage nodes
-func (md *MemoryDatastore) GetStorageNode(nodeStorage string) (models.Node, error) {
- node, ok := md.storageNodes[nodeStorage]
- if !ok {
- return models.Node{}, errors.New("node not found")
- }
-
- return node, nil
-}
-
-// GetStorageNodes gets all storage nodes
-func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
- var storageNodes []models.Node
- for _, storageNode := range md.storageNodes {
- storageNodes = append(storageNodes, storageNode)
- }
-
- return storageNodes, nil
-}
-
-// VirtualStorages returns list of virtual storages configured to be supported.
-func (md *MemoryDatastore) VirtualStorages() []string {
- vs := make([]string, 0, len(md.virtualStorages))
- for name := range md.virtualStorages {
- vs = append(vs, name)
- }
- return vs
-}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 54d195483..a8db569ce 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -90,12 +90,12 @@ func assertPrimariesExist(t testing.TB, conf config.Config) {
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
// requires there to be only 1 virtual storage
-func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
r, err := protoregistry.New(mustLoadProtoReg(t))
require.NoError(t, err)
return runPraefectServer(t, conf, buildOptions{
- withDatastore: ds,
+ withQueue: queue,
withBackends: withMockBackends(t, backends),
withAnnotations: r,
})
@@ -119,7 +119,7 @@ func (nullNodeMgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPa
}
type buildOptions struct {
- withDatastore datastore.Datastore
+ withQueue datastore.ReplicationEventQueue
withTxMgr *transactions.Manager
withBackends func([]*config.VirtualStorage) []testhelper.Cleanup
withAnnotations *protoregistry.Registry
@@ -184,37 +184,29 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel
}
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
-
- return runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+ return runPraefectServerWithGitalyWithDatastore(t, conf, defaultQueue(conf))
}
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
// requires exactly 1 virtual storage
-func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, ds datastore.Datastore) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
return runPraefectServer(t, conf, buildOptions{
- withDatastore: ds,
- withTxMgr: transactions.NewManager(),
- withBackends: withRealGitalyShared(t),
+ withQueue: queue,
+ withTxMgr: transactions.NewManager(),
+ withBackends: withRealGitalyShared(t),
})
}
-func defaultDatastore(conf config.Config) datastore.Datastore {
- return datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+func defaultQueue(conf config.Config) datastore.ReplicationEventQueue {
+ return datastore.NewMemoryReplicationEventQueue(conf)
}
func defaultTxMgr() *transactions.Manager {
return transactions.NewManager()
}
-func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, promtest.NewMockHistogramVec())
+func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
return nodeMgr
@@ -225,8 +217,8 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
var cleanups []testhelper.Cleanup
- if opt.withDatastore == (datastore.Datastore{}) {
- opt.withDatastore = defaultDatastore(conf)
+ if opt.withQueue == nil {
+ opt.withQueue = defaultQueue(conf)
}
if opt.withTxMgr == nil {
opt.withTxMgr = defaultTxMgr()
@@ -241,12 +233,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withLogger = log.Default()
}
if opt.withNodeMgr == nil {
- opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withDatastore)
+ opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withQueue)
}
coordinator := NewCoordinator(
- opt.withLogger,
- opt.withDatastore,
+ opt.withQueue,
opt.withNodeMgr,
opt.withTxMgr,
conf,
@@ -257,7 +248,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
replmgr := NewReplMgr(
opt.withLogger,
conf.VirtualStorageNames(),
- opt.withDatastore,
+ opt.withQueue,
opt.withNodeMgr,
WithQueueMetric(&promtest.MockGauge{}),
)
@@ -269,7 +260,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
errQ := make(chan error)
ctx, cancel := testhelper.Context()
- prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withDatastore)
+ prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withQueue)
go func() { errQ <- prf.Serve(listener, false) }()
replmgr.ProcessBacklog(ctx, noopBackoffFunc)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index d9d79ec2f..91f2c1d97 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -79,7 +79,7 @@ type Mgr struct {
// strategies is a map of strategies keyed on virtual storage name
strategies map[string]leaderElectionStrategy
db *sql.DB
- ds datastore.Datastore
+ queue datastore.ReplicationEventQueue
}
// leaderElectionStrategy defines the interface by which primary and
@@ -98,7 +98,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -148,7 +148,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Dat
db: db,
failoverEnabled: c.Failover.Enabled,
strategies: strategies,
- ds: ds,
+ queue: queue,
}, nil
}
@@ -206,7 +206,7 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
var storages []string
if featureflag.IsEnabled(ctx, featureflag.DistributedReads) {
- if storages, err = n.ds.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
+ if storages, err = n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
// this is recoverable error - proceed with primary node
ctxlogrus.Extract(ctx).
WithError(err).
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 047d1c4b6..e2b318a59 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -86,7 +86,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -134,7 +134,7 @@ func TestPrimaryIsSecond(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, mockHistogram)
require.NoError(t, err)
shard, err := nm.GetShard("virtual-storage-0")
@@ -184,7 +184,7 @@ func TestBlockingDial(t *testing.T) {
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
}()
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -229,13 +229,11 @@ func TestNodeManager(t *testing.T) {
Failover: config.Failover{Enabled: false},
}
- ds := datastore.Datastore{}
-
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, ds, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, ds, mockHistogram)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
@@ -377,46 +375,46 @@ func TestMgr_GetSyncedNode(t *testing.T) {
ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.DistributedReads)
- ackEvent := func(ds datastore.Datastore, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
+ ackEvent := func(queue datastore.ReplicationEventQueue, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
event := datastore.ReplicationEvent{Job: job}
- eevent, err := ds.Enqueue(ctx, event)
+ eevent, err := queue.Enqueue(ctx, event)
require.NoError(t, err)
- devents, err := ds.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
+ devents, err := queue.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
require.NoError(t, err)
require.Len(t, devents, 1)
- acks, err := ds.Acknowledge(ctx, state, []uint64{devents[0].ID})
+ acks, err := queue.Acknowledge(ctx, state, []uint64{devents[0].ID})
require.NoError(t, err)
require.Equal(t, []uint64{devents[0].ID}, acks)
return devents[0]
}
- verify := func(scenario func(t *testing.T, nm Manager, ds datastore.Datastore)) func(*testing.T) {
- ds := datastore.Datastore{ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf)}
+ verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
require.NoError(t, err)
nm.Start(time.Duration(0), time.Hour)
- return func(t *testing.T) { scenario(t, nm, ds) }
+ return func(t *testing.T) { scenario(t, nm, queue) }
}
- t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
_, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "")
require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
}))
- t.Run("no replication events", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ t.Run("no replication events", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "no/matter")
require.NoError(t, err)
require.Contains(t, []string{vs0Primary, "unix://" + sockets[1]}, node.GetAddress())
}))
- t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- _, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
@@ -431,8 +429,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vs0Event, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vs0Event, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
@@ -442,7 +440,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
})
require.NoError(t, err)
- vs0Events, err := ds.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
+ vs0Events, err := queue.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
require.NoError(t, err)
require.Len(t, vs0Events, 1)
@@ -451,8 +449,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vs0Event := ackEvent(ds, datastore.ReplicationJob{
+ t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vs0Event := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
@@ -464,15 +462,15 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vsEvent0 := ackEvent(ds, datastore.ReplicationJob{
+ t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vsEvent0 := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
VirtualStorage: "virtual-storage-0",
}, datastore.JobStateCompleted)
- vsEvent1, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ vsEvent1, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
RelativePath: vsEvent0.Job.RelativePath,
TargetNodeStorage: vsEvent0.Job.TargetNodeStorage,
@@ -487,15 +485,15 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- vs0Event := ackEvent(ds, datastore.ReplicationJob{
+ t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ vs0Event := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
VirtualStorage: "virtual-storage-0",
}, datastore.JobStateDead)
- ackEvent(ds, datastore.ReplicationJob{
+ ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-1",
@@ -507,22 +505,22 @@ func TestMgr_GetSyncedNode(t *testing.T) {
require.Equal(t, vs0Primary, node.GetAddress())
}))
- t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
- ackEvent(ds, datastore.ReplicationJob{
+ t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+ ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-1",
SourceNodeStorage: "gitaly-0",
VirtualStorage: "virtual-storage-0",
}, datastore.JobStateCompleted)
- ackEvent(ds, datastore.ReplicationJob{
+ ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/1",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-1",
VirtualStorage: "virtual-storage-1",
}, datastore.JobStateCompleted)
- vs1Event := ackEvent(ds, datastore.ReplicationJob{
+ vs1Event := ackEvent(queue, datastore.ReplicationJob{
RelativePath: "path/2",
TargetNodeStorage: "gitaly-2",
SourceNodeStorage: "gitaly-1",
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 721edab29..0668a59f8 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -412,7 +412,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
eventIDsByState := map[datastore.JobState][]uint64{}
for _, event := range events {
- if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil {
+ if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
logger.WithFields(logrus.Fields{
logWithReplJobID: event.ID,
logWithReplVirtual: event.Job.VirtualStorage,
@@ -462,7 +462,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
-func (r ReplMgr) processReplJob(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
+func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
source, err := shard.GetNode(event.Job.SourceNodeStorage)
if err != nil {
return fmt.Errorf("get source node: %w", err)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 46053e7d1..fbfcb96ab 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -89,10 +89,7 @@ func TestProcessReplicationJob(t *testing.T) {
},
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
// create object pool on the source
objectPoolPath := testhelper.NewTestObjectPoolName(t)
@@ -123,23 +120,28 @@ func TestProcessReplicationJob(t *testing.T) {
})
require.NoError(t, err)
- primary, err := ds.GetPrimary(conf.VirtualStorages[0].Name)
+ entry := testhelper.DiscardTestEntry(t)
+
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
- secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name)
+ nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
require.NoError(t, err)
+ require.Len(t, shard.Secondaries, 1)
var events []datastore.ReplicationEvent
- for _, secondary := range secondaries {
+ for _, secondary := range shard.Secondaries {
events = append(events, datastore.ReplicationEvent{
- State: datastore.JobStateReady,
- Attempt: 3,
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
- TargetNodeStorage: secondary.Storage,
- SourceNodeStorage: primary.Storage,
+ TargetNodeStorage: secondary.GetStorage(),
+ SourceNodeStorage: shard.Primary.GetStorage(),
RelativePath: testRepo.GetRelativePath(),
},
- Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
+ State: datastore.JobStateReady,
+ Attempt: 3,
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
})
}
require.Len(t, events, 1)
@@ -149,13 +151,8 @@ func TestProcessReplicationJob(t *testing.T) {
})
var replicator defaultReplicator
- entry := testhelper.DiscardTestEntry(t)
replicator.log = entry
- nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
- require.NoError(t, err)
- nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
-
var mockReplicationGauge promtest.MockGauge
var mockReplicationLatencyHistogramVec promtest.MockHistogramVec
var mockReplicationDelayHistogramVec promtest.MockHistogramVec
@@ -163,7 +160,7 @@ func TestProcessReplicationJob(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
- ds,
+ queue,
nodeMgr,
WithLatencyMetric(&mockReplicationLatencyHistogramVec),
WithDelayMetric(&mockReplicationDelayHistogramVec),
@@ -172,11 +169,7 @@ func TestProcessReplicationJob(t *testing.T) {
replMgr.replicator = replicator
- shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
- require.NoError(t, err)
- require.Len(t, shard.Secondaries, 1)
-
- replMgr.processReplJob(ctx, events[0], shard, shard.Secondaries[0].GetConnection())
+ require.NoError(t, replMgr.processReplicationEvent(ctx, events[0], shard, shard.Secondaries[0].GetConnection()))
relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath)
require.NoError(t, err)
@@ -219,21 +212,18 @@ func TestPropagateReplicationJob(t *testing.T) {
},
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
- }
+ queue := datastore.NewMemoryReplicationEventQueue(conf)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
txMgr := transactions.NewManager()
- coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+ coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
- replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+ replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr)
prf := NewServer(
coordinator.StreamDirector,
@@ -245,7 +235,7 @@ func TestPropagateReplicationJob(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- prf.RegisterServices(nodeMgr, txMgr, conf, ds)
+ prf.RegisterServices(nodeMgr, txMgr, conf, queue)
go prf.Serve(listener, false)
defer prf.Stop()
@@ -496,11 +486,6 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
return ackIDs, err
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
// this job exists to verify that replication works
okJob := datastore.ReplicationJob{
Change: datastore.UpdateRepo,
@@ -509,23 +494,23 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
SourceNodeStorage: primary.Storage,
VirtualStorage: "praefect",
}
- event1, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
+ event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
require.NoError(t, err)
require.Equal(t, uint64(1), event1.ID)
// this job checks flow for replication event that fails
failJob := okJob
failJob.RelativePath = "invalid path to fail the job"
- event2, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
+ event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
require.NoError(t, err)
require.Equal(t, uint64(2), event2.ID)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+ replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -599,11 +584,6 @@ func TestProcessBacklog_Success(t *testing.T) {
return ackIDs, err
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
-
// Update replication job
eventType1 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
@@ -615,10 +595,10 @@ func TestProcessBacklog_Success(t *testing.T) {
},
}
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
+ _, err = queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
+ _, err = queueInterceptor.Enqueue(ctx, eventType1)
require.NoError(t, err)
renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
@@ -639,7 +619,7 @@ func TestProcessBacklog_Success(t *testing.T) {
},
}
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType2)
+ _, err = queueInterceptor.Enqueue(ctx, eventType2)
require.NoError(t, err)
// Rename replication job
@@ -655,15 +635,15 @@ func TestProcessBacklog_Success(t *testing.T) {
}
require.NoError(t, err)
- _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType3)
+ _, err = queueInterceptor.Enqueue(ctx, eventType3)
require.NoError(t, err)
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+ replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
replMgr.ProcessBacklog(ctx, noopBackoffFunc)
select {
@@ -725,7 +705,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
replMgr := NewReplMgr(
testhelper.DiscardTestEntry(t),
conf.VirtualStorageNames(),
- datastore.Datastore{datastore.NewInMemory(conf), queue},
+ queue,
&mockNodeManager{
GetShardFunc: func(vs string) (nodes.Shard, error) {
require.Equal(t, virtualStorage, vs)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c56eda44c..229d30200 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -125,10 +125,10 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
}
// RegisterServices will register any services praefect needs to handle rpcs on its own
-func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, ds datastore.Datastore) {
+func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) {
// ServerServiceServer is necessary for the ServerInfo RPC
gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm))
- gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, ds))
+ gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, queue))
gitalypb.RegisterRefTransactionServer(srv.s, transaction.NewServer(tm))
healthpb.RegisterHealthServer(srv.s, health.NewServer())
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 2ebbb29e2..9feaf3d7a 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -49,7 +49,7 @@ func TestServerRouteServerAccessor(t *testing.T) {
}
)
- cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
+ cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
defer cleanup()
cli := mock.NewSimpleServiceClient(cc)
@@ -130,7 +130,7 @@ func TestGitalyServerInfo(t *testing.T) {
conf.VirtualStorages[0].Nodes[1].Storage: &mockSvc{},
}
- cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
+ cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
defer cleanup()
client := gitalypb.NewServerServiceClient(cc)
@@ -443,10 +443,6 @@ func TestRepoRemoval(t *testing.T) {
// TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports
// graceful shutdown, we can remove this code that waits for jobs to be complete
queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: queueInterceptor,
- }
jobsDoneCh := make(chan struct{}, 2)
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
@@ -457,7 +453,7 @@ func TestRepoRemoval(t *testing.T) {
return queue.Acknowledge(ctx, state, ids)
})
- cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+ cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, queueInterceptor)
defer cleanup()
ctx, cancel := testhelper.Context()
@@ -575,12 +571,7 @@ func TestRepoRename(t *testing.T) {
return queue.Acknowledge(ctx, state, ids)
})
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: evq,
- }
-
- cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+ cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, evq)
defer cleanup()
ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
index d336144e9..24510db79 100644
--- a/internal/praefect/service/info/consistencycheck.go
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -169,7 +169,7 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
return nil
}
-func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp *gitalypb.ConsistencyCheckResponse) error {
+func scheduleReplication(ctx context.Context, csr checksumResult, q datastore.ReplicationEventQueue, resp *gitalypb.ConsistencyCheckResponse) error {
event, err := q.Enqueue(ctx, datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
Change: datastore.UpdateRepo,
@@ -190,7 +190,7 @@ func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp
return nil
}
-func ensureConsistency(ctx context.Context, disableReconcile bool, checksumResultQ <-chan checksumResult, q Queue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+func ensureConsistency(ctx context.Context, disableReconcile bool, checksumResultQ <-chan checksumResult, q datastore.ReplicationEventQueue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
for {
var csr checksumResult
select {
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index f3920fefa..f46c2fd03 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -3,7 +3,6 @@ package info
import (
"context"
"errors"
- "time"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -12,26 +11,16 @@ import (
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-// Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service
-type Queue interface {
- Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error)
- CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
-}
-
-// compile time assertion that Queue is satisfied by
-// datastore.ReplicationEventQueue
-var _ Queue = (datastore.ReplicationEventQueue)(nil)
-
// Server is a InfoService server
type Server struct {
gitalypb.UnimplementedPraefectInfoServiceServer
nodeMgr nodes.Manager
conf config.Config
- queue Queue
+ queue datastore.ReplicationEventQueue
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue Queue) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue) gitalypb.PraefectInfoServiceServer {
s := &Server{
nodeMgr: nodeMgr,
conf: conf,