
gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2020-03-27 07:25:52 +0300
committer Paul Okstad <pokstad@gitlab.com>     2020-03-27 07:25:52 +0300
commit    a98b52ec08cffd61cd41bcc53761d54ef360ecee
tree      914cad6d1e7fc7aa0d25ddc9d6e6c4ba448ba3e3
parent    5cad7eb778160c81aa9a8755ec991030266bb4d5
Praefect: replication event queue as a primary storage of events
The replication storage interface is switched to `ReplicationEventQueue`.
The `gitaly_replication_queue` table is extended with a `meta` column that serves as a container for meta information such as the correlation ID.
`memoryReplicationEventQueue` now populates the `LockID` field so that it produces the same result as the SQL implementation.
`ReplicationEventQueueInterceptor` is introduced for testing purposes and as a hook point for other interceptors (metrics, etc.).
A `slice` package is created to gather common operations on different kinds of slices (`Uint64` is the first one).

Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
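To illustrate how the new interceptor is meant to be used, below is a minimal sketch (not part of this commit) of a test that waits for an asynchronously enqueued replication event before dequeuing it, in the spirit of the updated coordinator tests. The test name, storage names, and repository path are hypothetical, and such a file would need to live inside the gitaly module to import the internal `datastore` package.

// Illustrative sketch only: the interceptor wraps the in-memory queue so the
// test can wait for an asynchronous Enqueue before inspecting the queue.
package praefect_test

import (
    "context"
    "sync"
    "testing"

    "github.com/stretchr/testify/require"
    "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

func TestEnqueueIsObservable(t *testing.T) {
    ctx := context.Background()

    var replEventWait sync.WaitGroup
    replEventWait.Add(1) // exactly one replication event is expected

    queue := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
    queue.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, q datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
        defer replEventWait.Done() // signal once the wrapped queue has persisted the event
        return q.Enqueue(ctx, event)
    })

    event := datastore.ReplicationEvent{
        Job: datastore.ReplicationJob{
            Change:            datastore.UpdateRepo,
            RelativePath:      "/project/path-1",
            SourceNodeStorage: "storage-1",
            TargetNodeStorage: "storage-2",
        },
    }

    // The coordinator enqueues events from a goroutine inside its request
    // finalizer; enqueue from a goroutine here to mimic that behaviour.
    go func() {
        if _, err := queue.Enqueue(ctx, event); err != nil {
            t.Error(err) // Error (not Fatal) is safe from a non-test goroutine
        }
    }()

    replEventWait.Wait() // wait until the event has been persisted (async operation)

    events, err := queue.Dequeue(ctx, "storage-2", 10)
    require.NoError(t, err)
    require.Len(t, events, 1)
    require.Equal(t, "storage-2", events[0].Job.TargetNodeStorage)
}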
-rw-r--r-- changelogs/unreleased/ps-queued-datastore.yml | 5
-rw-r--r-- cmd/praefect/main.go | 5
-rw-r--r-- internal/middleware/metadatahandler/metadatahandler.go | 5
-rw-r--r-- internal/praefect/auth_test.go | 5
-rw-r--r-- internal/praefect/coordinator.go | 54
-rw-r--r-- internal/praefect/coordinator_test.go | 86
-rw-r--r-- internal/praefect/datastore/datastore.go | 212
-rw-r--r-- internal/praefect/datastore/datastore_test.go | 146
-rw-r--r-- internal/praefect/datastore/glsql/postgres.go | 6
-rw-r--r-- internal/praefect/datastore/glsql/testing.go | 10
-rw-r--r-- internal/praefect/datastore/memory.go | 64
-rw-r--r-- internal/praefect/datastore/memory_test.go | 14
-rw-r--r-- internal/praefect/datastore/migrations/20200224220728_job_queue.go | 59
-rw-r--r-- internal/praefect/datastore/queue.go | 187
-rw-r--r-- internal/praefect/datastore/queue_test.go | 18
-rw-r--r-- internal/praefect/helper_test.go | 20
-rw-r--r-- internal/praefect/replicator.go | 161
-rw-r--r-- internal/praefect/replicator_test.go | 297
-rw-r--r-- internal/praefect/server_test.go | 2
-rw-r--r-- internal/praefect/service/info/consistencycheck.go | 42
-rw-r--r-- internal/praefect/service/info/server.go | 29
21 files changed, 666 insertions(+), 761 deletions(-)
diff --git a/changelogs/unreleased/ps-queued-datastore.yml b/changelogs/unreleased/ps-queued-datastore.yml
new file mode 100644
index 000000000..208414106
--- /dev/null
+++ b/changelogs/unreleased/ps-queued-datastore.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: replication event queue as a primary storage of events'
+merge_request: 1948
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 194bb7fdf..2d98b05cd 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -183,7 +183,10 @@ func run(cfgs []starter.Config, conf config.Config) error {
var (
// top level server dependencies
- ds = datastore.NewInMemory(conf)
+ ds = datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ }
coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, registry)
repl = praefect.NewReplMgr(
conf.VirtualStorages[0].Name,
diff --git a/internal/middleware/metadatahandler/metadatahandler.go b/internal/middleware/metadatahandler/metadatahandler.go
index 0581e82da..c8c2aaa9e 100644
--- a/internal/middleware/metadatahandler/metadatahandler.go
+++ b/internal/middleware/metadatahandler/metadatahandler.go
@@ -48,7 +48,8 @@ const AuthVersionKey = "grpc.meta.auth_version"
// DeadlineTypeKey is the key used in ctx_tags to store the deadline type
const DeadlineTypeKey = "grpc.meta.deadline_type"
-const correlationIDKey = "correlation_id"
+// CorrelationIDKey is the key used in ctx_tags to store the correlation ID
+const CorrelationIDKey = "correlation_id"
// Unknown client and feature. Matches the prometheus grpc unknown value
const unknownValue = "unknown"
@@ -112,7 +113,7 @@ func addMetadataTags(ctx context.Context) metadataTags {
// This is a stop-gap approach to logging correlation_ids
correlationID := correlation.ExtractFromContext(ctx)
if correlationID != "" {
- tags.Set(correlationIDKey, correlationID)
+ tags.Set(CorrelationIDKey, correlationID)
}
return metaTags
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 0229590a0..3831b60f8 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -189,7 +189,10 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
}
logEntry := testhelper.DiscardTestEntry(t)
- ds := datastore.NewInMemory(conf)
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ }
nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
require.NoError(t, err)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 7295538b4..85e53345d 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -7,6 +7,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
@@ -96,9 +97,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot
return nil, err
}
- if requestFinalizer, err = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change, params); err != nil {
- return nil, err
- }
+ requestFinalizer = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change, params)
}
return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil
@@ -194,29 +193,38 @@ func (c *Coordinator) createReplicaJobs(
secondaries []nodes.Node,
change datastore.ChangeType,
params datastore.Params,
-) (func(), error) {
- var secondaryStorages []string
- for _, secondary := range secondaries {
- secondaryStorages = append(secondaryStorages, secondary.GetStorage())
- }
-
- corrID := c.ensureCorrelationID(ctx, targetRepo)
-
- jobIDs, err := c.datastore.CreateReplicaReplJobs(corrID, targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change, params)
- if err != nil {
- return nil, err
- }
-
+) func() {
return func() {
- for _, jobID := range jobIDs {
- // TODO: in case of error the job remains in queue in 'pending' state and leads to:
- // - additional memory consumption
- // - stale state of one of the git data stores
- if err := c.datastore.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil {
- c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %q", datastore.JobStateReady)
+ correlationID := c.ensureCorrelationID(ctx, targetRepo)
+
+ for _, secondary := range secondaries {
+ event := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: change,
+ RelativePath: targetRepo.GetRelativePath(),
+ SourceNodeStorage: primary.GetStorage(),
+ TargetNodeStorage: secondary.GetStorage(),
+ Params: params,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
}
+
+ // TODO: it could happen that there won't be enough time to enqueue replication events
+ // do we need to create another ctx with another timeout?
+ // https://gitlab.com/gitlab-org/gitaly/-/issues/2586
+ go func() {
+ _, err := c.datastore.Enqueue(ctx, event)
+ if err != nil {
+ c.log.WithFields(logrus.Fields{
+ logWithReplSource: event.Job.SourceNodeStorage,
+ logWithReplTarget: event.Job.TargetNodeStorage,
+ logWithReplChange: event.Job.Change,
+ logWithReplPath: event.Job.RelativePath,
+ }).Error("failed to persist replication event")
+ }
+ }()
}
- }, nil
+ }
}
func (c *Coordinator) ensureCorrelationID(ctx context.Context, targetRepo *gitalypb.Repository) string {
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 362eb6787..9251d8010 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -1,12 +1,15 @@
package praefect
import (
+ "context"
"io/ioutil"
+ "sync"
"testing"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -46,7 +49,19 @@ func TestStreamDirector(t *testing.T) {
},
},
}
- ds := datastore.NewInMemory(conf)
+
+ var replEventWait sync.WaitGroup
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ defer replEventWait.Done()
+ return queue.Enqueue(ctx, event)
+ })
+
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: queueInterceptor,
+ }
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -95,36 +110,36 @@ func TestStreamDirector(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name")
- jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStatePending}, "praefect-internal-2", 10)
- require.NoError(t, err)
- require.Len(t, jobs, 1)
+ replEventWait.Add(1) // expected only one event to be created
+ // this call creates new events in the queue and simulates usual flow of the update operation
+ streamParams.RequestFinalizer()
targetNode, err := ds.GetStorageNode("praefect-internal-2")
require.NoError(t, err)
sourceNode, err := ds.GetStorageNode("praefect-internal-1")
-
require.NoError(t, err)
- expectedJob := datastore.ReplJob{
- Change: datastore.UpdateRepo,
- ID: 1,
- TargetNode: targetNode,
- SourceNode: sourceNode,
- State: datastore.JobStatePending,
- RelativePath: targetRepo.RelativePath,
- CorrelationID: "my-correlation-id",
- }
-
- require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct")
-
- streamParams.RequestFinalizer()
-
- jobs, err = coordinator.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady}, "praefect-internal-2", 10)
+ replEventWait.Wait() // wait until event persisted (async operation)
+ events, err := ds.ReplicationEventQueue.Dequeue(ctx, "praefect-internal-2", 10)
require.NoError(t, err)
- require.Len(t, jobs, 1)
-
- expectedJob.State = datastore.JobStateReady
- require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady")
+ require.Len(t, events, 1)
+
+ expectedEvent := datastore.ReplicationEvent{
+ ID: 1,
+ State: datastore.JobStateInProgress,
+ Attempt: 2,
+ LockID: "praefect-internal-2|/path/to/hashed/storage",
+ CreatedAt: events[0].CreatedAt,
+ UpdatedAt: events[0].UpdatedAt,
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ RelativePath: targetRepo.RelativePath,
+ TargetNodeStorage: targetNode.Storage,
+ SourceNodeStorage: sourceNode.Storage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
+ }
+ require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
}
type mockPeeker struct {
@@ -159,7 +174,19 @@ func TestAbsentCorrelationID(t *testing.T) {
},
},
}
- ds := datastore.NewInMemory(conf)
+
+ var replEventWait sync.WaitGroup
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
+ defer replEventWait.Done()
+ return queue.Enqueue(ctx, event)
+ })
+
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: queueInterceptor,
+ }
targetRepo := gitalypb.Repository{
StorageName: "praefect",
@@ -191,10 +218,15 @@ func TestAbsentCorrelationID(t *testing.T) {
require.NoError(t, err)
require.Equal(t, address, streamParams.Conn().Target())
- jobs, err := coordinator.datastore.GetJobs([]datastore.JobState{datastore.JobStatePending}, conf.VirtualStorages[0].Nodes[1].Storage, 1)
+ replEventWait.Add(1) // expected only one event to be created
+ // must be run as it adds replication events to the queue
+ streamParams.RequestFinalizer()
+
+ replEventWait.Wait() // wait until event persisted (async operation)
+ jobs, err := coordinator.datastore.Dequeue(ctx, conf.VirtualStorages[0].Nodes[1].Storage, 1)
require.NoError(t, err)
require.Len(t, jobs, 1)
- require.NotZero(t, jobs[0].CorrelationID,
+ require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey],
"the coordinator should have generated a random ID")
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index ae084d1ad..0b2d91c60 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -6,9 +6,10 @@
package datastore
import (
+ "database/sql/driver"
+ "encoding/json"
"errors"
"fmt"
- "sort"
"sync"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
@@ -23,9 +24,6 @@ func (js JobState) String() string {
}
const (
- // JobStatePending is the initial job state when it is not yet ready to run
- // and may indicate recovery from a failure prior to the ready-state.
- JobStatePending = JobState("pending")
// JobStateReady indicates the job is now ready to proceed.
JobStateReady = JobState("ready")
// JobStateInProgress indicates the job is being processed by a worker.
@@ -62,6 +60,25 @@ func (ct ChangeType) String() string {
// It must be JSON encodable/decodable to persist it without problems.
type Params map[string]interface{}
+// Scan assigns a value from a database driver.
+func (p *Params) Scan(value interface{}) error {
+ if value == nil {
+ return nil
+ }
+
+ d, ok := value.([]byte)
+ if !ok {
+ return fmt.Errorf("unexpected type received: %T", value)
+ }
+
+ return json.Unmarshal(d, p)
+}
+
+// Value returns a driver Value.
+func (p Params) Value() (driver.Value, error) {
+ return json.Marshal(p)
+}
+
// ReplJob is an instance of a queued replication job. A replication job is
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
@@ -76,22 +93,11 @@ type ReplJob struct {
CorrelationID string // from original request
}
-// replJobs provides sort manipulation behavior
-type replJobs []ReplJob
-
-func (rjs replJobs) Len() int { return len(rjs) }
-func (rjs replJobs) Swap(i, j int) { rjs[i], rjs[j] = rjs[j], rjs[i] }
-
-// byJobID provides a comparator for sorting jobs
-type byJobID struct{ replJobs }
-
-func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].ID }
-
// Datastore is a data persistence abstraction for all of Praefect's
// persistence needs
type Datastore interface {
- ReplJobsDatastore
ReplicasDatastore
+ ReplicationEventQueue
}
// ReplicasDatastore manages accessing and setting which secondary replicas
@@ -108,45 +114,16 @@ type ReplicasDatastore interface {
GetStorageNodes() ([]models.Node, error)
}
-// ReplJobsDatastore represents the behavior needed for fetching and updating
-// replication jobs from the datastore
-type ReplJobsDatastore interface {
- // GetJobs fetches a list of chronologically ordered replication
- // jobs for the given storage replica. The returned list will be at most
- // count-length.
- GetJobs(states []JobState, nodeStorage string, count int) ([]ReplJob, error)
-
- // CreateReplicaReplJobs will create replication jobs for each secondary
- // replica of a repository known to the datastore. A set of replication job
- // ID's for the created jobs will be returned upon success.
- CreateReplicaReplJobs(correlationID, relativePath, primaryStorage string, secondaryStorages []string, change ChangeType, params Params) ([]uint64, error)
-
- // UpdateReplJobState updates the state of an existing replication job
- UpdateReplJobState(jobID uint64, newState JobState) error
-
- IncrReplJobAttempts(jobID uint64) error
-}
-
-type jobRecord struct {
- change ChangeType
- relativePath string // project's relative path
- targetNodeStorage string
- sourceNodeStorage string
- state JobState
- attempts int
- params Params
- correlationID string // from original request
+// MemoryQueue is an intermediate struct used to introduce ReplicationEventQueue into use.
+type MemoryQueue struct {
+ *MemoryDatastore
+ ReplicationEventQueue
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- jobs *struct {
- sync.RWMutex
- records map[uint64]jobRecord // all jobs indexed by ID
- }
-
// storageNodes is read-only after initialization
// if modification needed there must be synchronization for concurrent access to it
storageNodes map[string]models.Node
@@ -165,12 +142,6 @@ type MemoryDatastore struct {
func NewInMemory(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
storageNodes: map[string]models.Node{},
- jobs: &struct {
- sync.RWMutex
- records map[uint64]jobRecord // all jobs indexed by ID
- }{
- records: map[uint64]jobRecord{},
- },
repositories: &struct {
sync.RWMutex
m map[string]models.Repository
@@ -255,134 +226,3 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
return storageNodes, nil
}
-
-// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(states []JobState, targetNodeStorage string, count int) ([]ReplJob, error) {
- statesSet := make(map[JobState]bool, len(states))
- for _, state := range states {
- statesSet[state] = true
- }
-
- md.jobs.RLock()
- defer md.jobs.RUnlock()
-
- var results []ReplJob
-
- for i, record := range md.jobs.records {
- // state is a bitmap that is a combination of one or more JobStates
- if statesSet[record.state] && record.targetNodeStorage == targetNodeStorage {
- job, err := md.replJobFromRecord(i, record)
- if err != nil {
- return nil, err
- }
-
- results = append(results, job)
- if len(results) >= count {
- break
- }
- }
- }
-
- sort.Sort(byJobID{results})
-
- return results, nil
-}
-
-// replJobFromRecord constructs a replication job from a record and by cross
-// referencing the current repository for the project being replicated
-func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- sourceNode, err := md.GetStorageNode(record.sourceNodeStorage)
- if err != nil {
- return ReplJob{}, err
- }
- targetNode, err := md.GetStorageNode(record.targetNodeStorage)
- if err != nil {
- return ReplJob{}, err
- }
-
- return ReplJob{
- Change: record.change,
- ID: jobID,
- RelativePath: record.relativePath,
- SourceNode: sourceNode,
- State: record.state,
- TargetNode: targetNode,
- Attempts: record.attempts,
- Params: record.params,
- CorrelationID: record.correlationID,
- }, nil
-}
-
-// CreateReplicaReplJobs creates a replication job for each secondary that
-// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateReplicaReplJobs(
- correlationID string,
- relativePath,
- primaryStorage string,
- secondaryStorages []string,
- change ChangeType,
- params Params,
-) ([]uint64, error) {
- md.jobs.Lock()
- defer md.jobs.Unlock()
-
- if relativePath == "" {
- return nil, errors.New("invalid source repository")
- }
-
- var jobIDs []uint64
-
- for _, secondaryStorage := range secondaryStorages {
- nextID := uint64(len(md.jobs.records) + 1)
-
- md.jobs.records[nextID] = jobRecord{
- change: change,
- targetNodeStorage: secondaryStorage,
- state: JobStatePending,
- relativePath: relativePath,
- sourceNodeStorage: primaryStorage,
- params: params,
- correlationID: correlationID,
- }
-
- jobIDs = append(jobIDs, nextID)
- }
-
- return jobIDs, nil
-}
-
-// UpdateReplJobState updates an existing replication job's state
-func (md *MemoryDatastore) UpdateReplJobState(jobID uint64, newState JobState) error {
- md.jobs.Lock()
- defer md.jobs.Unlock()
-
- job, ok := md.jobs.records[jobID]
- if !ok {
- return fmt.Errorf("job ID %d does not exist", jobID)
- }
-
- if newState == JobStateCompleted || newState == JobStateCancelled {
- // remove the job to avoid filling up memory with unneeded job records
- delete(md.jobs.records, jobID)
- return nil
- }
-
- job.state = newState
- md.jobs.records[jobID] = job
- return nil
-}
-
-// IncrReplJobAttempts updates an existing replication job's state
-func (md *MemoryDatastore) IncrReplJobAttempts(jobID uint64) error {
- md.jobs.Lock()
- defer md.jobs.Unlock()
-
- job, ok := md.jobs.records[jobID]
- if !ok {
- return fmt.Errorf("job ID %d does not exist", jobID)
- }
-
- job.attempts++
- md.jobs.records[jobID] = job
- return nil
-}
diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go
deleted file mode 100644
index b3311d970..000000000
--- a/internal/praefect/datastore/datastore_test.go
+++ /dev/null
@@ -1,146 +0,0 @@
-// +build !postgres
-
-package datastore
-
-import (
- "testing"
-
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
-)
-
-var (
- stor1 = models.Node{
- Address: "tcp://address-1",
- Storage: "praefect-storage-1",
- DefaultPrimary: true,
- }
- stor2 = models.Node{
- Address: "tcp://address-2",
- Storage: "praefect-storage-2",
- }
- proj1 = "abcd1234" // imagine this is a legit project hash
- correlationID = "my-correlation-id"
-)
-
-var (
- repo1Repository = models.Repository{
- RelativePath: proj1,
- }
-)
-
-var operations = []struct {
- desc string
- opFn func(*testing.T, Datastore)
-}{
- {
- desc: "query an empty datastore",
- opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs([]JobState{JobStatePending, JobStateReady}, stor1.Storage, 1)
- require.NoError(t, err)
- require.Len(t, jobs, 0)
- },
- },
- {
- desc: "insert replication job for Update",
- opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo, nil)
- require.NoError(t, err)
- },
- },
- {
- desc: "insert replication job for Rename",
- opFn: func(t *testing.T, ds Datastore) {
- _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, RenameRepo, Params{"RelativePath": "/data/dir/repo"})
- require.NoError(t, err)
- },
- },
- {
- desc: "fetch inserted replication jobs",
- opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 10)
- require.NoError(t, err)
- require.Len(t, jobs, 2)
-
- expectedJobs := []ReplJob{
- {
- Change: UpdateRepo,
- ID: 1,
- RelativePath: repo1Repository.RelativePath,
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
- Params: nil,
- CorrelationID: correlationID,
- },
- {
- Change: RenameRepo,
- ID: 2,
- RelativePath: repo1Repository.RelativePath,
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
- Params: Params{"RelativePath": "/data/dir/repo"},
- CorrelationID: correlationID,
- },
- }
- require.ElementsMatch(t, expectedJobs, jobs)
- },
- },
- {
- desc: "mark Update replication job as done",
- opFn: func(t *testing.T, ds Datastore) {
- err := ds.UpdateReplJobState(1, JobStateCompleted)
- require.NoError(t, err)
- },
- },
- {
- desc: "try fetching pending replication jobs",
- opFn: func(t *testing.T, ds Datastore) {
- jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 1)
- require.NoError(t, err)
- require.Len(t, jobs, 1)
-
- completed := ReplJob{
- Change: RenameRepo,
- ID: 2,
- RelativePath: repo1Repository.RelativePath,
- SourceNode: stor1,
- TargetNode: stor2,
- State: JobStatePending,
- Params: Params{"RelativePath": "/data/dir/repo"},
- CorrelationID: correlationID,
- }
- require.Equal(t, completed, jobs[0])
- },
- },
-}
-
-// TODO: add SQL datastore flavor
-var flavors = map[string]func() Datastore{
- "in-memory-datastore": func() Datastore {
- return NewInMemory(config.Config{
- VirtualStorages: []*config.VirtualStorage{
- &config.VirtualStorage{
- Nodes: []*models.Node{&stor1, &stor2},
- },
- },
- })
- },
-}
-
-// TestDatastoreInterface will verify that every implementation or "flavor" of
-// datastore interface (in-Memory or SQL) behaves consistently given the same
-// series of operations
-func TestDatastoreInterface(t *testing.T) {
- for name, dsFactory := range flavors {
- t.Run(name, func(t *testing.T) {
- ds := dsFactory()
- for i, op := range operations {
- t.Logf("operation %d: %s", i+1, op.desc)
- op.opFn(t, ds)
- }
- })
- }
-}
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 455282af6..ee837e86a 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -216,7 +216,7 @@ func (p *Uint64Provider) Values() []uint64 {
// To returns a list of pointers that will be used as a destination for scan operation.
func (p *Uint64Provider) To() []interface{} {
- var d = new(uint64)
- *p = append(*p, d)
- return []interface{}{d}
+ var d uint64
+ *p = append(*p, &d)
+ return []interface{}{&d}
}
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index eceddf891..8bff18850 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -51,9 +51,9 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
func (db DB) TruncateAll(t testing.TB) {
db.Truncate(t,
- "gitaly_replication_queue_job_lock",
- "gitaly_replication_queue",
- "gitaly_replication_queue_lock",
+ "replication_queue_job_lock",
+ "replication_queue",
+ "replication_queue_lock",
)
}
@@ -67,7 +67,7 @@ func (db DB) Close() error {
// GetDB returns a wrapper around the database connection pool.
// Must be used only for testing.
-// The new database 'gitaly_test' will be re-created for each package that uses this function.
+// The new `database` will be re-created for each package that uses this function.
// Each call will also truncate all tables with their identities restarted if any.
// The best place to call it is in individual testing functions.
// It uses env vars:
@@ -121,7 +121,7 @@ func initGitalyTestDB(t testing.TB, database string) *sql.DB {
require.NoError(t, rows.Close())
_, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS " + database)
- require.NoError(t, dErr, "failed to drop 'gitaly_test' database")
+ require.NoErrorf(t, dErr, "failed to drop %q database", database)
_, cErr := postgresDB.Exec("CREATE DATABASE " + database + " WITH ENCODING 'UTF8'")
require.NoErrorf(t, cErr, "failed to create %q database", database)
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index e5408e6f0..4f6f72938 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -32,6 +32,8 @@ func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event Replicati
event.State = JobStateReady
event.CreatedAt = time.Now().UTC()
// event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances
+ // but must be filled in to produce the same event as the SQL implementation does
+ event.LockID = event.Job.TargetNodeStorage + "|" + event.Job.RelativePath
s.Lock()
defer s.Unlock()
@@ -106,7 +108,7 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
result = append(result, id)
switch state {
- case JobStateCompleted:
+ case JobStateCompleted, JobStateCancelled, JobStateDead:
// this event is fully processed and could be removed
s.remove(i)
case JobStateFailed:
@@ -114,9 +116,6 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
// out of luck for this replication event, remove from queue as no more attempts available
s.remove(i)
}
- case JobStateCancelled:
- // out of luck for this replication event, remove from queue as no more attempts available
- s.remove(i)
}
break
}
@@ -131,3 +130,60 @@ func (s *memoryReplicationEventQueue) remove(i int) {
delete(s.dequeued, s.queued[i].ID)
s.queued = append(s.queued[:i], s.queued[i+1:]...)
}
+
+// ReplicationEventQueueInterceptor allows registering interceptors for the `ReplicationEventQueue` interface.
+type ReplicationEventQueueInterceptor interface {
+ // ReplicationEventQueue is the actual implementation being intercepted.
+ ReplicationEventQueue
+ // OnEnqueue sets an action that is executed each time the `Enqueue` method is called.
+ OnEnqueue(func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error))
+ // OnDequeue sets an action that is executed each time the `Dequeue` method is called.
+ OnDequeue(func(context.Context, string, int, ReplicationEventQueue) ([]ReplicationEvent, error))
+ // OnAcknowledge sets an action that is executed each time the `Acknowledge` method is called.
+ OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
+}
+
+// NewReplicationEventQueueInterceptor returns a wrapper that intercepts calls to the `ReplicationEventQueue` interface.
+func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) ReplicationEventQueueInterceptor {
+ return &replicationEventQueueInterceptor{ReplicationEventQueue: queue}
+}
+
+type replicationEventQueueInterceptor struct {
+ ReplicationEventQueue
+ onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
+ onDequeue func(context.Context, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
+ onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
+}
+
+func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
+ i.onEnqueue = action
+}
+
+func (i *replicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) {
+ i.onDequeue = action
+}
+
+func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) {
+ i.onAcknowledge = action
+}
+
+func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
+ if i.onEnqueue != nil {
+ return i.onEnqueue(ctx, event, i.ReplicationEventQueue)
+ }
+ return i.ReplicationEventQueue.Enqueue(ctx, event)
+}
+
+func (i *replicationEventQueueInterceptor) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) {
+ if i.onDequeue != nil {
+ return i.onDequeue(ctx, nodeStorage, count, i.ReplicationEventQueue)
+ }
+ return i.ReplicationEventQueue.Dequeue(ctx, nodeStorage, count)
+}
+
+func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
+ if i.onAcknowledge != nil {
+ return i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue)
+ }
+ return i.ReplicationEventQueue.Acknowledge(ctx, state, ids)
+}
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index c55f9df50..1f33ff200 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -39,7 +39,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ID: 1,
State: JobStateReady,
Attempt: 3,
- LockID: "",
+ LockID: "storage-1|/project/path-1",
CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
Job: job1,
}
@@ -65,7 +65,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ID: 2,
State: JobStateReady,
Attempt: 3,
- LockID: "",
+ LockID: "storage-2|/project/path-1",
CreatedAt: event2.CreatedAt, // it is a hack to have same time for both
Job: job2,
}
@@ -79,7 +79,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ID: 1,
State: JobStateInProgress,
Attempt: 2,
- LockID: "",
+ LockID: "storage-1|/project/path-1",
CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
UpdatedAt: dequeuedAttempt1[0].UpdatedAt, // it is a hack to have same time for both
Job: job1,
@@ -98,7 +98,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ID: 1,
State: JobStateInProgress,
Attempt: 1,
- LockID: "",
+ LockID: "storage-1|/project/path-1",
CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
UpdatedAt: dequeuedAttempt2[0].UpdatedAt, // it is a hack to have same time for both
Job: job1,
@@ -117,7 +117,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ID: 1,
State: JobStateInProgress,
Attempt: 0,
- LockID: "",
+ LockID: "storage-1|/project/path-1",
CreatedAt: event1.CreatedAt, // it is a hack to have same time for both
UpdatedAt: dequeuedAttempt3[0].UpdatedAt, // it is a hack to have same time for both
Job: job1,
@@ -140,14 +140,14 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ID: 2,
State: JobStateInProgress,
Attempt: 2,
- LockID: "",
+ LockID: "storage-2|/project/path-1",
CreatedAt: event2.CreatedAt, // it is a hack to have same time for both
UpdatedAt: dequeuedAttempt5[0].UpdatedAt, // it is a hack to have same time for both
Job: job2,
}
require.Equal(t, expAttempt5, dequeuedAttempt5[0])
- acknowledgedAttempt5, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{event2.ID})
+ acknowledgedAttempt5, err := queue.Acknowledge(ctx, JobStateDead, []uint64{event2.ID})
require.NoError(t, err)
require.Equal(t, []uint64{event2.ID}, acknowledgedAttempt5, "one event must be acknowledged")
diff --git a/internal/praefect/datastore/migrations/20200224220728_job_queue.go b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
index e63116354..cab5a5494 100644
--- a/internal/praefect/datastore/migrations/20200224220728_job_queue.go
+++ b/internal/praefect/datastore/migrations/20200224220728_job_queue.go
@@ -5,40 +5,33 @@ import migrate "github.com/rubenv/sql-migrate"
func init() {
m := &migrate.Migration{
Id: "20200224220728_job_queue",
- Up: []string{`
-CREATE TYPE GITALY_REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed')
-`, `
-CREATE TABLE gitaly_replication_queue_lock
-(
- id TEXT PRIMARY KEY
- , acquired BOOLEAN NOT NULL DEFAULT FALSE
-)
-`, `
-CREATE TABLE gitaly_replication_queue
-(
- id BIGSERIAL PRIMARY KEY
- , state GITALY_REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready'
- , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
- , updated_at TIMESTAMP WITHOUT TIME ZONE
- , attempt INTEGER NOT NULL DEFAULT 3
- , lock_id TEXT
- , job JSONB
-)`, `
-CREATE TABLE gitaly_replication_queue_job_lock
-(
- job_id BIGINT REFERENCES gitaly_replication_queue(id)
- , lock_id TEXT REFERENCES gitaly_replication_queue_lock(id)
- , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
- , CONSTRAINT gitaly_replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id)
-)`,
+ Up: []string{
+ `CREATE TYPE REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed', 'dead')`,
+ `CREATE TABLE replication_queue_lock (
+ id TEXT PRIMARY KEY
+ , acquired BOOLEAN NOT NULL DEFAULT FALSE
+ )`,
+ `CREATE TABLE replication_queue (
+ id BIGSERIAL PRIMARY KEY
+ , state REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready'
+ , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+ , updated_at TIMESTAMP WITHOUT TIME ZONE
+ , attempt INTEGER NOT NULL DEFAULT 3
+ , lock_id TEXT
+ , job JSONB
+ , meta JSONB
+ )`,
+ `CREATE TABLE replication_queue_job_lock (
+ job_id BIGINT REFERENCES replication_queue(id)
+ , lock_id TEXT REFERENCES replication_queue_lock(id)
+ , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC')
+ , CONSTRAINT replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id)
+ )`,
},
- Down: []string{`
-DROP TABLE IF EXISTS gitaly_replication_queue_job_lock CASCADE
-`, `
-DROP TABLE IF EXISTS gitaly_replication_queue CASCADE
-`, `
-DROP TABLE IF EXISTS gitaly_replication_queue_lock CASCADE
-`,
+ Down: []string{
+ `DROP TABLE IF EXISTS replication_queue_job_lock CASCADE`,
+ `DROP TABLE IF EXISTS replication_queue CASCADE`,
+ `DROP TABLE IF EXISTS replication_queue_lock CASCADE`,
},
}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 42453f3a4..e15ac48a6 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -25,7 +25,7 @@ type ReplicationEventQueue interface {
func allowToAck(state JobState) error {
switch state {
- case JobStateCompleted, JobStateFailed, JobStateCancelled:
+ case JobStateCompleted, JobStateFailed, JobStateCancelled, JobStateDead:
return nil
default:
return fmt.Errorf("event state is not supported: %q", state)
@@ -67,6 +67,7 @@ type ReplicationEvent struct {
CreatedAt time.Time
UpdatedAt *time.Time
Job ReplicationJob
+ Meta Params
}
// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
@@ -88,6 +89,8 @@ func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error)
mapping = append(mapping, &event.LockID)
case "job":
mapping = append(mapping, &event.Job)
+ case "meta":
+ mapping = append(mapping, &event.Meta)
default:
return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
}
@@ -138,18 +141,18 @@ type PostgresReplicationEventQueue struct {
func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
query := `
-WITH insert_lock AS (
- INSERT INTO gitaly_replication_queue_lock(id)
- VALUES ($1 || '|' || $2)
- ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
- RETURNING id
-)
-INSERT INTO gitaly_replication_queue(lock_id, job)
-SELECT insert_lock.id, $3
-FROM insert_lock
-RETURNING id, state, created_at, updated_at, lock_id, attempt, job`
+ WITH insert_lock AS (
+ INSERT INTO replication_queue_lock(id)
+ VALUES ($1 || '|' || $2)
+ ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
+ RETURNING id
+ )
+ INSERT INTO replication_queue(lock_id, job, meta)
+ SELECT insert_lock.id, $3, $4
+ FROM insert_lock
+ RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta`
// this will always return a single row result (because of lock uniqueness) or an error
- rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job)
+ rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
if err != nil {
return ReplicationEvent{}, err
}
@@ -164,49 +167,48 @@ RETURNING id, state, created_at, updated_at, lock_id, attempt, job`
func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) {
query := `
-WITH to_lock AS (
- SELECT id
- FROM gitaly_replication_queue_lock AS repo_lock
- WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
- SELECT lock_id
- FROM gitaly_replication_queue
- WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
- ORDER BY created_at
- LIMIT $2 FOR UPDATE
- )
- FOR UPDATE SKIP LOCKED
-)
-, jobs AS (
- UPDATE gitaly_replication_queue AS queue
- SET attempt = queue.attempt - 1,
- state = 'in_progress',
- updated_at = NOW() AT TIME ZONE 'UTC'
- FROM to_lock
- WHERE queue.lock_id IN (SELECT id FROM to_lock)
- AND state NOT IN ('in_progress', 'cancelled', 'completed')
- AND queue.id IN (
- SELECT id
- FROM gitaly_replication_queue
- WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
- ORDER BY created_at
- LIMIT $2
- )
- RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job
-)
-, track_job_lock AS (
- INSERT INTO gitaly_replication_queue_job_lock (job_id, lock_id, triggered_at)
- SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs
- RETURNING lock_id
-)
-, do_lock AS (
- UPDATE gitaly_replication_queue_lock
- SET acquired = TRUE
- WHERE id IN (SELECT lock_id FROM track_job_lock)
-)
-SELECT id, state, created_at, updated_at, lock_id, attempt, job
-FROM jobs
-ORDER BY id
-`
+ WITH to_lock AS (
+ SELECT id
+ FROM replication_queue_lock AS repo_lock
+ WHERE repo_lock.acquired = FALSE AND repo_lock.id IN (
+ SELECT lock_id
+ FROM replication_queue
+ WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+ ORDER BY created_at
+ LIMIT $2 FOR UPDATE
+ )
+ FOR UPDATE SKIP LOCKED
+ )
+ , jobs AS (
+ UPDATE replication_queue AS queue
+ SET attempt = queue.attempt - 1
+ , state = 'in_progress'
+ , updated_at = NOW() AT TIME ZONE 'UTC'
+ FROM to_lock
+ WHERE queue.lock_id IN (SELECT id FROM to_lock)
+ AND state NOT IN ('in_progress', 'cancelled', 'completed')
+ AND queue.id IN (
+ SELECT id
+ FROM replication_queue
+ WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1
+ ORDER BY created_at
+ LIMIT $2
+ )
+ RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta
+ )
+ , track_job_lock AS (
+ INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at)
+ SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs
+ RETURNING lock_id
+ )
+ , do_lock AS (
+ UPDATE replication_queue_lock
+ SET acquired = TRUE
+ WHERE id IN (SELECT lock_id FROM track_job_lock)
+ )
+ SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta
+ FROM jobs
+ ORDER BY id`
rows, err := rq.qc.QueryContext(ctx, query, nodeStorage, count)
if err != nil {
return nil, err
@@ -229,45 +231,42 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
params := glsql.NewParamsAssembler()
query := `
-WITH existing AS (
- SELECT id, lock_id
- FROM gitaly_replication_queue
- WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `)
- AND state = 'in_progress'
- FOR UPDATE
-)
-, to_release AS (
- UPDATE gitaly_replication_queue AS queue
- SET state = ` + params.AddParam(state) + `
- FROM existing
- WHERE existing.id = queue.id
- RETURNING queue.id, queue.lock_id
-)
-, removed_job_lock AS (
- DELETE FROM gitaly_replication_queue_job_lock AS job_lock
- USING to_release AS job_failed
- WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id
- RETURNING job_failed.lock_id
-)
-, release AS (
- UPDATE gitaly_replication_queue_lock
- SET acquired = FALSE
- WHERE id IN (
- SELECT existing.lock_id
- FROM (
- SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id
- ) AS removed
- JOIN (
- SELECT lock_id, COUNT(*) AS amount
- FROM gitaly_replication_queue_job_lock
- WHERE lock_id IN (SELECT lock_id FROM removed_job_lock)
- GROUP BY lock_id
- ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
- )
-)
-SELECT id
-FROM existing
-`
+ WITH existing AS (
+ SELECT id, lock_id
+ FROM replication_queue
+ WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `)
+ AND state = 'in_progress'
+ FOR UPDATE
+ )
+ , to_release AS (
+ UPDATE replication_queue AS queue
+ SET state = ` + params.AddParam(state) + `
+ FROM existing
+ WHERE existing.id = queue.id
+ RETURNING queue.id, queue.lock_id
+ )
+ , removed_job_lock AS (
+ DELETE FROM replication_queue_job_lock AS job_lock
+ USING to_release AS job_failed
+ WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id
+ RETURNING job_failed.lock_id
+ )
+ , release AS (
+ UPDATE replication_queue_lock
+ SET acquired = FALSE
+ WHERE id IN (
+ SELECT existing.lock_id
+ FROM (SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id) AS removed
+ JOIN (
+ SELECT lock_id, COUNT(*) AS amount
+ FROM replication_queue_job_lock
+ WHERE lock_id IN (SELECT lock_id FROM removed_job_lock)
+ GROUP BY lock_id
+ ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
+ )
+ )
+ SELECT id
+ FROM existing`
rows, err := rq.qc.QueryContext(ctx, query, params.Params()...)
if err != nil {
return nil, err
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 749c45ca8..56010f86c 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -53,7 +53,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) {
require.Equal(t, expEvent, actualEvent)
requireEvents(t, ctx, db, []ReplicationEvent{expEvent})
requireLocks(t, ctx, db, []LockRow{expLock}) // expected a new lock for new event
- db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+ db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
}
func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
@@ -117,7 +117,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
requireEvents(t, ctx, db, []ReplicationEvent{expEvent1})
requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event
- db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0)
+ db.RequireRowsInTable(t, "replication_queue_job_lock", 0)
event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event
require.NoError(t, err)
@@ -179,7 +179,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) {
requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4})
requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo
- db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) // there is no fetches it must be empty
+ db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty
}
func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) {
@@ -536,7 +536,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
require.NoError(t, err)
require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged
var newEventState string
- require.NoError(t, db.QueryRow("SELECT state FROM gitaly_replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState))
+ require.NoError(t, db.QueryRow("SELECT state FROM replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState))
require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)")
requireLocks(t, ctx, db, []LockRow{
{ID: "gitaly-1|/project/path-1", Acquired: true},
@@ -563,7 +563,7 @@ func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []Re
exp[i].UpdatedAt = nil
}
- sqlStmt := `SELECT id, state, attempt, lock_id, job FROM gitaly_replication_queue ORDER BY id`
+ sqlStmt := `SELECT id, state, attempt, lock_id, job FROM replication_queue ORDER BY id`
rows, err := db.QueryContext(ctx, sqlStmt)
require.NoError(t, err)
@@ -572,7 +572,7 @@ func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []Re
require.Equal(t, exp, actual)
}
-// LockRow exists only for testing purposes and represents entries from gitaly_replication_queue_lock table.
+// LockRow exists only for testing purposes and represents entries from replication_queue_lock table.
type LockRow struct {
ID string
Acquired bool
@@ -581,7 +581,7 @@ type LockRow struct {
func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) {
t.Helper()
- sqlStmt := `SELECT id, acquired FROM gitaly_replication_queue_lock`
+ sqlStmt := `SELECT id, acquired FROM replication_queue_lock`
rows, err := db.QueryContext(ctx, sqlStmt)
require.NoError(t, err)
defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
@@ -596,7 +596,7 @@ func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []Loc
require.ElementsMatch(t, expected, actual)
}
-// JobLockRow exists only for testing purposes and represents entries from gitaly_replication_queue_job_lock table.
+// JobLockRow exists only for testing purposes and represents entries from replication_queue_job_lock table.
type JobLockRow struct {
JobID uint64
LockID string
@@ -606,7 +606,7 @@ type JobLockRow struct {
func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
t.Helper()
- sqlStmt := `SELECT job_id, lock_id FROM gitaly_replication_queue_job_lock ORDER BY triggered_at`
+ sqlStmt := `SELECT job_id, lock_id FROM replication_queue_job_lock ORDER BY triggered_at`
rows, err := db.QueryContext(ctx, sqlStmt)
require.NoError(t, err)
defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 930451144..22c09f75d 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -75,11 +75,12 @@ func testConfig(backends int) config.Config {
// setupServer wires all praefect dependencies together via dependency
// injection
-func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) (*datastore.MemoryDatastore, *Server) {
- var (
- ds = datastore.NewInMemory(conf)
- coordinator = NewCoordinator(l, ds, nodeMgr, conf, r)
- )
+func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) *Server {
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ }
+ coordinator := NewCoordinator(l, ds, nodeMgr, conf, r)
var defaultNode *models.Node
for _, n := range conf.VirtualStorages[0].Nodes {
@@ -91,7 +92,7 @@ func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *log
server := NewServer(coordinator.StreamDirector, l, r, conf)
- return ds, server
+ return server
}
// runPraefectServer runs a praefect server with the provided mock servers.
@@ -124,7 +125,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
r := protoregistry.New()
require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t)))
- _, prf := setupServer(t, conf, nodeMgr, log.Default(), r)
+ prf := setupServer(t, conf, nodeMgr, log.Default(), r)
listener, port := listenAvailPort(t)
t.Logf("praefect listening on port %d", port)
@@ -175,7 +176,10 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
conf.VirtualStorages[0].Nodes[i] = node
}
- ds := datastore.NewInMemory(conf)
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ }
logEntry := log.Default()
nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, promtest.NewMockHistogramVec())
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 9fa23eeb6..d1afaf8a5 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -9,6 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -250,6 +251,8 @@ const (
logWithReplJobID = "replication_job_id"
logWithReplSource = "replication_job_source"
logWithReplTarget = "replication_job_target"
+ logWithReplChange = "replication_job_change"
+ logWithReplPath = "replication_job_path"
logWithCorrID = "replication_correlation_id"
)
@@ -279,10 +282,6 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc {
}
}
-const (
- maxAttempts = 3
-)
-
func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) {
shard, err := r.nodeManager.GetShard(r.virtualStorage)
if err != nil {
@@ -302,78 +301,99 @@ func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []n
return primary, secondaries, nil
}
+// createReplJob converts `ReplicationEvent` into `ReplJob`.
+// It is an intermediate solution until `ReplJob` is removed and the code is adapted to `ReplicationEvent`.
+func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) {
+ targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage)
+ if err != nil {
+ return datastore.ReplJob{}, err
+ }
+
+ sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage)
+ if err != nil {
+ return datastore.ReplJob{}, err
+ }
+
+ var correlationID string
+ if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found {
+ correlationID, _ = val.(string)
+ }
+
+ replJob := datastore.ReplJob{
+ Attempts: event.Attempt,
+ Change: event.Job.Change,
+ ID: event.ID,
+ TargetNode: targetNode,
+ SourceNode: sourceNode,
+ RelativePath: event.Job.RelativePath,
+ Params: event.Job.Params,
+ CorrelationID: correlationID,
+ }
+
+ return replJob, nil
+}
+
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error {
backoff, reset := b()
for {
- var totalJobs int
+ var totalEvents int
primary, secondaries, err := r.getPrimaryAndSecondaries()
if err == nil {
for _, secondary := range secondaries {
- jobs, err := r.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStateFailed}, secondary.GetStorage(), 10)
+ events, err := r.datastore.Dequeue(ctx, secondary.GetStorage(), 10)
if err != nil {
- return err
+ r.log.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ continue
}
- totalJobs += len(jobs)
+ totalEvents += len(events)
- type replicatedKey struct {
- change datastore.ChangeType
- repoPath, source, target string
- }
- reposReplicated := make(map[replicatedKey]struct{})
-
- for _, job := range jobs {
- if job.Attempts >= maxAttempts {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
- }
+ eventIDsByState := map[datastore.JobState][]uint64{}
+ for _, event := range events {
+ job, err := r.createReplJob(event)
+ if err != nil {
+ r.log.WithField("event", event).WithError(err).Error("failed to restore replication job")
+ eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
continue
}
-
- var replicationKey replicatedKey
- switch job.Change {
- // this optimization could be done only for Update and Delete replication jobs as we treat them as idempotent
- // Update - there is no much profit from executing multiple fetches for the same target from the same source one by one
- // Delete - there is no way how we could remove already removed repository
- // that is why those Jobs needs to be tracked and marked as Cancelled (removed from queue without execution).
- case datastore.UpdateRepo, datastore.DeleteRepo:
- replicationKey = replicatedKey{
- change: job.Change,
- repoPath: job.RelativePath,
- source: job.SourceNode.Storage,
- target: job.TargetNode.Storage,
- }
-
- if _, ok := reposReplicated[replicationKey]; ok {
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to cancelled")
- }
- continue
- }
- }
-
- if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil {
+ if err := r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil {
r.log.WithFields(logrus.Fields{
- logWithReplJobID: job.ID,
- "from_storage": job.SourceNode.Storage,
- "to_storage": job.TargetNode.Storage,
+ logWithReplJobID: job.ID,
+ logWithReplTarget: job.TargetNode.Storage,
+ logWithReplSource: job.SourceNode.Storage,
+ logWithReplChange: job.Change,
+ logWithReplPath: job.RelativePath,
+ logWithCorrID: job.CorrelationID,
}).WithError(err).Error("replication job failed")
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to failed")
+ if job.Attempts == 0 {
+ eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
+ } else {
+ eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
}
continue
}
+ eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
+ }
+ for state, eventIDs := range eventIDsByState {
+ ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs)
+ if err != nil {
+ r.log.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
+ continue
+ }
- reposReplicated[replicationKey] = struct{}{}
+ notAckIDs := subtractUint64(eventIDs, ackIDs)
+ if len(notAckIDs) > 0 {
+ r.log.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
+ }
}
}
} else {
r.log.WithError(err).WithField("virtual_storage", r.virtualStorage).Error("error when getting primary and secondaries")
}
- if totalJobs == 0 {
+ if totalEvents == 0 {
select {
case <-time.After(backoff()):
continue
@@ -396,18 +416,10 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
WithField(logWithReplJobID, job.ID).
WithField(logWithReplSource, job.SourceNode).
WithField(logWithReplTarget, job.TargetNode).
+ WithField(logWithReplPath, job.RelativePath).
+ WithField(logWithReplChange, job.Change).
WithField(logWithCorrID, job.CorrelationID)
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateInProgress); err != nil {
- l.WithError(err).Error("unable to update replication job to in progress")
- return err
- }
-
- if err := r.datastore.IncrReplJobAttempts(job.ID); err != nil {
- l.WithError(err).Error("unable to increment replication job attempts")
- return err
- }
-
var replCtx context.Context
var cancel func()
@@ -451,9 +463,32 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
replDuration := time.Since(replStart)
r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second))
- if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCompleted); err != nil {
- r.log.WithError(err).Error("error when updating replication job status to complete")
+ return nil
+}
+
+// subtractUint64 returns a new slice with all elements of l that do not exist in r.
+func subtractUint64(l, r []uint64) []uint64 {
+ if len(l) == 0 {
+ return nil
}
- return nil
+ if len(r) == 0 {
+ result := make([]uint64, len(l))
+ copy(result, l)
+ return result
+ }
+
+ excludeSet := make(map[uint64]struct{}, len(r))
+ for _, v := range r {
+ excludeSet[v] = struct{}{}
+ }
+
+ var result []uint64
+ for _, v := range l {
+ if _, found := excludeSet[v]; !found {
+ result = append(result, v)
+ }
+ }
+
+ return result
}
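
For illustration, a minimal sketch of one dequeue/acknowledge pass against the ReplicationEventQueue interface used above; drainOnce and its process callback are hypothetical names, and the imports assume the package paths shown elsewhere in this diff.

package praefect

import (
	"context"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// drainOnce is a hypothetical helper showing one pass of the new cycle:
// fetch a batch of events for a target storage, process each event, group
// the resulting states, and acknowledge each group in bulk.
func drainOnce(ctx context.Context, queue datastore.ReplicationEventQueue, target string, process func(datastore.ReplicationEvent) error) error {
	events, err := queue.Dequeue(ctx, target, 10)
	if err != nil {
		return err
	}

	eventIDsByState := map[datastore.JobState][]uint64{}
	for _, event := range events {
		if err := process(event); err != nil {
			// once no attempts remain, the event is marked dead instead of failed
			state := datastore.JobStateFailed
			if event.Attempt == 0 {
				state = datastore.JobStateDead
			}
			eventIDsByState[state] = append(eventIDsByState[state], event.ID)
			continue
		}
		eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
	}

	for state, ids := range eventIDsByState {
		ackIDs, err := queue.Acknowledge(ctx, state, ids)
		if err != nil {
			return err
		}
		if notAcked := subtractUint64(ids, ackIDs); len(notAcked) > 0 {
			return fmt.Errorf("events %v were not acknowledged as %v", notAcked, state)
		}
	}
	return nil
}

Grouping IDs by state lets a single Acknowledge call cover a whole batch, and subtractUint64 surfaces any IDs the queue did not acknowledge.
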
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2dfe83467..3689f868f 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -24,6 +24,7 @@ import (
objectpoolservice "gitlab.com/gitlab-org/gitaly/internal/service/objectpool"
"gitlab.com/gitlab-org/gitaly/internal/service/remote"
"gitlab.com/gitlab-org/gitaly/internal/service/repository"
+ "gitlab.com/gitlab-org/gitaly/internal/service/ssh"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -32,8 +33,6 @@ import (
"google.golang.org/grpc/reflection"
)
-const correlationID = "my-correlation-id"
-
func TestProcessReplicationJob(t *testing.T) {
srv, srvSocketPath := runFullGitalyServer(t)
defer srv.Stop()
@@ -65,7 +64,7 @@ func TestProcessReplicationJob(t *testing.T) {
},
)
- config := config.Config{
+ conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
&config.VirtualStorage{
Name: "default",
@@ -86,7 +85,10 @@ func TestProcessReplicationJob(t *testing.T) {
},
}
- ds := datastore.NewInMemory(config)
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ }
// create object pool on the source
objectPoolPath := testhelper.NewTestObjectPoolName(t)
@@ -117,20 +119,23 @@ func TestProcessReplicationJob(t *testing.T) {
})
require.NoError(t, err)
- primary, err := ds.GetPrimary(config.VirtualStorages[0].Name)
+ primary, err := ds.GetPrimary(conf.VirtualStorages[0].Name)
require.NoError(t, err)
- secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name)
+ secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name)
require.NoError(t, err)
- var secondaryStorages []string
+ var jobs []datastore.ReplJob
for _, secondary := range secondaries {
- secondaryStorages = append(secondaryStorages, secondary.Storage)
+ jobs = append(jobs, datastore.ReplJob{
+ Change: datastore.UpdateRepo,
+ TargetNode: secondary,
+ SourceNode: primary,
+ RelativePath: testRepo.GetRelativePath(),
+ State: datastore.JobStateReady,
+ Attempts: 3,
+ CorrelationID: "correlation-id",
+ })
}
- _, err = ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo, nil)
- require.NoError(t, err)
-
- jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStatePending}, backupStorageName, 1)
- require.NoError(t, err)
require.Len(t, jobs, 1)
commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
@@ -141,7 +146,7 @@ func TestProcessReplicationJob(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
replicator.log = entry
- nodeMgr, err := nodes.NewManager(entry, config, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -151,7 +156,7 @@ func TestProcessReplicationJob(t *testing.T) {
replMgr := NewReplMgr("", testhelper.DiscardTestEntry(t), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge))
replMgr.replicator = replicator
- shard, err := nodeMgr.GetShard(config.VirtualStorages[0].Name)
+ shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
require.NoError(t, err)
primaryNode, err := shard.GetPrimary()
require.NoError(t, err)
@@ -216,7 +221,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
defer primarySvr.Stop()
backupSvr, backupSocket := newReplicationService(t)
- backupSvr.Stop()
+ defer backupSvr.Stop()
internalListener, err := net.Listen("unix", gitaly_config.GitalyInternalSocketPath())
require.NoError(t, err)
@@ -243,7 +248,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
Address: "unix://" + backupSocket,
}
- config := config.Config{
+ conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
Name: "default",
@@ -256,58 +261,99 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
}
ctx, cancel := testhelper.Context()
- defer func(oldStorages []gitaly_config.Storage) {
- gitaly_config.Config.Storages = oldStorages
- cancel()
- }(gitaly_config.Config.Storages)
+ defer cancel()
+
+ defer func(oldStorages []gitaly_config.Storage) { gitaly_config.Config.Storages = oldStorages }(gitaly_config.Config.Storages)
gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
Name: backupStorageName,
Path: backupDir,
- },
- gitaly_config.Storage{
- Name: "default",
- Path: testhelper.GitlabTestStoragePath(),
- },
- )
+ })
+
+ require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one")
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ processed := make(chan struct{})
- ds := datastore.NewInMemory(config)
- ids, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+ dequeues := 0
+ queueInterceptor.OnDequeue(func(ctx context.Context, target string, count int, queue datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
+ events, err := queue.Dequeue(ctx, target, count)
+ if len(events) > 0 {
+ dequeues++
+ }
+ return events, err
+ })
+
+ completedAcks := 0
+ failedAcks := 0
+ deadAcks := 0
+
+ queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+ switch state {
+ case datastore.JobStateCompleted:
+ require.Equal(t, []uint64{1}, ids)
+ completedAcks++
+ case datastore.JobStateFailed:
+ require.Equal(t, []uint64{2}, ids)
+ failedAcks++
+ case datastore.JobStateDead:
+ require.Equal(t, []uint64{2}, ids)
+ deadAcks++
+ default:
+ require.FailNow(t, "acknowledge is not expected", state)
+ }
+ ackIDs, err := queue.Acknowledge(ctx, state, ids)
+ if completedAcks+failedAcks+deadAcks == 4 {
+ close(processed)
+ }
+ return ackIDs, err
+ })
+
+ ds := datastore.MemoryQueue{
+ ReplicationEventQueue: queueInterceptor,
+ MemoryDatastore: datastore.NewInMemory(conf),
+ }
+
+ // this job exists to verify that replication works
+ okJob := datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ RelativePath: testRepo.RelativePath,
+ TargetNodeStorage: secondary.Storage,
+ SourceNodeStorage: primary.Storage,
+ }
+ event1, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
require.NoError(t, err)
- require.Len(t, ids, 1)
+ require.Equal(t, uint64(1), event1.ID)
- entry := testhelper.DiscardTestEntry(t)
+ // this job checks the failure flow for a replication event
+ failJob := okJob
+ failJob.RelativePath = "invalid path to fail the job"
+ event2, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
+ require.NoError(t, err)
+ require.Equal(t, uint64(2), event2.ID)
- require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
+ logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, config, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr("default", entry, ds, nodeMgr)
- replMgr.replJobTimeout = 100 * time.Millisecond
-
- go replMgr.ProcessBacklog(ctx, noopBackoffFunc)
-
- timeLimit := time.NewTimer(5 * time.Second)
- ticker := time.NewTicker(1 * time.Second)
-
- // the job will fail to process because the client connection for "backup" is not registered. It should fail maxAttempts times
- // and get cancelled.
-TestJobGetsCancelled:
- for {
- select {
- case <-ticker.C:
- replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateDead}, "backup", 10)
- require.NoError(t, err)
- if len(replJobs) == 1 {
- //success
- timeLimit.Stop()
- break TestJobGetsCancelled
- }
- case <-timeLimit.C:
- t.Fatal("time limit expired for job to be deemed dead")
- }
+ replMgr := NewReplMgr("default", logEntry, ds, nodeMgr)
+
+ go func() {
+ require.Equal(t, context.Canceled, replMgr.ProcessBacklog(ctx, noopBackoffFunc), "backlog processing failed")
+ }()
+
+ select {
+ case <-processed:
+ case <-time.After(60 * time.Second):
+ // strongly depends on the processing capacity
+ t.Fatal("time limit expired for job to complete")
}
+
+ require.Equal(t, 3, dequeues, "expected 1 dequeue to get [okJob, failJob] and 2 more for [failJob] only")
+ require.Equal(t, 2, failedAcks)
+ require.Equal(t, 1, deadAcks)
+ require.Equal(t, 1, completedAcks)
}
func TestProcessBacklog_Success(t *testing.T) {
@@ -342,7 +388,7 @@ func TestProcessBacklog_Success(t *testing.T) {
Address: "unix://" + backupSocket,
}
- config := config.Config{
+ conf := config.Config{
VirtualStorages: []*config.VirtualStorage{
{
Name: "default",
@@ -355,36 +401,49 @@ func TestProcessBacklog_Success(t *testing.T) {
}
ctx, cancel := testhelper.Context()
- defer func(oldStorages []gitaly_config.Storage) {
- gitaly_config.Config.Storages = oldStorages
- cancel()
- }(gitaly_config.Config.Storages)
+ defer cancel()
+
+ defer func(oldStorages []gitaly_config.Storage) { gitaly_config.Config.Storages = oldStorages }(gitaly_config.Config.Storages)
gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
Name: backupStorageName,
Path: backupDir,
- },
- gitaly_config.Storage{
- Name: "default",
- Path: testhelper.GitlabTestStoragePath(),
- },
- )
+ })
+ require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one")
+
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
- ds := datastore.NewInMemory(config)
+ processed := make(chan struct{})
+ queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+ ackIDs, err := queue.Acknowledge(ctx, state, ids)
+ if len(ids) > 0 {
+ require.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
+ require.Equal(t, []uint64{1, 2, 3, 4}, ids, "all jobs must be processed at once")
+ close(processed)
+ }
+ return ackIDs, err
+ })
- var jobIDs []uint64
+ ds := datastore.MemoryQueue{
+ MemoryDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: queueInterceptor,
+ }
// Update replication job
- idsUpdate1, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+ eventType1 := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ RelativePath: testRepo.GetRelativePath(),
+ TargetNodeStorage: secondary.Storage,
+ SourceNodeStorage: primary.Storage,
+ },
+ }
+
+ _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- require.Len(t, idsUpdate1, 1)
- jobIDs = append(jobIDs, idsUpdate1...)
- // Update replication job
- idsUpdate2, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+ _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
require.NoError(t, err)
- require.Len(t, idsUpdate2, 1)
- jobIDs = append(jobIDs, idsUpdate2...)
renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
fullNewPath1 := filepath.Join(backupDir, renameTo1)
@@ -393,50 +452,50 @@ func TestProcessBacklog_Success(t *testing.T) {
fullNewPath2 := filepath.Join(backupDir, renameTo2)
// Rename replication job
- idsRename1, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.RenameRepo, datastore.Params{"RelativePath": renameTo1})
+ eventType2 := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.RenameRepo,
+ RelativePath: testRepo.GetRelativePath(),
+ TargetNodeStorage: secondary.Storage,
+ SourceNodeStorage: primary.Storage,
+ Params: datastore.Params{"RelativePath": renameTo1},
+ },
+ }
+
+ _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType2)
require.NoError(t, err)
- require.Len(t, idsRename1, 1)
- jobIDs = append(jobIDs, idsRename1...)
// Rename replication job
- idsRename2, err := ds.CreateReplicaReplJobs(correlationID, renameTo1, primary.Storage, []string{secondary.Storage}, datastore.RenameRepo, datastore.Params{"RelativePath": renameTo2})
+ eventType3 := datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.RenameRepo,
+ RelativePath: renameTo1,
+ TargetNodeStorage: secondary.Storage,
+ SourceNodeStorage: primary.Storage,
+ Params: datastore.Params{"RelativePath": renameTo2},
+ },
+ }
require.NoError(t, err)
- require.Len(t, idsRename2, 1)
- jobIDs = append(jobIDs, idsRename2...)
- entry := testhelper.DiscardTestEntry(t)
+ _, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType3)
+ require.NoError(t, err)
- for _, id := range jobIDs {
- require.NoError(t, ds.UpdateReplJobState(id, datastore.JobStateReady))
- }
+ logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, config, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
require.NoError(t, err)
- replMgr := NewReplMgr("default", entry, ds, nodeMgr)
- replMgr.replJobTimeout = 5 * time.Second
+ replMgr := NewReplMgr("default", logEntry, ds, nodeMgr)
go func() {
require.Equal(t, context.Canceled, replMgr.ProcessBacklog(ctx, noopBackoffFunc), "backlog processing failed")
}()
- timeLimit := time.NewTimer(5 * time.Second)
- ticker := time.NewTicker(1 * time.Second)
-
- // Once the listener is being served, and we try the job again it should succeed
-TestJobSucceeds:
- for {
- select {
- case <-ticker.C:
- replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateFailed, datastore.JobStateInProgress, datastore.JobStateReady, datastore.JobStateDead}, "backup", 10)
- require.NoError(t, err)
- if len(replJobs) == 0 {
- //success
- break TestJobSucceeds
- }
- case <-timeLimit.C:
- t.Fatal("time limit expired for job to complete")
- }
+ select {
+ case <-processed:
+ case <-time.After(60 * time.Second):
+ // strongly depends on the processing capacity
+ t.Fatal("time limit expired for job to complete")
}
_, serr := os.Stat(fullNewPath1)
@@ -483,7 +542,7 @@ func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
return server, "unix://" + serverSocketPath
}
-// newReplicationService is a grpc service that has the Repository, Remote and ObjectPool services, which
+// newReplicationService starts a grpc server with the SSH, Repository, Remote and ObjectPool services, which
// are the only ones needed for replication
func newReplicationService(tb testing.TB) (*grpc.Server, string) {
socketName := testhelper.GetTemporaryGitalySocketFileName()
@@ -493,6 +552,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) {
gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(&rubyserver.Server{}, gitaly_config.GitalyInternalSocketPath()))
gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer())
gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(&rubyserver.Server{}))
+ gitalypb.RegisterSSHServiceServer(svr, ssh.NewServer())
reflection.Register(svr)
listener, err := net.Listen("unix", socketName)
@@ -542,3 +602,24 @@ func testMain(m *testing.M) int {
return m.Run()
}
+
+func TestSubtractUint64(t *testing.T) {
+ testCases := []struct {
+ desc string
+ left []uint64
+ right []uint64
+ exp []uint64
+ }{
+ {desc: "empty left", left: nil, right: []uint64{1, 2}, exp: nil},
+ {desc: "empty right", left: []uint64{1, 2}, right: []uint64{}, exp: []uint64{1, 2}},
+ {desc: "some exists", left: []uint64{1, 2, 3, 4, 5}, right: []uint64{2, 4, 5}, exp: []uint64{1, 3}},
+ {desc: "nothing exists", left: []uint64{10, 20}, right: []uint64{100, 200}, exp: []uint64{10, 20}},
+ {desc: "duplicates exists", left: []uint64{1, 1, 2, 3, 3, 4, 4, 5}, right: []uint64{3, 4, 4, 5}, exp: []uint64{1, 1, 2}},
+ }
+
+ for _, testCase := range testCases {
+ t.Run(testCase.desc, func(t *testing.T) {
+ require.Equal(t, testCase.exp, subtractUint64(testCase.left, testCase.right))
+ })
+ }
+}
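
For illustration, a minimal sketch of how the new ReplicationEventQueueInterceptor can wrap the in-memory queue in a test; newObservedQueue and the acks channel are hypothetical, and the hook signature follows the OnAcknowledge usage above.

package praefect

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// newObservedQueue is a hypothetical helper: it wraps the in-memory queue with
// the interceptor so a test can observe every Acknowledge call while still
// delegating to the real queue implementation.
func newObservedQueue(acks chan<- datastore.JobState) datastore.ReplicationEventQueue {
	interceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())

	interceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
		acks <- state // surface the acknowledged state to the test
		return queue.Acknowledge(ctx, state, ids)
	})

	return interceptor
}

The same pattern works for OnDequeue when a test needs to count dequeue attempts, as TestProcessBacklog_FailedJobs does above.
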
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 1cdee452a..3c3e0c9f7 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -139,7 +139,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
registry := protoregistry.New()
require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
- _, srv := setupServer(t, conf, nodeMgr, entry, registry)
+ srv := setupServer(t, conf, nodeMgr, entry, registry)
listener, port := listenAvailPort(t)
go func() {
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
index 64e3d2cfb..5f5b97fb7 100644
--- a/internal/praefect/service/info/consistencycheck.go
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -4,6 +4,7 @@ import (
"context"
"io"
+ "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -113,8 +114,6 @@ func walkRepos(ctx context.Context, walkerQ chan<- string, reference nodes.Node)
}
walkerQ <- resp.GetRelativePath()
}
-
- return nil
}
func checksumRepo(ctx context.Context, relpath string, node nodes.Node) (string, error) {
@@ -177,36 +176,27 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
return nil
}
-func scheduleReplication(ctx context.Context, csr checksumResult, ds Datastore, resp *gitalypb.ConsistencyCheckResponse) error {
- ids, err := ds.CreateReplicaReplJobs(
- correlation.ExtractFromContext(ctx),
- csr.relativePath,
- csr.referenceStorage,
- []string{csr.targetStorage},
- datastore.UpdateRepo,
- nil,
- )
+func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp *gitalypb.ConsistencyCheckResponse) error {
+ event, err := q.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ Change: datastore.UpdateRepo,
+ RelativePath: csr.relativePath,
+ TargetNodeStorage: csr.targetStorage,
+ SourceNodeStorage: csr.referenceStorage,
+ },
+ Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlation.ExtractFromContext(ctx)},
+ })
+
if err != nil {
return err
}
- if len(ids) != 1 {
- return status.Errorf(
- codes.Internal,
- "datastore unexpectedly returned %d job IDs",
- len(ids),
- )
- }
- resp.ReplJobId = ids[0]
-
- if err := ds.UpdateReplJobState(resp.ReplJobId, datastore.JobStateReady); err != nil {
- return err
- }
+ resp.ReplJobId = event.ID
return nil
}
-func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResult, ds Datastore, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResult, q Queue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
for csr := range checksumResultQ {
select {
case <-ctx.Done():
@@ -222,7 +212,7 @@ func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResul
}
if csr.reference != csr.target {
- if err := scheduleReplication(ctx, csr, ds, resp); err != nil {
+ if err := scheduleReplication(ctx, csr, q, resp); err != nil {
return err
}
}
@@ -260,7 +250,7 @@ func (s *Server) ConsistencyCheck(req *gitalypb.ConsistencyCheckRequest, stream
return checksumRepos(ctx, walkerQ, checksumResultQ, target, reference)
})
g.Go(func() error {
- return ensureConsistency(ctx, checksumResultQ, s.datastore, stream)
+ return ensureConsistency(ctx, checksumResultQ, s.queue, stream)
})
return g.Wait()
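
For illustration, a minimal sketch of scheduling a single replication event with the correlation ID carried in the event metadata, as scheduleReplication does above; scheduleUpdate is a hypothetical helper, and the correlation ID is passed in explicitly rather than extracted from the context.

package info

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// scheduleUpdate enqueues one update-repository event and stores the caller's
// correlation ID in the event metadata so it survives the queue round trip.
func scheduleUpdate(ctx context.Context, q Queue, relativePath, source, target, correlationID string) (uint64, error) {
	event, err := q.Enqueue(ctx, datastore.ReplicationEvent{
		Job: datastore.ReplicationJob{
			Change:            datastore.UpdateRepo,
			RelativePath:      relativePath,
			SourceNodeStorage: source,
			TargetNodeStorage: target,
		},
		Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
	})
	if err != nil {
		return 0, err
	}
	return event.ID, nil
}
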
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 31990a813..46702d80a 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -1,36 +1,37 @@
package info
import (
+ "context"
+
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
-// Datastore is a subset of the datastore functionality needed by this service
-type Datastore interface {
- CreateReplicaReplJobs(correlationID, relativePath, primaryStorage string, secondaryStorages []string, change datastore.ChangeType, params datastore.Params) ([]uint64, error)
- UpdateReplJobState(jobID uint64, newState datastore.JobState) error
+// Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service
+type Queue interface {
+ Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error)
}
-// compile time assertion that Datastore is satisfied by
-// datastore.ReplJobsDatastore
-var _ Datastore = (datastore.ReplJobsDatastore)(nil)
+// compile time assertion that Queue is satisfied by
+// datastore.ReplicationEventQueue
+var _ Queue = (datastore.ReplicationEventQueue)(nil)
// Server is a InfoService server
type Server struct {
gitalypb.UnimplementedPraefectInfoServiceServer
- nodeMgr nodes.Manager
- conf config.Config
- datastore Datastore
+ nodeMgr nodes.Manager
+ conf config.Config
+ queue Queue
}
// NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, datastore Datastore) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue Queue) gitalypb.PraefectInfoServiceServer {
s := &Server{
- nodeMgr: nodeMgr,
- conf: conf,
- datastore: datastore,
+ nodeMgr: nodeMgr,
+ conf: conf,
+ queue: queue,
}
return s
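
For illustration, a minimal wiring sketch showing that any concrete datastore.ReplicationEventQueue satisfies the narrower Queue interface NewServer now accepts; newInfoServer is a hypothetical helper that uses the in-memory queue.

package praefect

import (
	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/service/info"
	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)

// newInfoServer wires the info service with an in-memory replication event
// queue; in production the SQL-backed queue implementation could be passed
// in the same way.
func newInfoServer(nodeMgr nodes.Manager, conf config.Config) gitalypb.PraefectInfoServiceServer {
	return info.NewServer(nodeMgr, conf, datastore.NewMemoryReplicationEventQueue())
}
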