gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>  2020-07-02 16:53:30 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2020-07-02 16:53:30 +0300
commit    a5341cd00664f011d3365cdeba462f04d6e214ed (patch)
tree      6b9eebc1f0ca8e9acd8d3a679a4c803ef671649c
parent    c3948afe4501a7b2f94d553063c3c76dc6bc11f0 (diff)
parent    172e2a0a922edc4168aadb398b71cf20161ba3c3 (diff)
Merge branch 'ps-replication-health-ping' into 'master'
Praefect: replication jobs health ping

See merge request gitlab-org/gitaly!2321
-rw-r--r--  changelogs/unreleased/ps-replication-health-ping.yml |   5
-rw-r--r--  internal/praefect/datastore/glsql/postgres.go         |   1
-rw-r--r--  internal/praefect/datastore/memory.go                 |  26
-rw-r--r--  internal/praefect/datastore/queue.go                  |  53
-rw-r--r--  internal/praefect/datastore/queue_test.go             | 153
-rw-r--r--  internal/praefect/replicator.go                       | 121
-rw-r--r--  internal/praefect/replicator_test.go                  |   9
7 files changed, 316 insertions(+), 52 deletions(-)
diff --git a/changelogs/unreleased/ps-replication-health-ping.yml b/changelogs/unreleased/ps-replication-health-ping.yml
new file mode 100644
index 000000000..a03ebeee1
--- /dev/null
+++ b/changelogs/unreleased/ps-replication-health-ping.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: replication jobs health ping'
+merge_request: 2321
+author:
+type: added
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 6e3346a24..2d3f36150 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -40,6 +40,7 @@ func Migrate(db *sql.DB, ignoreUnknown bool) (int, error) {
type Querier interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+ ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// TxQuery runs operations inside transaction and commits|rollbacks on Done.
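With `ExecContext` on the interface, write statements can go through the same `Querier` value that already serves reads. A minimal sketch of a caller, assuming the `replication_queue_job_lock` schema used further below; `touchLock` is a hypothetical helper, not part of this change:

package datastore

import (
	"context"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)

// touchLock is a hypothetical helper: it refreshes the health timestamp of a
// single job lock via Querier.ExecContext and reports how many rows changed.
func touchLock(ctx context.Context, qc glsql.Querier, jobID int64, lockID string) (int64, error) {
	res, err := qc.ExecContext(ctx,
		`UPDATE replication_queue_job_lock
		 SET triggered_at = NOW() AT TIME ZONE 'UTC'
		 WHERE job_id = $1 AND lock_id = $2`,
		jobID, lockID)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}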
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 7ad900b80..ed122e21a 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -219,6 +219,12 @@ func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, vir
return result, nil
}
+// StartHealthUpdate is a no-op for the in-memory implementation: all information
+// about events is lost after a restart anyway, so there is no health to track.
+func (s *memoryReplicationEventQueue) StartHealthUpdate(context.Context, <-chan time.Time, []ReplicationEvent) error {
+ return nil
+}
+
// remove deletes i-th element from the queue and from the in-flight tracking map.
// It doesn't check 'i' for the out of range and must be called with lock protection.
func (s *memoryReplicationEventQueue) remove(i int) {
@@ -240,6 +246,8 @@ type ReplicationEventQueueInterceptor interface {
OnDequeue(func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error))
// OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called.
OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
+ // OnStartHealthUpdate allows setting an action that is executed each time the `StartHealthUpdate` method is called.
+ OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error)
}
// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
@@ -249,9 +257,10 @@ func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) Replicatio
type replicationEventQueueInterceptor struct {
ReplicationEventQueue
- onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
- onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
- onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
+ onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
+ onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
+ onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
+ onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error
}
func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
@@ -266,6 +275,10 @@ func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Con
i.onAcknowledge = action
}
+func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) {
+ i.onStartHealthUpdate = action
+}
+
func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
if i.onEnqueue != nil {
return i.onEnqueue(ctx, event, i.ReplicationEventQueue)
@@ -286,3 +299,10 @@ func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, stat
}
return i.ReplicationEventQueue.Acknowledge(ctx, state, ids)
}
+
+func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
+ if i.onStartHealthUpdate != nil {
+ return i.onStartHealthUpdate(ctx, trigger, events)
+ }
+ return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events)
+}
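For tests, the interceptor now exposes the new hook as well. Note that, as with the other hooks, an installed `OnStartHealthUpdate` action replaces the wrapped queue's behaviour entirely. A minimal sketch of the pattern (mirroring `replicator_test.go` further below, with "context", "sync/atomic", and "time" imported; `baseQueue` stands for any concrete implementation and is assumed to exist):

var baseQueue ReplicationEventQueue // e.g. the in-memory queue; assumed here
queue := NewReplicationEventQueueInterceptor(baseQueue)

var healthUpdates int32
queue.OnStartHealthUpdate(func(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
	atomic.AddInt32(&healthUpdates, 1) // record the call instead of touching storage
	return nil
})
// ...drive the code under test, then assert on atomic.LoadInt32(&healthUpdates).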
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 7edc4b006..516573f26 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -5,9 +5,11 @@ import (
"database/sql"
"database/sql/driver"
"encoding/json"
+ "errors"
"fmt"
"time"
+ "github.com/lib/pq"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
@@ -28,6 +30,11 @@ type ReplicationEventQueue interface {
// GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state.
// It returns no results if there is no up to date storages or there were no replication events yet.
GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
+ // StartHealthUpdate starts periodic updates of the events' health identifiers.
+ // Events with a fresh health identifier are not considered stale.
+ // An update is executed each time a value is received from the trigger channel.
+ // This is a blocking call whose lifetime is managed by the passed-in context.
+ StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
}
func allowToAck(state JobState) error {
@@ -374,3 +381,49 @@ func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context,
return storages.Values(), nil
}
+
+// StartHealthUpdate starts periodic updates of the events' health identifiers.
+// Events with a fresh health identifier are not considered stale.
+// An update is executed each time a value is received from the trigger channel.
+// This is a blocking call whose lifetime is managed by the passed-in context.
+func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
+ if len(events) == 0 {
+ return nil
+ }
+
+ jobIDs := make(pq.Int64Array, len(events))
+ lockIDs := make(pq.StringArray, len(events))
+ for i := range events {
+ jobIDs[i] = int64(events[i].ID)
+ lockIDs[i] = events[i].LockID
+ }
+
+ query := `
+ UPDATE replication_queue_job_lock
+ SET triggered_at = NOW() AT TIME ZONE 'UTC'
+ WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))`
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-trigger:
+ res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs)
+ if err != nil {
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ return err
+ }
+ return nil
+ }
+
+ affected, err := res.RowsAffected()
+ if err != nil {
+ return err
+ }
+
+ if affected == 0 {
+ return nil
+ }
+ }
+ }
+}
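On the caller's side the loop is driven by an arbitrary `<-chan time.Time`, which keeps it testable; production code uses a real ticker and stops the pings by cancelling the context (see `replicator.go` below). Note the positional pairing of the two arrays via `UNNEST`: row i of the generated rowset is `(jobIDs[i], lockIDs[i])`, so each job is matched only with its own lock. A minimal usage sketch, assuming a populated `queue` and a slice of dequeued `events`; `process` is a hypothetical worker:

ctx, cancel := context.WithCancel(context.Background())

go func() {
	ticker := time.NewTicker(5 * time.Second) // same interval replicator.go uses
	defer ticker.Stop()

	// Blocks until ctx is cancelled, the rows disappear from
	// replication_queue_job_lock, or a non-cancellation error occurs.
	if err := queue.StartHealthUpdate(ctx, ticker.C, events); err != nil {
		log.Printf("health update loop: %v", err)
	}
}()

process(events) // hypothetical: handle the dequeued events
cancel()        // processing finished; stop refreshing triggered_at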
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 9f4c458d1..f63049b29 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -853,6 +854,138 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
+ db := getDB(t)
+
+ eventType1 := ReplicationEvent{Job: ReplicationJob{
+ Change: UpdateRepo,
+ VirtualStorage: "vs-1",
+ TargetNodeStorage: "s-1",
+ SourceNodeStorage: "s-0",
+ RelativePath: "/path/1",
+ }}
+
+ eventType2 := eventType1
+ eventType2.Job.RelativePath = "/path/2"
+
+ eventType3 := eventType1
+ eventType3.Job.VirtualStorage = "vs-2"
+
+ eventType4 := eventType1
+ eventType4.Job.TargetNodeStorage = "s-2"
+
+ t.Run("no events is valid", func(t *testing.T) {
+ // 'qc' is not initialized, so the test will fail if any SQL operation is attempted
+ queue := PostgresReplicationEventQueue{}
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, queue.StartHealthUpdate(ctx, nil, nil))
+ })
+
+ t.Run("can be terminated by the passed in context", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ // 'qc' is not initialized, so the test will fail if any SQL operation is attempted
+ queue := PostgresReplicationEventQueue{}
+ cancel()
+ require.NoError(t, queue.StartHealthUpdate(ctx, nil, []ReplicationEvent{eventType1}))
+ })
+
+ t.Run("stops after first error", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ qc, err := db.DB.BeginTx(ctx, nil)
+ require.NoError(t, err)
+ require.NoError(t, qc.Rollback())
+
+ // 'qc' is initialized with an invalid connection (the transaction is already finished), so operations on it will fail
+ queue := PostgresReplicationEventQueue{qc: qc}
+
+ trigger := make(chan time.Time, 1)
+ trigger <- time.Time{}
+
+ require.Error(t, queue.StartHealthUpdate(ctx, trigger, []ReplicationEvent{eventType1}))
+ })
+
+ t.Run("stops if nothing to update (extended coverage)", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ done := make(chan struct{})
+ queue := PostgresReplicationEventQueue{qc: db}
+ go func() {
+ trigger := make(chan time.Time)
+ close(trigger)
+
+ defer close(done)
+ assert.NoError(t, queue.StartHealthUpdate(ctx, trigger, []ReplicationEvent{eventType1}))
+ }()
+
+ select {
+ case <-done:
+ return // happy path
+ case <-time.After(time.Second):
+ require.FailNow(t, "method should return almost immediately as there is nothing to process")
+ }
+ })
+
+ t.Run("triggers all passed in events", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{qc: db}
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ dequeuedEventsToTrigger, err := queue.Dequeue(ctx, eventType1.Job.VirtualStorage, eventType1.Job.TargetNodeStorage, 10)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEventsToTrigger, 2, "eventType3 and eventType4 should not be fetched")
+ ids := []uint64{dequeuedEventsToTrigger[0].ID, dequeuedEventsToTrigger[1].ID}
+
+ dequeuedEventsUntriggered, err := queue.Dequeue(ctx, eventType3.Job.VirtualStorage, eventType3.Job.TargetNodeStorage, 10)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEventsUntriggered, 1, "only eventType3 should be fetched")
+
+ initialJobLocks := fetchJobLocks(t, ctx, db)
+
+ trigger := make(chan time.Time, 1)
+ go func() {
+ trigger <- time.Time{}
+ assert.NoError(t, queue.StartHealthUpdate(ctx, trigger, dequeuedEventsToTrigger))
+ }()
+
+ time.Sleep(time.Millisecond) // sleep so the next update gets a measurably later timestamp
+ trigger <- time.Time{} // once this send is consumed, the previous update is guaranteed to have run
+
+ updatedJobLocks := fetchJobLocks(t, ctx, db)
+ for i := range initialJobLocks {
+ if updatedJobLocks[i].JobID == dequeuedEventsUntriggered[0].ID {
+ require.Equal(t, initialJobLocks[i].TriggeredAt, updatedJobLocks[i].TriggeredAt, "no update expected as it was not submitted")
+ } else {
+ require.True(t, updatedJobLocks[i].TriggeredAt.After(initialJobLocks[i].TriggeredAt))
+ }
+ }
+
+ ackIDs, err := queue.Acknowledge(ctx, JobStateFailed, ids)
+ require.NoError(t, err)
+ require.ElementsMatch(t, ackIDs, ids)
+
+ require.Len(t, fetchJobLocks(t, ctx, db), 1, "bindings should be removed after acknowledgment")
+ })
+}
+
func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
t.Helper()
@@ -909,17 +1042,27 @@ type JobLockRow struct {
func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
t.Helper()
- sqlStmt := `SELECT job_id, lock_id FROM replication_queue_job_lock ORDER BY triggered_at`
+ actual := fetchJobLocks(t, ctx, db)
+ for i := range actual {
+ actual[i].TriggeredAt = time.Time{}
+ }
+ require.ElementsMatch(t, expected, actual)
+}
+
+func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow {
+ t.Helper()
+ sqlStmt := `SELECT job_id, lock_id, triggered_at FROM replication_queue_job_lock ORDER BY job_id`
rows, err := db.QueryContext(ctx, sqlStmt)
require.NoError(t, err)
defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
- var actual []JobLockRow
+ var entries []JobLockRow
for rows.Next() {
var entry JobLockRow
- require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID), "failed to scan entry")
- actual = append(actual, entry)
+ require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID, &entry.TriggeredAt), "failed to scan entry")
+ entries = append(entries, entry)
}
require.NoError(t, rows.Err(), "completion of result loop scan")
- require.ElementsMatch(t, expected, actual)
+
+ return entries
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index d6354161c..a4d2a54da 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -396,57 +396,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
var totalEvents int
shard, err := r.nodeManager.GetShard(virtualStorage)
- if err == nil {
+ if err != nil {
+ logger.WithError(err).Error("error when getting primary and secondaries")
+ } else {
targetNodes := shard.Secondaries
if shard.IsReadOnly {
targetNodes = append(targetNodes, shard.Primary)
}
for _, target := range targetNodes {
- events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
- if err != nil {
- logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
- continue
- }
-
- totalEvents += len(events)
-
- eventIDsByState := map[datastore.JobState][]uint64{}
- for _, event := range events {
- if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
- logger.WithFields(logrus.Fields{
- logWithReplJobID: event.ID,
- logWithReplVirtual: event.Job.VirtualStorage,
- logWithReplTarget: event.Job.TargetNodeStorage,
- logWithReplSource: event.Job.SourceNodeStorage,
- logWithReplChange: event.Job.Change,
- logWithReplPath: event.Job.RelativePath,
- logWithCorrID: getCorrelationID(event.Meta),
- }).WithError(err).Error("replication job failed")
- if event.Attempt <= 0 {
- eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
- } else {
- eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
- }
- continue
- }
- eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
- }
- for state, eventIDs := range eventIDsByState {
- ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
- if err != nil {
- logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
- continue
- }
-
- notAckIDs := subtractUint64(ackIDs, eventIDs)
- if len(notAckIDs) > 0 {
- logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
- }
- }
+ totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target)
}
- } else {
- logger.WithError(err).Error("error when getting primary and secondaries")
}
if totalEvents == 0 {
@@ -463,6 +423,79 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
+func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
+ events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+ if err != nil {
+ logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ return 0
+ }
+
+ if len(events) == 0 {
+ return 0
+ }
+
+ stopHealthUpdate := r.startHealthUpdate(ctx, logger, events)
+ defer stopHealthUpdate()
+
+ eventIDsByState := map[datastore.JobState][]uint64{}
+ for _, event := range events {
+ state := r.handleNodeEvent(ctx, logger, shard, target, event)
+ eventIDsByState[state] = append(eventIDsByState[state], event.ID)
+ }
+
+ for state, eventIDs := range eventIDsByState {
+ ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
+ if err != nil {
+ logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
+ continue
+ }
+
+ notAckIDs := subtractUint64(ackIDs, eventIDs)
+ if len(notAckIDs) > 0 {
+ logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
+ }
+ }
+
+ return len(events)
+}
+
+func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogger, events []datastore.ReplicationEvent) context.CancelFunc {
+ healthUpdateCtx, healthUpdateCancel := context.WithCancel(ctx)
+ go func() {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+
+ if err := r.queue.StartHealthUpdate(healthUpdateCtx, ticker.C, events); err != nil {
+ ids := make([]uint64, len(events))
+ for i, event := range events {
+ ids[i] = event.ID
+ }
+
+ logger.WithField("event_ids", ids).WithError(err).Error("health update loop")
+ }
+ }()
+
+ return healthUpdateCancel
+}
+
+func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState {
+ err := r.processReplicationEvent(ctx, event, shard, target.GetConnection())
+ if err == nil {
+ return datastore.JobStateCompleted
+ }
+
+ logger.WithFields(logrus.Fields{
+ logWithReplJobID: event.ID,
+ logWithCorrID: getCorrelationID(event.Meta),
+ }).WithError(err).Error("replication job failed")
+
+ if event.Attempt <= 0 {
+ return datastore.JobStateDead
+ }
+
+ return datastore.JobStateFailed
+}
+
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
source, err := shard.GetNode(event.Job.SourceNodeStorage)
if err != nil {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2ae4dd1dc..93954bd3a 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -7,6 +7,7 @@ import (
"net"
"os"
"path/filepath"
+ "sync/atomic"
"testing"
"time"
@@ -572,6 +573,13 @@ func TestProcessBacklog_Success(t *testing.T) {
return ackIDs, err
})
+ var healthUpdated int32
+ queueInterceptor.OnStartHealthUpdate(func(ctx context.Context, trigger <-chan time.Time, events []datastore.ReplicationEvent) error {
+ require.Len(t, events, 4)
+ atomic.AddInt32(&healthUpdated, 1)
+ return nil
+ })
+
// Update replication job
eventType1 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
@@ -636,6 +644,7 @@ func TestProcessBacklog_Success(t *testing.T) {
select {
case <-processed:
+ require.EqualValues(t, 1, atomic.LoadInt32(&healthUpdated), "health update should be called")
case <-time.After(30 * time.Second):
// strongly depends on the processing capacity
t.Fatal("time limit expired for job to complete")