Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-11-14 21:57:38 +0300
committerJohn Cai <jcai@gitlab.com>2019-11-20 04:30:36 +0300
commitc1e7611d48b15cb909a7fb56615d59613f67a85b (patch)
tree6a9a71e6a2351113f6f5f4ad4039f858c33fed0f
parent8925c3db0a9ee56f91674fcfb20499a63e0f47ea (diff)
Refactor datastore to its own package
-rw-r--r--changelogs/unreleased/jc-move-datastore-to-package.yml5
-rw-r--r--cmd/praefect/main.go7
-rw-r--r--internal/praefect/auth_test.go7
-rw-r--r--internal/praefect/coordinator.go19
-rw-r--r--internal/praefect/coordinator_test.go21
-rw-r--r--internal/praefect/datastore/datastore.go (renamed from internal/praefect/datastore.go)8
-rw-r--r--internal/praefect/datastore/datastore_test.go (renamed from internal/praefect/datastore_test.go)4
-rw-r--r--internal/praefect/helper_test.go57
-rw-r--r--internal/praefect/replicator.go27
-rw-r--r--internal/praefect/replicator_test.go49
-rw-r--r--internal/praefect/server_test.go2
11 files changed, 114 insertions, 92 deletions
diff --git a/changelogs/unreleased/jc-move-datastore-to-package.yml b/changelogs/unreleased/jc-move-datastore-to-package.yml
new file mode 100644
index 000000000..95f74e623
--- /dev/null
+++ b/changelogs/unreleased/jc-move-datastore-to-package.yml
@@ -0,0 +1,5 @@
+---
+title: Refactor datastore to its own package
+merge_request: 1627
+author:
+type: other
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index b45a17c23..b17394a53 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -21,6 +21,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/labkit/tracing"
@@ -108,9 +109,9 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
- datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, datastore, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
- repl = praefect.NewReplMgr("default", logger, datastore, clientConnections)
+ ds = datastore.NewInMemory(conf)
+ coordinator = praefect.NewCoordinator(logger, ds, clientConnections, conf, protoregistry.GitalyProtoFileDescriptors...)
+ repl = praefect.NewReplMgr("default", logger, ds, clientConnections)
srv = praefect.NewServer(coordinator, repl, nil, logger, clientConnections, conf)
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index cc3e5c39a..b426910f8 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
@@ -183,14 +184,14 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
}
logEntry := log.Default()
- datastore := NewMemoryDatastore(conf)
+ ds := datastore.NewInMemory(conf)
clientConnections := conn.NewClientConnections()
clientConnections.RegisterNode("praefect-internal-0", backend, backendToken)
- coordinator := NewCoordinator(logEntry, datastore, clientConnections, conf, fd)
+ coordinator := NewCoordinator(logEntry, ds, clientConnections, conf, fd)
- replMgr := NewReplMgr("praefect-internal-0", logEntry, datastore, clientConnections)
+ replMgr := NewReplMgr("praefect-internal-0", logEntry, ds, clientConnections)
srv := NewServer(coordinator, replMgr, nil, logEntry, clientConnections, conf)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index e4950f203..3985a13ea 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -36,20 +37,20 @@ type Coordinator struct {
log *logrus.Entry
failoverMutex sync.RWMutex
- datastore Datastore
+ datastore datastore.Datastore
registry *protoregistry.Registry
conf config.Config
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Entry, datastore Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Entry, ds datastore.Datastore, clientConnections *conn.ClientConnections, conf config.Config, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
return &Coordinator{
log: l,
- datastore: datastore,
+ datastore: ds,
registry: registry,
connections: clientConnections,
conf: conf,
@@ -158,9 +159,9 @@ func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo
requestFinalizer := noopRequestFinalizer
if mi.Operation == protoregistry.OpMutator {
- change := UpdateRepo
+ change := datastore.UpdateRepo
if isDestructive(method) {
- change = DeleteRepo
+ change = datastore.DeleteRepo
}
if requestFinalizer, err = c.createReplicaJobs(targetRepo, change); err != nil {
@@ -178,7 +179,7 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git
primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
if err != nil {
- if err != ErrPrimaryNotSet {
+ if err != datastore.ErrPrimaryNotSet {
return nil, err
}
// if there are no primaries for this repository, pick one
@@ -231,7 +232,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
return m, nil
}
-func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change ChangeType) (func(), error) {
+func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change datastore.ChangeType) (func(), error) {
jobIDs, err := c.datastore.CreateReplicaReplJobs(targetRepo.RelativePath, change)
if err != nil {
return nil, err
@@ -239,8 +240,8 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository, change
return func() {
for _, jobID := range jobIDs {
- if err := c.datastore.UpdateReplJob(jobID, JobStateReady); err != nil {
- c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", JobStateReady)
+ if err := c.datastore.UpdateReplJob(jobID, datastore.JobStateReady); err != nil {
+ c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %d", datastore.JobStateReady)
}
}
}, nil
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index b220c7b6f..76a0a84d3 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -11,6 +11,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -41,7 +42,7 @@ func TestStreamDirector(t *testing.T) {
Storage: "praefect-internal-2",
}},
}
- datastore := NewMemoryDatastore(conf)
+ ds := datastore.NewInMemory(conf)
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -55,7 +56,7 @@ func TestStreamDirector(t *testing.T) {
clientConnections := conn.NewClientConnections()
clientConnections.RegisterNode("praefect-internal-1", fmt.Sprintf("tcp://%s", address), "token")
- coordinator := NewCoordinator(log.Default(), datastore, clientConnections, conf)
+ coordinator := NewCoordinator(log.Default(), ds, clientConnections, conf)
require.NoError(t, coordinator.RegisterProtos(protoregistry.GitalyProtoFileDescriptors...))
frame, err := proto.Marshal(&gitalypb.FetchIntoObjectPoolRequest{
@@ -86,21 +87,21 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name")
- jobs, err := datastore.GetJobs(JobStatePending, 1, 10)
+ jobs, err := ds.GetJobs(datastore.JobStatePending, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- targetNode, err := datastore.GetStorageNode(1)
+ targetNode, err := ds.GetStorageNode(1)
require.NoError(t, err)
- sourceNode, err := datastore.GetStorageNode(0)
+ sourceNode, err := ds.GetStorageNode(0)
require.NoError(t, err)
- expectedJob := ReplJob{
- Change: UpdateRepo,
+ expectedJob := datastore.ReplJob{
+ Change: datastore.UpdateRepo,
ID: 1,
TargetNode: targetNode,
SourceNode: sourceNode,
- State: JobStatePending,
+ State: datastore.JobStatePending,
Repository: models.Repository{RelativePath: targetRepo.RelativePath, Primary: sourceNode, Replicas: []models.Node{targetNode}},
}
@@ -108,11 +109,11 @@ func TestStreamDirector(t *testing.T) {
jobUpdateFunc()
- jobs, err = coordinator.datastore.GetJobs(JobStateReady, 1, 10)
+ jobs, err = coordinator.datastore.GetJobs(datastore.JobStateReady, 1, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- expectedJob.State = JobStateReady
+ expectedJob.State = datastore.JobStateReady
require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady")
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore/datastore.go
index d100d8bcc..8bb18cd07 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -1,9 +1,9 @@
-// Package praefect provides data models and datastore persistence abstractions
+// Package datastore provides data models and datastore persistence abstractions
// for tracking the state of repository replicas.
//
// See original design discussion:
// https://gitlab.com/gitlab-org/gitaly/issues/1495
-package praefect
+package datastore
import (
"errors"
@@ -143,8 +143,8 @@ type MemoryDatastore struct {
}
}
-// NewMemoryDatastore returns an initialized in-memory datastore
-func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
+// NewInMemory returns an initialized in-memory datastore
+func NewInMemory(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
storageNodes: &struct {
sync.RWMutex
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore/datastore_test.go
index 4c8ce6dac..1be1e1de0 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore/datastore_test.go
@@ -1,4 +1,4 @@
-package praefect
+package datastore
import (
"testing"
@@ -112,7 +112,7 @@ var operations = []struct {
// TODO: add SQL datastore flavor
var flavors = map[string]func() Datastore{
"in-memory-datastore": func() Datastore {
- return NewMemoryDatastore(config.Config{
+ return NewInMemory(config.Config{
Nodes: []*models.Node{&stor1, &stor2},
})
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 0637f40de..f1f8778b5 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -15,6 +15,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -67,11 +68,10 @@ func testConfig(backends int) config.Config {
// setupServer wires all praefect dependencies together via dependency
// injection
-func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*MemoryDatastore, *conn.ClientConnections, *Server) {
+func setupServer(t testing.TB, conf config.Config, clientCC *conn.ClientConnections, l *logrus.Entry, fds []*descriptor.FileDescriptorProto) (*datastore.MemoryDatastore, *Server) {
var (
- datastore = NewMemoryDatastore(conf)
- clientCC = conn.NewClientConnections()
- coordinator = NewCoordinator(l, datastore, clientCC, conf, fds...)
+ ds = datastore.NewInMemory(conf)
+ coordinator = NewCoordinator(l, ds, clientCC, conf, fds...)
)
var defaultNode *models.Node
@@ -85,7 +85,7 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr
replmgr := NewReplMgr(
defaultNode.Storage,
l,
- datastore,
+ ds,
clientCC,
)
server := NewServer(
@@ -97,7 +97,7 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr
conf,
)
- return datastore, clientCC, server
+ return ds, server
}
// runPraefectServer runs a praefect server with the provided mock servers.
@@ -105,25 +105,26 @@ func setupServer(t testing.TB, conf config.Config, l *logrus.Entry, fds []*descr
// config.Nodes. There must be a 1-to-1 mapping between backend server and
// configured storage node.
func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[int]mock.SimpleServiceServer) (mock.SimpleServiceClient, *Server, testhelper.Cleanup) {
- datastore, clientCC, prf := setupServer(t, conf, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
-
- require.Equal(t, len(backends), len(conf.Nodes),
- "mock server count doesn't match config nodes")
-
+ clientCC := conn.NewClientConnections()
var cleanups []testhelper.Cleanup
- for id, nodeStorage := range datastore.storageNodes.m {
- backend, ok := backends[id]
- require.True(t, ok, "missing backend server for node %d", id)
+ for i, node := range conf.Nodes {
+ backend, ok := backends[i]
+ require.True(t, ok, "missing backend server for node %d", i)
- backendAddr, cleanup := newMockDownstream(t, nodeStorage.Token, backend)
+ backendAddr, cleanup := newMockDownstream(t, node.Token, backend)
cleanups = append(cleanups, cleanup)
- clientCC.RegisterNode(nodeStorage.Storage, backendAddr, nodeStorage.Token)
- nodeStorage.Address = backendAddr
- datastore.storageNodes.m[id] = nodeStorage
+ clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
+ node.Address = backendAddr
+ conf.Nodes[i] = node
}
+ _, prf := setupServer(t, conf, clientCC, log.Default(), []*descriptor.FileDescriptorProto{mustLoadProtoReg(t)})
+
+ require.Equal(t, len(backends), len(conf.Nodes),
+ "mock server count doesn't match config nodes")
+
listener, port := listenAvailPort(t)
t.Logf("praefect listening on port %d", port)
@@ -154,27 +155,27 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[in
// runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
- datastore := NewMemoryDatastore(conf)
- logEntry := log.Default()
clientCC := conn.NewClientConnections()
-
var cleanups []testhelper.Cleanup
- for id, nodeStorage := range datastore.storageNodes.m {
- _, backend, cleanup := runInternalGitalyServer(t, nodeStorage.Token)
+ for i, node := range conf.Nodes {
+ _, backendAddr, cleanup := runInternalGitalyServer(t, node.Token)
cleanups = append(cleanups, cleanup)
- clientCC.RegisterNode(nodeStorage.Storage, backend, nodeStorage.Token)
- nodeStorage.Address = backend
- datastore.storageNodes.m[id] = nodeStorage
+ clientCC.RegisterNode(node.Storage, backendAddr, node.Token)
+ node.Address = backendAddr
+ conf.Nodes[i] = node
}
- coordinator := NewCoordinator(logEntry, datastore, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...)
+ ds := datastore.NewInMemory(conf)
+ logEntry := log.Default()
+
+ coordinator := NewCoordinator(logEntry, ds, clientCC, conf, protoregistry.GitalyProtoFileDescriptors...)
replmgr := NewReplMgr(
"",
logEntry,
- datastore,
+ ds,
clientCC,
)
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 57cbf6a30..e12aab139 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -12,6 +12,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -55,16 +56,16 @@ func init() {
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
// Replicate propagates changes from the source to the target
- Replicate(ctx context.Context, job ReplJob, source, target *grpc.ClientConn) error
+ Replicate(ctx context.Context, job datastore.ReplJob, source, target *grpc.ClientConn) error
// Destroy will remove the target repo on the specified target connection
- Destroy(ctx context.Context, job ReplJob, target *grpc.ClientConn) error
+ Destroy(ctx context.Context, job datastore.ReplJob, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Entry
}
-func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, job datastore.ReplJob, sourceCC, targetCC *grpc.ClientConn) error {
repository := &gitalypb.Repository{
StorageName: job.TargetNode.Storage,
RelativePath: job.Repository.RelativePath,
@@ -114,7 +115,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, sourceCC
return nil
}
-func (dr defaultReplicator) Destroy(ctx context.Context, job ReplJob, targetCC *grpc.ClientConn) error {
+func (dr defaultReplicator) Destroy(ctx context.Context, job datastore.ReplJob, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: job.TargetNode.Storage,
RelativePath: job.Repository.RelativePath,
@@ -167,7 +168,7 @@ func (dr defaultReplicator) confirmChecksums(ctx context.Context, primaryClient,
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Entry
- datastore Datastore
+ datastore datastore.Datastore
clientConnections *conn.ClientConnections
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
@@ -181,7 +182,7 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Entry, datastore Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Entry, datastore datastore.Datastore, c *conn.ClientConnections, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
datastore: datastore,
@@ -225,7 +226,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath, UpdateRepo)
+ id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath, datastore.UpdateRepo)
if err != nil {
return err
}
@@ -254,7 +255,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
}
for _, node := range nodes {
- jobs, err := r.datastore.GetJobs(JobStateReady, node.ID, 10)
+ jobs, err := r.datastore.GetJobs(datastore.JobStateReady, node.ID, 10)
if err != nil {
return err
}
@@ -292,13 +293,13 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
// is a crutch in this situation. Ideally, we need to update state somewhere
// with information regarding the replication failure. See follow up issue:
// https://gitlab.com/gitlab-org/gitaly/issues/2138
-func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) {
+func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob) {
l := r.log.
WithField(logWithReplJobID, job.ID).
WithField(logWithReplSource, job.SourceNode).
WithField(logWithReplTarget, job.TargetNode)
- if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateInProgress); err != nil {
l.WithError(err).Error("unable to update replication job to in progress")
return
}
@@ -326,9 +327,9 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) {
defer decReplicationJobsInFlight()
switch job.Change {
- case UpdateRepo:
+ case datastore.UpdateRepo:
err = r.replicator.Replicate(injectedCtx, job, sourceCC, targetCC)
- case DeleteRepo:
+ case datastore.DeleteRepo:
err = r.replicator.Destroy(injectedCtx, job, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %d", job.Change)
@@ -341,7 +342,7 @@ func (r ReplMgr) processReplJob(ctx context.Context, job ReplJob) {
replDuration := time.Since(replStart)
recordReplicationLatency(float64(replDuration / time.Millisecond))
- if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ if err := r.datastore.UpdateReplJob(job.ID, datastore.JobStateComplete); err != nil {
l.WithError(err).Error("error when updating replication job status to complete")
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 44eb7873e..39623c174 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -6,7 +6,6 @@ import (
"net"
"os"
"path/filepath"
- "sync"
"testing"
"github.com/stretchr/testify/require"
@@ -15,7 +14,9 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
gitaly_log "gitlab.com/gitlab-org/gitaly/internal/log"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -69,25 +70,35 @@ func TestProceessReplicationJob(t *testing.T) {
},
)
- job := jobRecord{state: JobStateReady}
-
- m := &MemoryDatastore{
- jobs: &struct {
- sync.RWMutex
- records map[uint64]jobRecord // all jobs indexed by ID
- }{
- records: map[uint64]jobRecord{1: job},
+ config := config.Config{
+ Nodes: []*models.Node{
+ &models.Node{
+ ID: 0,
+ Storage: "default",
+ Address: srvSocketPath,
+ Token: gitaly_config.Config.Auth.Token,
+ DefaultPrimary: true,
+ },
+ &models.Node{
+ ID: 1,
+ Storage: backupStorageName,
+ Address: srvSocketPath,
+ Token: gitaly_config.Config.Auth.Token,
+ },
},
}
- replJob := ReplJob{
- Change: UpdateRepo,
- ID: 1,
- TargetNode: models.Node{Storage: backupStorageName, Address: srvSocketPath},
- SourceNode: models.Node{Storage: "default", Address: srvSocketPath, Token: testhelper.RepositoryAuthToken},
- Repository: models.Repository{Primary: models.Node{Storage: "default", Address: srvSocketPath}, RelativePath: testRepo.GetRelativePath()},
- State: JobStateReady,
- }
+ ds := datastore.NewInMemory(config)
+
+ ds.SetPrimary(testRepo.GetRelativePath(), 0)
+ ds.AddReplica(testRepo.GetRelativePath(), 1)
+
+ _, err = ds.CreateReplicaReplJobs(testRepo.GetRelativePath(), datastore.UpdateRepo)
+ require.NoError(t, err)
+
+ jobs, err := ds.GetJobs(datastore.JobStateReady|datastore.JobStatePending, 1, 1)
+ require.NoError(t, err)
+ require.Len(t, jobs, 1)
ctx, cancel := testhelper.Context()
defer cancel()
@@ -105,12 +116,12 @@ func TestProceessReplicationJob(t *testing.T) {
replMgr := &ReplMgr{
log: gitaly_log.Default(),
- datastore: m,
+ datastore: ds,
clientConnections: clientCC,
replicator: replicator,
}
- replMgr.processReplJob(ctx, replJob)
+ replMgr.processReplJob(ctx, jobs[0])
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 956d1158f..7f1b9d767 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -170,7 +170,7 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook := test.NewNullLogger()
- setupServer(t, conf, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
+ setupServer(t, conf, nil, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning
for _, entry := range hook.Entries {
if strings.Contains(entry.Message, "more than one backend node") {