author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-05-28 17:43:08 +0300
---|---|---
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-05-28 17:43:08 +0300
commit | 7542065e6d5673da87a7889bfbf575a4effeae7c (patch) |
tree | b7a4239f7108fb74ce4e33677b9acd90a5b1dd84 |
parent | 97dcd53c020d3d6d123530ac1a41140b950dd607 (diff) |
parent | a9b2b302e49b351a4f3a552351177d56942c8c25 (diff) |
Merge branch 'ps-removal-memory-datastore' into 'master'
Praefect: removal of unnecessary Datastore wrapper
See merge request gitlab-org/gitaly!2222
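In short: the datastore.Datastore wrapper struct, which embedded both ReplicasDatastore and ReplicationEventQueue, is deleted along with the in-memory MemoryDatastore implementation, and every consumer now receives a datastore.ReplicationEventQueue directly. A minimal sketch of the new wiring, condensed from the cmd/praefect/main.go hunk below (error handling and unrelated setup elided, so this is a fragment rather than a complete program):

    // Choose a queue implementation once, then pass the interface everywhere.
    var queue datastore.ReplicationEventQueue
    if conf.MemoryQueueEnabled {
        queue = datastore.NewMemoryReplicationEventQueue(conf)
    } else {
        queue = datastore.NewPostgresReplicationEventQueue(db)
    }

    // The same queue value replaces the old Datastore argument in each constructor.
    nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
    if err != nil {
        return err
    }
    coordinator := praefect.NewCoordinator(queue, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
    srv.RegisterServices(nodeManager, transactionManager, conf, queue)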
-rw-r--r-- | cmd/praefect/main.go | 14
-rw-r--r-- | internal/praefect/auth_test.go | 9
-rw-r--r-- | internal/praefect/coordinator.go | 38
-rw-r--r-- | internal/praefect/coordinator_test.go | 58
-rw-r--r-- | internal/praefect/dataloss_check_test.go | 12
-rw-r--r-- | internal/praefect/datastore/datastore.go | 163
-rw-r--r-- | internal/praefect/helper_test.go | 45
-rw-r--r-- | internal/praefect/nodes/manager.go | 8
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 66
-rw-r--r-- | internal/praefect/replicator.go | 4
-rw-r--r-- | internal/praefect/replicator_test.go | 84
-rw-r--r-- | internal/praefect/server.go | 4
-rw-r--r-- | internal/praefect/server_test.go | 17
-rw-r--r-- | internal/praefect/service/info/consistencycheck.go | 4
-rw-r--r-- | internal/praefect/service/info/server.go | 15
15 files changed, 148 insertions, 393 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index c9cac9416..c41ed9820 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -231,14 +231,14 @@ func run(cfgs []starter.Config, conf config.Config) error {
         db = dbConn
     }
 
-    ds := datastore.Datastore{ReplicasDatastore: datastore.NewInMemory(conf)}
+    var queue datastore.ReplicationEventQueue
     if conf.MemoryQueueEnabled {
-        ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue(conf)
+        queue = datastore.NewMemoryReplicationEventQueue(conf)
     } else {
-        ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
+        queue = datastore.NewPostgresReplicationEventQueue(db)
     }
 
-    nodeManager, err := nodes.NewManager(logger, conf, db, ds, nodeLatencyHistogram)
+    nodeManager, err := nodes.NewManager(logger, conf, db, queue, nodeLatencyHistogram)
     if err != nil {
         return err
     }
@@ -261,11 +261,11 @@ func run(cfgs []starter.Config, conf config.Config) error {
     var (
         // top level server dependencies
-        coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
+        coordinator = praefect.NewCoordinator(queue, nodeManager, transactionManager, conf, protoregistry.GitalyProtoPreregistered)
         repl        = praefect.NewReplMgr(
             logger,
             conf.VirtualStorageNames(),
-            ds.ReplicationEventQueue,
+            queue,
             nodeManager,
             praefect.WithDelayMetric(delayMetric),
             praefect.WithLatencyMetric(latencyMetric),
@@ -281,7 +281,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
         return fmt.Errorf("unable to create a bootstrap: %v", err)
     }
 
-    srv.RegisterServices(nodeManager, transactionManager, conf, ds)
+    srv.RegisterServices(nodeManager, transactionManager, conf, queue)
     b.StopAction = srv.GracefulStop
 
     for _, cfg := range cfgs {
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 5146d10b2..cc5f35bbb 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -169,12 +169,9 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
     }
     logEntry := testhelper.DiscardTestEntry(t)
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
-    }
+    queue := datastore.NewMemoryReplicationEventQueue(conf)
 
-    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
     txMgr := transactions.NewManager()
@@ -182,7 +179,7 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
     registry, err := protoregistry.New(fd)
     require.NoError(t, err)
 
-    coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry)
+    coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
 
     srv := NewServer(coordinator.StreamDirector, logEntry, registry, conf)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b302cc54b..953918c41 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -7,6 +7,7 @@ import (
     "sync"
 
     "github.com/golang/protobuf/proto"
+    "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
     "github.com/sirupsen/logrus"
     "gitlab.com/gitlab-org/gitaly/internal/helper"
     "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag"
@@ -79,23 +80,27 @@ type grpcCall struct {
 // downstream server. The coordinator is thread safe; concurrent calls to
 // register nodes are safe.
 type Coordinator struct {
-    nodeMgr   nodes.Manager
-    txMgr     *transactions.Manager
-    log       logrus.FieldLogger
-    datastore datastore.Datastore
-    registry  *protoregistry.Registry
-    conf      config.Config
+    nodeMgr  nodes.Manager
+    txMgr    *transactions.Manager
+    queue    datastore.ReplicationEventQueue
+    registry *protoregistry.Registry
+    conf     config.Config
 }
 
 // NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l logrus.FieldLogger, ds datastore.Datastore, nodeMgr nodes.Manager, txMgr *transactions.Manager, conf config.Config, r *protoregistry.Registry) *Coordinator {
+func NewCoordinator(
+    queue datastore.ReplicationEventQueue,
+    nodeMgr nodes.Manager,
+    txMgr *transactions.Manager,
+    conf config.Config,
+    r *protoregistry.Registry,
+) *Coordinator {
     return &Coordinator{
-        log:       l,
-        datastore: ds,
-        registry:  r,
-        nodeMgr:   nodeMgr,
-        txMgr:     txMgr,
-        conf:      conf,
+        queue:    queue,
+        registry: r,
+        nodeMgr:  nodeMgr,
+        txMgr:    txMgr,
+        conf:     conf,
     }
 }
 
@@ -212,7 +217,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall
 func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (*proxy.StreamParameters, error) {
     // For phase 1, we need to route messages based on the storage location
     // to the appropriate Gitaly node.
-    c.log.Debugf("Stream director received method %s", fullMethodName)
+    ctxlogrus.Extract(ctx).Debugf("Stream director received method %s", fullMethodName)
 
     mi, err := c.registry.LookupMethod(fullMethodName)
     if err != nil {
@@ -338,9 +343,9 @@ func (c *Coordinator) createReplicaJobs(
         go func() {
             defer wg.Done()
 
-            _, err := c.datastore.Enqueue(ctx, event)
+            _, err := c.queue.Enqueue(ctx, event)
             if err != nil {
-                c.log.WithError(err).WithFields(logrus.Fields{
+                ctxlogrus.Extract(ctx).WithError(err).WithFields(logrus.Fields{
                     logWithReplVirtual: event.Job.VirtualStorage,
                     logWithReplSource:  event.Job.SourceNodeStorage,
                     logWithReplTarget:  event.Job.TargetNodeStorage,
@@ -360,7 +365,6 @@ func (c *Coordinator) ensureCorrelationID(ctx context.Context, targetRepo *gital
     var err error
     corrID, err = correlation.RandomID()
     if err != nil {
-        c.log.WithError(err).Error("unable to generate correlation ID")
         corrID = generatePseudorandomCorrelationID(targetRepo)
     }
 }
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index d49d29a9a..f104e55b1 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -113,8 +113,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
     const storageName = "test-storage"
 
     coordinator := NewCoordinator(
-        testhelper.DiscardTestEntry(t),
-        datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue(conf)},
+        datastore.NewMemoryReplicationEventQueue(conf),
         &mockNodeManager{GetShardFunc: func(storage string) (nodes.Shard, error) {
             return nodes.Shard{
                 IsReadOnly: tc.readOnly,
@@ -154,20 +153,13 @@ func TestStreamDirectorMutator(t *testing.T) {
     healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 
     primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
+    primaryNode := &models.Node{Address: primaryAddress, Storage: "praefect-internal-1", DefaultPrimary: true}
+    secondaryNode := &models.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
     conf := config.Config{
         VirtualStorages: []*config.VirtualStorage{
             &config.VirtualStorage{
-                Name: "praefect",
-                Nodes: []*models.Node{
-                    &models.Node{
-                        Address:        primaryAddress,
-                        Storage:        "praefect-internal-1",
-                        DefaultPrimary: true,
-                    },
-                    &models.Node{
-                        Address: secondaryAddress,
-                        Storage: "praefect-internal-2",
-                    }},
+                Name:  "praefect",
+                Nodes: []*models.Node{primaryNode, secondaryNode},
             },
         },
     }
@@ -180,11 +172,6 @@ func TestStreamDirectorMutator(t *testing.T) {
         return queue.Enqueue(ctx, event)
     })
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: queueInterceptor,
-    }
-
     targetRepo := gitalypb.Repository{
         StorageName:  "praefect",
         RelativePath: "/path/to/hashed/storage",
     }
@@ -195,12 +182,12 @@ func TestStreamDirectorMutator(t *testing.T) {
 
     entry := testhelper.DiscardTestEntry(t)
 
-    nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
     txMgr := transactions.NewManager()
 
-    coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+    coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
 
     frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
         Origin: &targetRepo,
@@ -234,13 +221,8 @@ func TestStreamDirectorMutator(t *testing.T) {
     // this call creates new events in the queue and simulates usual flow of the update operation
     streamParams.RequestFinalizer()
 
-    targetNode, err := ds.GetStorageNode("praefect-internal-2")
-    require.NoError(t, err)
-    sourceNode, err := ds.GetStorageNode("praefect-internal-1")
-    require.NoError(t, err)
-
     replEventWait.Wait() // wait until event persisted (async operation)
-    events, err := ds.ReplicationEventQueue.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
+    events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
     require.NoError(t, err)
     require.Len(t, events, 1)
 
@@ -255,8 +237,8 @@ func TestStreamDirectorMutator(t *testing.T) {
             Change:            datastore.UpdateRepo,
             VirtualStorage:    conf.VirtualStorages[0].Name,
             RelativePath:      targetRepo.RelativePath,
-            TargetNodeStorage: targetNode.Storage,
-            SourceNodeStorage: sourceNode.Storage,
+            TargetNodeStorage: secondaryNode.Storage,
+            SourceNodeStorage: primaryNode.Storage,
         },
         Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
     }
@@ -289,10 +271,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
         },
     }
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
-    }
+    queue := datastore.NewMemoryReplicationEventQueue(conf)
 
     targetRepo := gitalypb.Repository{
         StorageName:  "praefect",
@@ -305,12 +284,12 @@ func TestStreamDirectorAccessor(t *testing.T) {
 
     entry := testhelper.DiscardTestEntry(t)
 
-    nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
     txMgr := transactions.NewManager()
 
-    coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+    coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
 
     frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
     require.NoError(t, err)
@@ -388,11 +367,6 @@ func TestAbsentCorrelationID(t *testing.T) {
         return queue.Enqueue(ctx, event)
     })
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: queueInterceptor,
-    }
-
     targetRepo := gitalypb.Repository{
         StorageName:  "praefect",
         RelativePath: "/path/to/hashed/storage",
     }
@@ -403,11 +377,11 @@ func TestAbsentCorrelationID(t *testing.T) {
 
     entry := testhelper.DiscardTestEntry(t)
 
-    nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(entry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
     txMgr := transactions.NewManager()
 
-    coordinator := NewCoordinator(entry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+    coordinator := NewCoordinator(queueInterceptor, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
 
     frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
         Origin: &targetRepo,
@@ -427,7 +401,7 @@ func TestAbsentCorrelationID(t *testing.T) {
     streamParams.RequestFinalizer()
 
     replEventWait.Wait() // wait until event persisted (async operation)
-    jobs, err := coordinator.datastore.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
+    jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
     require.NoError(t, err)
     require.Len(t, jobs, 1)
 
diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go
index bc705b632..26daba4b6 100644
--- a/internal/praefect/dataloss_check_test.go
+++ b/internal/praefect/dataloss_check_test.go
@@ -106,15 +106,9 @@ func TestDatalossCheck(t *testing.T) {
     require.NoError(t, err)
     killJobs(t)
 
-    cc, _, clean := runPraefectServerWithMock(t, cfg,
-        datastore.Datastore{
-            ReplicasDatastore:     datastore.NewInMemory(cfg),
-            ReplicationEventQueue: rq,
-        },
-        map[string]mock.SimpleServiceServer{
-            "not-needed": &mock.UnimplementedSimpleServiceServer{},
-        },
-    )
+    cc, _, clean := runPraefectServerWithMock(t, cfg, rq, map[string]mock.SimpleServiceServer{
+        "not-needed": &mock.UnimplementedSimpleServiceServer{},
+    })
     defer clean()
 
     pbFrom, err := ptypes.TimestampProto(beforeTimerange.CreatedAt)
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 9a4513f0b..c89d974d6 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -8,13 +8,7 @@ package datastore
 import (
     "database/sql/driver"
     "encoding/json"
-    "errors"
     "fmt"
-    "sync"
-    "time"
-
-    "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
-    "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
 )
 
 // JobState is an enum that indicates the state of a job
@@ -89,160 +83,3 @@ func (p Params) Value() (driver.Value, error) {
     }
     return string(data), nil
 }
-
-// ReplJob is an instance of a queued replication job. A replication job is
-// meant for updating the repository so that it is synced with the primary
-// copy. Scheduled indicates when a replication job should be performed.
-type ReplJob struct {
-    Change                 ChangeType
-    ID                     uint64      // autoincrement ID
-    VirtualStorage         string      // virtual storage
-    TargetNode, SourceNode models.Node // which node to replicate to?
-    RelativePath           string      // source for replication
-    State                  JobState
-    Attempts               int
-    Params                 Params    // additional information required to run the job
-    CorrelationID          string    // from original request
-    CreatedAt              time.Time // when has the job been created?
-}
-
-// Datastore is a data persistence abstraction for all of Praefect's
-// persistence needs
-type Datastore struct {
-    ReplicasDatastore
-    ReplicationEventQueue
-}
-
-// ReplicasDatastore manages accessing and setting which secondary replicas
-// backup a repository
-type ReplicasDatastore interface {
-    GetPrimary(virtualStorage string) (models.Node, error)
-
-    GetSecondaries(virtualStorage string) ([]models.Node, error)
-
-    GetReplicas(relativePath string) ([]models.Node, error)
-
-    GetStorageNode(nodeStorage string) (models.Node, error)
-
-    GetStorageNodes() ([]models.Node, error)
-
-    // VirtualStorages returns a list of virtual storages that are configured for this instance.
-    VirtualStorages() []string
-}
-
-// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
-// only intended for early beta requirements and as a reference implementation
-// for the eventual SQL implementation
-type MemoryDatastore struct {
-    // storageNodes is read-only after initialization
-    // if modification needed there must be synchronization for concurrent access to it
-    storageNodes map[string]models.Node
-
-    repositories *struct {
-        sync.RWMutex
-        m map[string]models.Repository
-    }
-
-    // virtualStorages is read-only after initialization
-    // if modification needed there must be synchronization for concurrent access to it
-    virtualStorages map[string][]*models.Node
-}
-
-// NewInMemory returns an initialized in-memory datastore
-func NewInMemory(cfg config.Config) *MemoryDatastore {
-    m := &MemoryDatastore{
-        storageNodes: map[string]models.Node{},
-        repositories: &struct {
-            sync.RWMutex
-            m map[string]models.Repository
-        }{
-            m: map[string]models.Repository{},
-        },
-        virtualStorages: map[string][]*models.Node{},
-    }
-
-    for _, virtualStorage := range cfg.VirtualStorages {
-        m.virtualStorages[virtualStorage.Name] = virtualStorage.Nodes
-
-        for _, node := range virtualStorage.Nodes {
-            // TODO: if there is two nodes with same storage name defined for different virtual storages
-            // only one definition will be used: https://gitlab.com/gitlab-org/gitaly/-/issues/2613
-            if _, ok := m.storageNodes[node.Storage]; ok {
-                continue
-            }
-            m.storageNodes[node.Storage] = *node
-        }
-    }
-
-    return m
-}
-
-// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it
-var ErrNoPrimaryForStorage = errors.New("no primary for storage")
-
-// GetPrimary returns the primary configured in the config file
-func (md *MemoryDatastore) GetPrimary(virtualStorage string) (models.Node, error) {
-    for _, node := range md.virtualStorages[virtualStorage] {
-        if node.DefaultPrimary {
-            return *node, nil
-        }
-    }
-
-    return models.Node{}, ErrNoPrimaryForStorage
-}
-
-// GetSecondaries gets the secondary nodes associated with a virtual storage
-func (md *MemoryDatastore) GetSecondaries(virtualStorage string) ([]models.Node, error) {
-    var secondaries []models.Node
-
-    for _, node := range md.virtualStorages[virtualStorage] {
-        if !node.DefaultPrimary {
-            secondaries = append(secondaries, *node)
-        }
-    }
-
-    return secondaries, nil
-}
-
-// GetReplicas gets the secondaries for a repository based on the relative path
-func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, error) {
-    md.repositories.RLock()
-    defer md.repositories.RUnlock()
-
-    repository, ok := md.repositories.m[relativePath]
-    if !ok {
-        return nil, errors.New("repository not found")
-    }
-
-    // to prevent possible modification of element of the slice
-    copied := repository.Clone()
-    return copied.Replicas, nil
-}
-
-// GetStorageNode gets all storage nodes
-func (md *MemoryDatastore) GetStorageNode(nodeStorage string) (models.Node, error) {
-    node, ok := md.storageNodes[nodeStorage]
-    if !ok {
-        return models.Node{}, errors.New("node not found")
-    }
-
-    return node, nil
-}
-
-// GetStorageNodes gets all storage nodes
-func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
-    var storageNodes []models.Node
-    for _, storageNode := range md.storageNodes {
-        storageNodes = append(storageNodes, storageNode)
-    }
-
-    return storageNodes, nil
-}
-
-// VirtualStorages returns list of virtual storages configured to be supported.
-func (md *MemoryDatastore) VirtualStorages() []string {
-    vs := make([]string, 0, len(md.virtualStorages))
-    for name := range md.virtualStorages {
-        vs = append(vs, name)
-    }
-    return vs
-}
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 54d195483..a8db569ce 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -90,12 +90,12 @@ func assertPrimariesExist(t testing.TB, conf config.Config) {
 // config.Nodes. There must be a 1-to-1 mapping between backend server and
 // configured storage node.
 // requires there to be only 1 virtual storage
-func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithMock(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
     r, err := protoregistry.New(mustLoadProtoReg(t))
     require.NoError(t, err)
 
     return runPraefectServer(t, conf, buildOptions{
-        withDatastore:   ds,
+        withQueue:       queue,
         withBackends:    withMockBackends(t, backends),
         withAnnotations: r,
     })
@@ -119,7 +119,7 @@ func (nullNodeMgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPa
 }
 
 type buildOptions struct {
-    withDatastore   datastore.Datastore
+    withQueue       datastore.ReplicationEventQueue
     withTxMgr       *transactions.Manager
     withBackends    func([]*config.VirtualStorage) []testhelper.Cleanup
     withAnnotations *protoregistry.Registry
@@ -184,37 +184,29 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel
 }
 
 func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
-    }
-
-    return runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+    return runPraefectServerWithGitalyWithDatastore(t, conf, defaultQueue(conf))
 }
 
 // runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
 // requires exactly 1 virtual storage
-func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, ds datastore.Datastore) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
+func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, queue datastore.ReplicationEventQueue) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
     return runPraefectServer(t, conf, buildOptions{
-        withDatastore: ds,
-        withTxMgr:     transactions.NewManager(),
-        withBackends:  withRealGitalyShared(t),
+        withQueue:    queue,
+        withTxMgr:    transactions.NewManager(),
+        withBackends: withRealGitalyShared(t),
     })
 }
 
-func defaultDatastore(conf config.Config) datastore.Datastore {
-    return datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
-    }
+func defaultQueue(conf config.Config) datastore.ReplicationEventQueue {
+    return datastore.NewMemoryReplicationEventQueue(conf)
 }
 
 func defaultTxMgr() *transactions.Manager {
     return transactions.NewManager()
 }
 
-func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) nodes.Manager {
-    nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, promtest.NewMockHistogramVec())
+func defaultNodeMgr(t testing.TB, conf config.Config, queue datastore.ReplicationEventQueue) nodes.Manager {
+    nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, promtest.NewMockHistogramVec())
     require.NoError(t, err)
     nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
     return nodeMgr
@@ -225,8 +217,8 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
 
     var cleanups []testhelper.Cleanup
 
-    if opt.withDatastore == (datastore.Datastore{}) {
-        opt.withDatastore = defaultDatastore(conf)
+    if opt.withQueue == nil {
+        opt.withQueue = defaultQueue(conf)
     }
     if opt.withTxMgr == nil {
        opt.withTxMgr = defaultTxMgr()
@@ -241,12 +233,11 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
         opt.withLogger = log.Default()
     }
     if opt.withNodeMgr == nil {
-        opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withDatastore)
+        opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withQueue)
     }
 
     coordinator := NewCoordinator(
-        opt.withLogger,
-        opt.withDatastore,
+        opt.withQueue,
         opt.withNodeMgr,
         opt.withTxMgr,
         conf,
@@ -257,7 +248,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
     replmgr := NewReplMgr(
         opt.withLogger,
         conf.VirtualStorageNames(),
-        opt.withDatastore,
+        opt.withQueue,
         opt.withNodeMgr,
         WithQueueMetric(&promtest.MockGauge{}),
     )
@@ -269,7 +260,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
     errQ := make(chan error)
     ctx, cancel := testhelper.Context()
 
-    prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withDatastore)
+    prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withQueue)
 
     go func() { errQ <- prf.Serve(listener, false) }()
     replmgr.ProcessBacklog(ctx, noopBackoffFunc)
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index d9d79ec2f..91f2c1d97 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -79,7 +79,7 @@ type Mgr struct {
     // strategies is a map of strategies keyed on virtual storage name
     strategies map[string]leaderElectionStrategy
     db         *sql.DB
-    ds         datastore.Datastore
+    queue      datastore.ReplicationEventQueue
 }
 
 // leaderElectionStrategy defines the interface by which primary and
@@ -98,7 +98,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
 const dialTimeout = 10 * time.Second
 
 // NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, queue datastore.ReplicationEventQueue, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
     strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
 
     ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -148,7 +148,7 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Dat
         db:              db,
         failoverEnabled: c.Failover.Enabled,
         strategies:      strategies,
-        ds:              ds,
+        queue:           queue,
     }, nil
 }
@@ -206,7 +206,7 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
     var storages []string
     if featureflag.IsEnabled(ctx, featureflag.DistributedReads) {
-        if storages, err = n.ds.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
+        if storages, err = n.queue.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
             // this is recoverable error - proceed with primary node
             ctxlogrus.Extract(ctx).
                 WithError(err).
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 047d1c4b6..e2b318a59 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -86,7 +86,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
         Failover:        config.Failover{Enabled: false, ElectionStrategy: "sql"},
         VirtualStorages: []*config.VirtualStorage{virtualStorage},
     }
-    nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
+    nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
     nm.Start(time.Millisecond, time.Millisecond)
@@ -134,7 +134,7 @@ func TestPrimaryIsSecond(t *testing.T) {
     }
 
     mockHistogram := promtest.NewMockHistogramVec()
-    nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, mockHistogram)
+    nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, mockHistogram)
     require.NoError(t, err)
 
     shard, err := nm.GetShard("virtual-storage-0")
@@ -184,7 +184,7 @@ func TestBlockingDial(t *testing.T) {
         healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
     }()
 
-    mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
+    mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
     mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -229,13 +229,11 @@ func TestNodeManager(t *testing.T) {
         Failover: config.Failover{Enabled: false},
     }
 
-    ds := datastore.Datastore{}
-
     mockHistogram := promtest.NewMockHistogramVec()
-    nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, ds, mockHistogram)
+    nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram)
     require.NoError(t, err)
 
-    nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, ds, mockHistogram)
+    nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram)
     require.NoError(t, err)
 
     nm.Start(1*time.Millisecond, 5*time.Second)
@@ -377,46 +375,46 @@ func TestMgr_GetSyncedNode(t *testing.T) {
 
     ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.DistributedReads)
 
-    ackEvent := func(ds datastore.Datastore, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
+    ackEvent := func(queue datastore.ReplicationEventQueue, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
         event := datastore.ReplicationEvent{Job: job}
 
-        eevent, err := ds.Enqueue(ctx, event)
+        eevent, err := queue.Enqueue(ctx, event)
         require.NoError(t, err)
 
-        devents, err := ds.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
+        devents, err := queue.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
         require.NoError(t, err)
         require.Len(t, devents, 1)
 
-        acks, err := ds.Acknowledge(ctx, state, []uint64{devents[0].ID})
+        acks, err := queue.Acknowledge(ctx, state, []uint64{devents[0].ID})
         require.NoError(t, err)
         require.Equal(t, []uint64{devents[0].ID}, acks)
         return devents[0]
     }
 
-    verify := func(scenario func(t *testing.T, nm Manager, ds datastore.Datastore)) func(*testing.T) {
-        ds := datastore.Datastore{ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf)}
+    verify := func(scenario func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue)) func(*testing.T) {
+        queue := datastore.NewMemoryReplicationEventQueue(conf)
 
-        nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, mockHistogram)
+        nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, queue, mockHistogram)
         require.NoError(t, err)
 
         nm.Start(time.Duration(0), time.Hour)
 
-        return func(t *testing.T) { scenario(t, nm, ds) }
+        return func(t *testing.T) { scenario(t, nm, queue) }
     }
 
-    t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+    t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
         _, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "")
         require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
     }))
 
-    t.Run("no replication events", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+    t.Run("no replication events", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
         node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "no/matter")
         require.NoError(t, err)
         require.Contains(t, []string{vs0Primary, "unix://" + sockets[1]}, node.GetAddress())
     }))
 
-    t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
-        _, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+    t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+        _, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
             Job: datastore.ReplicationJob{
                 RelativePath:      "path/1",
                 TargetNodeStorage: "gitaly-1",
@@ -431,8 +429,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
         require.Equal(t, vs0Primary, node.GetAddress())
     }))
 
-    t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
-        vs0Event, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+    t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+        vs0Event, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
             Job: datastore.ReplicationJob{
                 RelativePath:      "path/1",
                 TargetNodeStorage: "gitaly-1",
@@ -442,7 +440,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
         })
         require.NoError(t, err)
 
-        vs0Events, err := ds.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
+        vs0Events, err := queue.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
         require.NoError(t, err)
         require.Len(t, vs0Events, 1)
 
@@ -451,8 +449,8 @@ func TestMgr_GetSyncedNode(t *testing.T) {
         require.Equal(t, vs0Primary, node.GetAddress())
     }))
 
-    t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
-        vs0Event := ackEvent(ds, datastore.ReplicationJob{
+    t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+        vs0Event := ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/1",
             TargetNodeStorage: "gitaly-1",
             SourceNodeStorage: "gitaly-0",
@@ -464,15 +462,15 @@ func TestMgr_GetSyncedNode(t *testing.T) {
         require.Equal(t, vs0Primary, node.GetAddress())
     }))
 
-    t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
-        vsEvent0 := ackEvent(ds, datastore.ReplicationJob{
+    t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+        vsEvent0 := ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/1",
             TargetNodeStorage: "gitaly-1",
             SourceNodeStorage: "gitaly-0",
             VirtualStorage:    "virtual-storage-0",
         }, datastore.JobStateCompleted)
 
-        vsEvent1, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+        vsEvent1, err := queue.Enqueue(ctx, datastore.ReplicationEvent{
             Job: datastore.ReplicationJob{
                 RelativePath:      vsEvent0.Job.RelativePath,
                 TargetNodeStorage: vsEvent0.Job.TargetNodeStorage,
@@ -487,15 +485,15 @@ func TestMgr_GetSyncedNode(t *testing.T) {
         require.Equal(t, vs0Primary, node.GetAddress())
     }))
 
-    t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
-        vs0Event := ackEvent(ds, datastore.ReplicationJob{
+    t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+        vs0Event := ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/1",
             TargetNodeStorage: "gitaly-1",
             SourceNodeStorage: "gitaly-0",
             VirtualStorage:    "virtual-storage-0",
         }, datastore.JobStateDead)
 
-        ackEvent(ds, datastore.ReplicationJob{
+        ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/1",
             TargetNodeStorage: "gitaly-2",
             SourceNodeStorage: "gitaly-1",
@@ -507,22 +505,22 @@ func TestMgr_GetSyncedNode(t *testing.T) {
         require.Equal(t, vs0Primary, node.GetAddress())
     }))
 
-    t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
-        ackEvent(ds, datastore.ReplicationJob{
+    t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, queue datastore.ReplicationEventQueue) {
+        ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/1",
             TargetNodeStorage: "gitaly-1",
             SourceNodeStorage: "gitaly-0",
             VirtualStorage:    "virtual-storage-0",
         }, datastore.JobStateCompleted)
 
-        ackEvent(ds, datastore.ReplicationJob{
+        ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/1",
             TargetNodeStorage: "gitaly-2",
             SourceNodeStorage: "gitaly-1",
             VirtualStorage:    "virtual-storage-1",
         }, datastore.JobStateCompleted)
 
-        vs1Event := ackEvent(ds, datastore.ReplicationJob{
+        vs1Event := ackEvent(queue, datastore.ReplicationJob{
             RelativePath:      "path/2",
             TargetNodeStorage: "gitaly-2",
             SourceNodeStorage: "gitaly-1",
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 721edab29..0668a59f8 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -412,7 +412,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
             eventIDsByState := map[datastore.JobState][]uint64{}
             for _, event := range events {
-                if err := r.processReplJob(ctx, event, shard, target.GetConnection()); err != nil {
+                if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
                     logger.WithFields(logrus.Fields{
                         logWithReplJobID:   event.ID,
                         logWithReplVirtual: event.Job.VirtualStorage,
@@ -462,7 +462,7 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
     }
 }
 
-func (r ReplMgr) processReplJob(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
+func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
     source, err := shard.GetNode(event.Job.SourceNodeStorage)
     if err != nil {
         return fmt.Errorf("get source node: %w", err)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 46053e7d1..fbfcb96ab 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -89,10 +89,7 @@ func TestProcessReplicationJob(t *testing.T) {
         },
     }
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
-    }
+    queue := datastore.NewMemoryReplicationEventQueue(conf)
 
     // create object pool on the source
     objectPoolPath := testhelper.NewTestObjectPoolName(t)
@@ -123,23 +120,28 @@ func TestProcessReplicationJob(t *testing.T) {
     })
     require.NoError(t, err)
 
-    primary, err := ds.GetPrimary(conf.VirtualStorages[0].Name)
+    entry := testhelper.DiscardTestEntry(t)
+
+    nodeMgr, err := nodes.NewManager(entry, conf, nil, queue, promtest.NewMockHistogramVec())
     require.NoError(t, err)
-    secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name)
+    nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
+
+    shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
     require.NoError(t, err)
+    require.Len(t, shard.Secondaries, 1)
 
     var events []datastore.ReplicationEvent
-    for _, secondary := range secondaries {
+    for _, secondary := range shard.Secondaries {
         events = append(events, datastore.ReplicationEvent{
-            State:   datastore.JobStateReady,
-            Attempt: 3,
             Job: datastore.ReplicationJob{
                 Change:            datastore.UpdateRepo,
-                TargetNodeStorage: secondary.Storage,
-                SourceNodeStorage: primary.Storage,
+                TargetNodeStorage: secondary.GetStorage(),
+                SourceNodeStorage: shard.Primary.GetStorage(),
                 RelativePath:      testRepo.GetRelativePath(),
             },
-            Meta: datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
+            State:   datastore.JobStateReady,
+            Attempt: 3,
+            Meta:    datastore.Params{metadatahandler.CorrelationIDKey: "correlation-id"},
         })
     }
     require.Len(t, events, 1)
@@ -149,13 +151,8 @@ func TestProcessReplicationJob(t *testing.T) {
     })
 
     var replicator defaultReplicator
-    entry := testhelper.DiscardTestEntry(t)
     replicator.log = entry
 
-    nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
-    require.NoError(t, err)
-    nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
-
     var mockReplicationGauge promtest.MockGauge
     var mockReplicationLatencyHistogramVec promtest.MockHistogramVec
     var mockReplicationDelayHistogramVec promtest.MockHistogramVec
@@ -163,7 +160,7 @@ func TestProcessReplicationJob(t *testing.T) {
     replMgr := NewReplMgr(
         testhelper.DiscardTestEntry(t),
         conf.VirtualStorageNames(),
-        ds,
+        queue,
         nodeMgr,
         WithLatencyMetric(&mockReplicationLatencyHistogramVec),
         WithDelayMetric(&mockReplicationDelayHistogramVec),
@@ -172,11 +169,7 @@ func TestProcessReplicationJob(t *testing.T) {
 
     replMgr.replicator = replicator
 
-    shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
-    require.NoError(t, err)
-    require.Len(t, shard.Secondaries, 1)
-
-    replMgr.processReplJob(ctx, events[0], shard, shard.Secondaries[0].GetConnection())
+    require.NoError(t, replMgr.processReplicationEvent(ctx, events[0], shard, shard.Secondaries[0].GetConnection()))
 
     relativeRepoPath, err := filepath.Rel(testhelper.GitlabTestStoragePath(), testRepoPath)
     require.NoError(t, err)
@@ -219,21 +212,18 @@ func TestPropagateReplicationJob(t *testing.T) {
         },
     }
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
-    }
+    queue := datastore.NewMemoryReplicationEventQueue(conf)
 
     logEntry := testhelper.DiscardTestEntry(t)
 
-    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queue, promtest.NewMockHistogramVec())
     require.NoError(t, err)
     nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
 
     txMgr := transactions.NewManager()
 
-    coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
+    coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, protoregistry.GitalyProtoPreregistered)
 
-    replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+    replmgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queue, nodeMgr)
 
     prf := NewServer(
         coordinator.StreamDirector,
@@ -245,7 +235,7 @@ func TestPropagateReplicationJob(t *testing.T) {
     ctx, cancel := testhelper.Context()
     defer cancel()
 
-    prf.RegisterServices(nodeMgr, txMgr, conf, ds)
+    prf.RegisterServices(nodeMgr, txMgr, conf, queue)
     go prf.Serve(listener, false)
     defer prf.Stop()
 
@@ -496,11 +486,6 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
         return ackIDs, err
     })
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: queueInterceptor,
-    }
-
     // this job exists to verify that replication works
     okJob := datastore.ReplicationJob{
         Change:            datastore.UpdateRepo,
@@ -509,23 +494,23 @@
         SourceNodeStorage: primary.Storage,
         VirtualStorage:    "praefect",
     }
-    event1, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
+    event1, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
     require.NoError(t, err)
     require.Equal(t, uint64(1), event1.ID)
 
     // this job checks flow for replication event that fails
     failJob := okJob
     failJob.RelativePath = "invalid path to fail the job"
-    event2, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
+    event2, err := queueInterceptor.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
     require.NoError(t, err)
     require.Equal(t, uint64(2), event2.ID)
 
     logEntry := testhelper.DiscardTestEntry(t)
 
-    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
-    replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+    replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
     replMgr.ProcessBacklog(ctx, noopBackoffFunc)
 
     select {
@@ -599,11 +584,6 @@ func TestProcessBacklog_Success(t *testing.T) {
         return ackIDs, err
     })
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: queueInterceptor,
-    }
-
     // Update replication job
     eventType1 := datastore.ReplicationEvent{
         Job: datastore.ReplicationJob{
@@ -615,10 +595,10 @@ func TestProcessBacklog_Success(t *testing.T) {
         },
     }
 
-    _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
+    _, err = queueInterceptor.Enqueue(ctx, eventType1)
     require.NoError(t, err)
 
-    _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
+    _, err = queueInterceptor.Enqueue(ctx, eventType1)
     require.NoError(t, err)
 
     renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
@@ -639,7 +619,7 @@ func TestProcessBacklog_Success(t *testing.T) {
         },
     }
 
-    _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType2)
+    _, err = queueInterceptor.Enqueue(ctx, eventType2)
     require.NoError(t, err)
 
     // Rename replication job
@@ -655,15 +635,15 @@
     }
     require.NoError(t, err)
 
-    _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType3)
+    _, err = queueInterceptor.Enqueue(ctx, eventType3)
     require.NoError(t, err)
 
     logEntry := testhelper.DiscardTestEntry(t)
 
-    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
+    nodeMgr, err := nodes.NewManager(logEntry, conf, nil, queueInterceptor, promtest.NewMockHistogramVec())
     require.NoError(t, err)
 
-    replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), ds, nodeMgr)
+    replMgr := NewReplMgr(logEntry, conf.VirtualStorageNames(), queueInterceptor, nodeMgr)
     replMgr.ProcessBacklog(ctx, noopBackoffFunc)
 
     select {
@@ -725,7 +705,7 @@ func TestProcessBacklog_ReplicatesToReadOnlyPrimary(t *testing.T) {
     replMgr := NewReplMgr(
         testhelper.DiscardTestEntry(t),
         conf.VirtualStorageNames(),
-        datastore.Datastore{datastore.NewInMemory(conf), queue},
+        queue,
         &mockNodeManager{
             GetShardFunc: func(vs string) (nodes.Shard, error) {
                 require.Equal(t, virtualStorage, vs)
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c56eda44c..229d30200 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -125,10 +125,10 @@ func (srv *Server) Serve(l net.Listener, secure bool) error {
 }
 
 // RegisterServices will register any services praefect needs to handle rpcs on its own
-func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, ds datastore.Datastore) {
+func (srv *Server) RegisterServices(nm nodes.Manager, tm *transactions.Manager, conf config.Config, queue datastore.ReplicationEventQueue) {
     // ServerServiceServer is necessary for the ServerInfo RPC
     gitalypb.RegisterServerServiceServer(srv.s, server.NewServer(conf, nm))
-    gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, ds))
+    gitalypb.RegisterPraefectInfoServiceServer(srv.s, info.NewServer(nm, conf, queue))
     gitalypb.RegisterRefTransactionServer(srv.s, transaction.NewServer(tm))
     healthpb.RegisterHealthServer(srv.s, health.NewServer())
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 2ebbb29e2..9feaf3d7a 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -49,7 +49,7 @@ func TestServerRouteServerAccessor(t *testing.T) {
         }
     )
 
-    cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
+    cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
     defer cleanup()
 
     cli := mock.NewSimpleServiceClient(cc)
@@ -130,7 +130,7 @@ func TestGitalyServerInfo(t *testing.T) {
         conf.VirtualStorages[0].Nodes[1].Storage: &mockSvc{},
     }
 
-    cc, _, cleanup := runPraefectServerWithMock(t, conf, datastore.Datastore{}, backends)
+    cc, _, cleanup := runPraefectServerWithMock(t, conf, nil, backends)
     defer cleanup()
 
     client := gitalypb.NewServerServiceClient(cc)
@@ -443,10 +443,6 @@ func TestRepoRemoval(t *testing.T) {
     // TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports
     // graceful shutdown, we can remove this code that waits for jobs to be complete
     queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: queueInterceptor,
-    }
 
     jobsDoneCh := make(chan struct{}, 2)
     queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
@@ -457,7 +453,7 @@ func TestRepoRemoval(t *testing.T) {
         return queue.Acknowledge(ctx, state, ids)
     })
 
-    cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+    cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, queueInterceptor)
     defer cleanup()
 
     ctx, cancel := testhelper.Context()
@@ -575,12 +571,7 @@ func TestRepoRename(t *testing.T) {
         return queue.Acknowledge(ctx, state, ids)
     })
 
-    ds := datastore.Datastore{
-        ReplicasDatastore:     datastore.NewInMemory(conf),
-        ReplicationEventQueue: evq,
-    }
-
-    cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
+    cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, evq)
     defer cleanup()
 
     ctx, cancel := testhelper.Context()
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
index d336144e9..24510db79 100644
--- a/internal/praefect/service/info/consistencycheck.go
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -169,7 +169,7 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
     return nil
 }
 
-func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp *gitalypb.ConsistencyCheckResponse) error {
+func scheduleReplication(ctx context.Context, csr checksumResult, q datastore.ReplicationEventQueue, resp *gitalypb.ConsistencyCheckResponse) error {
     event, err := q.Enqueue(ctx, datastore.ReplicationEvent{
         Job: datastore.ReplicationJob{
             Change: datastore.UpdateRepo,
@@ -190,7 +190,7 @@ func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp
     return nil
 }
 
-func ensureConsistency(ctx context.Context, disableReconcile bool, checksumResultQ <-chan checksumResult, q Queue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+func ensureConsistency(ctx context.Context, disableReconcile bool, checksumResultQ <-chan checksumResult, q datastore.ReplicationEventQueue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
     for {
         var csr checksumResult
         select {
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index f3920fefa..f46c2fd03 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -3,7 +3,6 @@ package info
 import (
     "context"
     "errors"
-    "time"
 
     "gitlab.com/gitlab-org/gitaly/internal/helper"
     "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -12,26 +11,16 @@ import (
     "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 )
 
-// Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service
-type Queue interface {
-    Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error)
-    CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
-}
-
-// compile time assertion that Queue is satisfied by
-// datastore.ReplicationEventQueue
-var _ Queue = (datastore.ReplicationEventQueue)(nil)
-
 // Server is a InfoService server
 type Server struct {
     gitalypb.UnimplementedPraefectInfoServiceServer
     nodeMgr nodes.Manager
     conf    config.Config
-    queue   Queue
+    queue   datastore.ReplicationEventQueue
 }
 
 // NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, queue Queue) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue datastore.ReplicationEventQueue) gitalypb.PraefectInfoServiceServer {
     s := &Server{
         nodeMgr: nodeMgr,
         conf:    conf,