diff options
author | John Cai <jcai@gitlab.com> | 2019-11-14 21:57:38 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-11-20 04:30:36 +0300 |
commit | c1e7611d48b15cb909a7fb56615d59613f67a85b (patch) | |
tree | 6a9a71e6a2351113f6f5f4ad4039f858c33fed0f | |
parent | 8925c3db0a9ee56f91674fcfb20499a63e0f47ea (diff) |
Refactor datastore to its own package
-rw-r--r-- | changelogs/unreleased/jc-move-datastore-to-package.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 7 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 7 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 19 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 21 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go (renamed from internal/praefect/datastore.go) | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore_test.go (renamed from internal/praefect/datastore_test.go) | 4 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 57 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 27 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 49 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 2 |
11 files changed, 114 insertions, 92 deletions
diff --git a/changelogs/unreleased/jc-move-datastore-to-package.yml b/changelogs/unreleased/jc-move-datastore-to-package.yml new file mode 100644 index 000000000..95f74e623 --- /dev/null +++ b/changelogs/unreleased/jc-move-datastore-to-package.yml @@ -0,0 +1,5 @@ +--- +title: Refactor datastore to its own package +merge_request: 1627 +author: +type: other diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index b45a17c23..b17394a53 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -21,6 +21,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/tracing" @@ -108,9 +109,9 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies - datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) - repl = praefect.NewReplMgr("default", logger, datastore, clientConnections) + ds = datastore.NewInMemory(conf) + coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, ds, clientConnections) srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf) // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index cc3e5c39a..b426910f8 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" @@ -183,14 +184,14 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func } logEntry := log.Default() - datastore := NewMemoryDatastore(conf) + ds := datastore.NewInMemory(conf) clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-0", backend, backendToken) - coordinator := NewCoordinator(logEntry, datastore, clientConnections, conf, fd) + coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd) - replMgr := NewReplMgr("praefect-internal-0", logEntry, datastore, clientConnections) + replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections) srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index e4950f203..3985a13ea 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -36,20 +37,20 @@ type Coordinator struct { log *logrus.Entry failoverMutex sync.RWMutex - datastore Datastore + datastore datastore.Datastore registry *protoregistry.Registry conf config.Config } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ log: l, - datastore: datastore, + datastore: ds, registry: registry, connections: clientConnections, conf: conf, @@ -158,9 +159,9 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo requestFinalizer := noopRequestFinalizer if mi.Operation == protoregistry.OpMutator { - change := UpdateRepo + change := datastore.UpdateRepo if isDestructive(method) { - change = DeleteRepo + change = datastore.DeleteRepo } if requestFinalizer, err = c.createReplicaJobs(targetRepo, change); err != nil { @@ -178,7 +179,7 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) if err != nil { - if err != ErrPrimaryNotSet { + if err != datastore.ErrPrimaryNotSet { return nil, err } // if there are no primaries for this repository, pick one @@ -231,7 +232,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi return m, nil } -func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change ChangeType) (func(), error) { +func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change datastore.ChangeType) (func(), error) { jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, change) if err != nil { return nil, err @@ -239,8 +240,8 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change return func() { for _, jobID := range jobIDs { - if err := c.datastore.UpdateReplJob(jobID, JobStateReady); err != nil { - c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", JobStateReady) + if err := c.datastore.UpdateReplJob(jobID, datastore.JobStateReady); err != nil { + c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", datastore.JobStateReady) } } }, nil diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index b220c7b6f..76a0a84d3 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -41,7 +42,7 @@ func TestStreamDirector(t *testing.T) { Storage: "praefect-internal-2", }}, } - datastore := NewMemoryDatastore(conf) + ds := datastore.NewInMemory(conf) targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -55,7 +56,7 @@ func TestStreamDirector(t *testing.T) { clientConnections := conn.NewClientConnections() clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token") - coordinator := NewCoordinator(log.Default(), datastore, clientConnections, conf) + coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf) require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...)) frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{ @@ -86,21 +87,21 @@ func TestStreamDirector(t *testing.T) { require.NoError(t, err) require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name") - jobs, err := datastore.GetJobs(JobStatePending, 1, 10) + jobs, err := ds.GetJobs(datastore.JobStatePending, 1, 10) require.NoError(t, err) require.Len(t, jobs, 1) - targetNode, err := datastore.GetStorageNode(1) + targetNode, err := ds.GetStorageNode(1) require.NoError(t, err) - sourceNode, err := datastore.GetStorageNode(0) + sourceNode, err := ds.GetStorageNode(0) require.NoError(t, err) - expectedJob := ReplJob{ - Change: UpdateRepo, + expectedJob := datastore.ReplJob{ + Change: datastore.UpdateRepo, ID: 1, TargetNode: targetNode, SourceNode: sourceNode, - State: JobStatePending, + State: datastore.JobStatePending, Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}}, } @@ -108,11 +109,11 @@ func TestStreamDirector(t *testing.T) { jobUpdateFunc() - jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10) + jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, 1, 10) require.NoError(t, err) require.Len(t, jobs, 1) - expectedJob.State = JobStateReady + expectedJob.State = datastore.JobStateReady require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady") } diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore/datastore.go index d100d8bcc..8bb18cd07 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -1,9 +1,9 @@ -// Package praefect provides data models and datastore persistence abstractions +// Package datastore provides data models and datastore persistence abstractions // for tracking the state of repository replicas. // // See original design discussion: // https://gitlab.com/gitlab-org/gitaly/issues/1495 -package praefect +package datastore import ( "errors" @@ -143,8 +143,8 @@ type MemoryDatastore struct { } } -// NewMemoryDatastore returns an initialized in-memory datastore -func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { +// NewInMemory returns an initialized in-memory datastore +func NewInMemory(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ storageNodes: &struct { sync.RWMutex diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore/datastore_test.go index 4c8ce6dac..1be1e1de0 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore/datastore_test.go @@ -1,4 +1,4 @@ -package praefect +package datastore import ( "testing" @@ -112,7 +112,7 @@ var operations = []struct { // TODO: add SQL datastore flavor var flavors = map[string]func() Datastore{ "in-memory-datastore": func() Datastore { - return NewMemoryDatastore(config.Config{ + return NewInMemory(config.Config{ Nodes: []*models.Node{&stor1, &stor2}, }) }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 0637f40de..f1f8778b5 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/log" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" @@ -67,11 +68,10 @@ func testConfig(backends int) config.Config { // setupServer wires all praefect dependencies together via dependency // injection -func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*MemoryDatastore, *conn.ClientConnections, *Server) { +func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) { var ( - datastore = NewMemoryDatastore(conf) - clientCC = conn.NewClientConnections() - coordinator = NewCoordinator(l, datastore, clientCC, conf, fds...) + ds = datastore.NewInMemory(conf) + coordinator = NewCoordinator(l, ds, clientCC, conf, fds...) ) var defaultNode *models.Node @@ -85,7 +85,7 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr replmgr := NewReplMgr( defaultNode.Storage, l, - datastore, + ds, clientCC, ) server := NewServer( @@ -97,7 +97,7 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr conf, ) - return datastore, clientCC, server + return ds, server } // runPraefectServer runs a praefect server with the provided mock servers. @@ -105,25 +105,26 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr // config.Nodes. There must be a 1-to-1 mapping between backend server and // configured storage node. func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) { - datastore, clientCC, prf := setupServer(t, conf, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) - - require.Equal(t, len(backends), len(conf.Nodes), - "mock server count doesn't match config nodes") - + clientCC := conn.NewClientConnections() var cleanups []testhelper.Cleanup - for id, nodeStorage := range datastore.storageNodes.m { - backend, ok := backends[id] - require.True(t, ok, "missing backend server for node %d", id) + for i, node := range conf.Nodes { + backend, ok := backends[i] + require.True(t, ok, "missing backend server for node %d", i) - backendAddr, cleanup := newMockDownstream(t, nodeStorage.Token, backend) + backendAddr, cleanup := newMockDownstream(t, node.Token, backend) cleanups = append(cleanups, cleanup) - clientCC.RegisterNode(nodeStorage.Storage, backendAddr, nodeStorage.Token) - nodeStorage.Address = backendAddr - datastore.storageNodes.m[id] = nodeStorage + clientCC.RegisterNode(node.Storage, backendAddr, node.Token) + node.Address = backendAddr + conf.Nodes[i] = node } + _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)}) + + require.Equal(t, len(backends), len(conf.Nodes), + "mock server count doesn't match config nodes") + listener, port := listenAvailPort(t) t.Logf("praefect listening on port %d", port) @@ -154,27 +155,27 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[in // runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { - datastore := NewMemoryDatastore(conf) - logEntry := log.Default() clientCC := conn.NewClientConnections() - var cleanups []testhelper.Cleanup - for id, nodeStorage := range datastore.storageNodes.m { - _, backend, cleanup := runInternalGitalyServer(t, nodeStorage.Token) + for i, node := range conf.Nodes { + _, backendAddr, cleanup := runInternalGitalyServer(t, node.Token) cleanups = append(cleanups, cleanup) - clientCC.RegisterNode(nodeStorage.Storage, backend, nodeStorage.Token) - nodeStorage.Address = backend - datastore.storageNodes.m[id] = nodeStorage + clientCC.RegisterNode(node.Storage, backendAddr, node.Token) + node.Address = backendAddr + conf.Nodes[i] = node } - coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) + ds := datastore.NewInMemory(conf) + logEntry := log.Default() + + coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...) replmgr := NewReplMgr( "", logEntry, - datastore, + ds, clientCC, ) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 57cbf6a30..e12aab139 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -55,16 +56,16 @@ func init() { // Replicator performs the actual replication logic between two nodes type Replicator interface { // Replicate propagates changes from the source to the target - Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error + Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error // Destroy will remove the target repo on the specified target connection - Destroy(ctx context.Context, job ReplJob, target *grpc.ClientConn) error + Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Entry } -func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error { repository := &gitalypb.Repository{ StorageName: job.TargetNode.Storage, RelativePath: job.Repository.RelativePath, @@ -114,7 +115,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC return nil } -func (dr defaultReplicator) Destroy(ctx context.Context, job ReplJob, targetCC *grpc.ClientConn) error { +func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error { targetRepo := &gitalypb.Repository{ StorageName: job.TargetNode.Storage, RelativePath: job.Repository.RelativePath, @@ -167,7 +168,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient, // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Entry - datastore Datastore + datastore datastore.Datastore clientConnections *conn.ClientConnections targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic @@ -181,7 +182,7 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, datastore: datastore, @@ -225,7 +226,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository return nil } - id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath, UpdateRepo) + id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath, datastore.UpdateRepo) if err != nil { return err } @@ -254,7 +255,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { } for _, node := range nodes { - jobs, err := r.datastore.GetJobs(JobStateReady, node.ID, 10) + jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.ID, 10) if err != nil { return err } @@ -292,13 +293,13 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { // is a crutch in this situation. Ideally, we need to update state somewhere // with information regarding the replication failure. See follow up issue: // https://gitlab.com/gitlab-org/gitaly/issues/2138 -func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) { +func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) { l := r.log. WithField(logWithReplJobID, job.ID). WithField(logWithReplSource, job.SourceNode). WithField(logWithReplTarget, job.TargetNode) - if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateInProgress); err != nil { l.WithError(err).Error("unable to update replication job to in progress") return } @@ -326,9 +327,9 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) { defer decReplicationJobsInFlight() switch job.Change { - case UpdateRepo: + case datastore.UpdateRepo: err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC) - case DeleteRepo: + case datastore.DeleteRepo: err = r.replicator.Destroy(injectedCtx, job, targetCC) default: err = fmt.Errorf("unknown replication change type encountered: %d", job.Change) @@ -341,7 +342,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) { replDuration := time.Since(replStart) recordReplicationLatency(float64(replDuration / time.Millisecond)) - if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil { + if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil { l.WithError(err).Error("error when updating replication job status to complete") } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 44eb7873e..39623c174 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -6,7 +6,6 @@ import ( "net" "os" "path/filepath" - "sync" "testing" "github.com/stretchr/testify/require" @@ -15,7 +14,9 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" @@ -69,25 +70,35 @@ func TestProceessReplicationJob(t *testing.T) { }, ) - job := jobRecord{state: JobStateReady} - - m := &MemoryDatastore{ - jobs: &struct { - sync.RWMutex - records map[uint64]jobRecord // all jobs indexed by ID - }{ - records: map[uint64]jobRecord{1: job}, + config := config.Config{ + Nodes: []*models.Node{ + &models.Node{ + ID: 0, + Storage: "default", + Address: srvSocketPath, + Token: gitaly_config.Config.Auth.Token, + DefaultPrimary: true, + }, + &models.Node{ + ID: 1, + Storage: backupStorageName, + Address: srvSocketPath, + Token: gitaly_config.Config.Auth.Token, + }, }, } - replJob := ReplJob{ - Change: UpdateRepo, - ID: 1, - TargetNode: models.Node{Storage: backupStorageName, Address: srvSocketPath}, - SourceNode: models.Node{Storage: "default", Address: srvSocketPath, Token: testhelper.RepositoryAuthToken}, - Repository: models.Repository{Primary: models.Node{Storage: "default", Address: srvSocketPath}, RelativePath: testRepo.GetRelativePath()}, - State: JobStateReady, - } + ds := datastore.NewInMemory(config) + + ds.SetPrimary(testRepo.GetRelativePath(), 0) + ds.AddReplica(testRepo.GetRelativePath(), 1) + + _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), datastore.UpdateRepo) + require.NoError(t, err) + + jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, 1, 1) + require.NoError(t, err) + require.Len(t, jobs, 1) ctx, cancel := testhelper.Context() defer cancel() @@ -105,12 +116,12 @@ func TestProceessReplicationJob(t *testing.T) { replMgr := &ReplMgr{ log: gitaly_log.Default(), - datastore: m, + datastore: ds, clientConnections: clientCC, replicator: replicator, } - replMgr.processReplJob(ctx, replJob) + replMgr.processReplJob(ctx, jobs[0]) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 956d1158f..7f1b9d767 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -170,7 +170,7 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook := test.NewNullLogger() - setupServer(t, conf, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning for _, entry := range hook.Entries { if strings.Contains(entry.Message, "more than one backend node") { |