gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>    2020-07-02 16:53:30 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>   2020-07-02 16:53:30 +0300
commit    172e2a0a922edc4168aadb398b71cf20161ba3c3 (patch)
tree      5940084943cec648770adc5806d9ef06f3935339
parent    715cfcc7c66040149447ecdfb2400d0494c88b47 (diff)
Praefect: replication jobs health ping
The replication process can take an unpredictable amount of time. To distinguish between stale jobs (the process has finished, but the entry was never updated) and genuinely long-running jobs, a health ping was introduced. It periodically updates the event's timestamp in the database, so the timestamp can be used to tell whether replication is still in progress for that event. Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2873
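
For orientation, a minimal, hypothetical sketch of the health-ping pattern follows (it is not the Praefect implementation): a background goroutine refreshes a per-job timestamp on every tick of a trigger channel until the surrounding context is cancelled, so a separate check can tell a long-running job apart from a stale one. The healthPinger type, the in-memory map, and the job ID are illustrative assumptions only; the real implementation added below (PostgresReplicationEventQueue.StartHealthUpdate) updates the triggered_at column of the replication_queue_job_lock table instead.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// healthPinger mimics the StartHealthUpdate contract from this diff, but keeps the
// "last pinged" timestamps in memory instead of the replication_queue_job_lock table.
type healthPinger struct {
	mu        sync.Mutex
	updatedAt map[int64]time.Time // job ID -> last health ping
}

// StartHealthUpdate blocks, refreshing the jobs' timestamps on every tick of the
// trigger channel, until the context is cancelled.
func (p *healthPinger) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, ids []int64) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-trigger:
			p.mu.Lock()
			for _, id := range ids {
				p.updatedAt[id] = time.Now()
			}
			p.mu.Unlock()
		}
	}
}

func main() {
	p := &healthPinger{updatedAt: map[int64]time.Time{42: time.Now()}}

	ctx, cancel := context.WithCancel(context.Background())
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	// Run the health ping in the background while the "replication" is in flight.
	go p.StartHealthUpdate(ctx, ticker.C, []int64{42})

	time.Sleep(50 * time.Millisecond) // simulate a long-running replication job
	cancel()                          // stop pinging once the job has finished

	p.mu.Lock()
	fmt.Println("job 42 last pinged at:", p.updatedAt[42].Format(time.RFC3339Nano))
	p.mu.Unlock()
}
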
-rw-r--r--  changelogs/unreleased/ps-replication-health-ping.yml |   5
-rw-r--r--  internal/praefect/datastore/glsql/postgres.go         |   1
-rw-r--r--  internal/praefect/datastore/memory.go                 |  26
-rw-r--r--  internal/praefect/datastore/queue.go                  |  53
-rw-r--r--  internal/praefect/datastore/queue_test.go             | 153
-rw-r--r--  internal/praefect/replicator.go                       | 121
-rw-r--r--  internal/praefect/replicator_test.go                  |   9
7 files changed, 316 insertions, 52 deletions
diff --git a/changelogs/unreleased/ps-replication-health-ping.yml b/changelogs/unreleased/ps-replication-health-ping.yml
new file mode 100644
index 000000000..a03ebeee1
--- /dev/null
+++ b/changelogs/unreleased/ps-replication-health-ping.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: replication jobs health ping'
+merge_request: 2321
+author:
+type: added
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index 6e3346a24..2d3f36150 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -40,6 +40,7 @@ func Migrate(db *sql.DB, ignoreUnknown bool) (int, error) {
type Querier interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
+ ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
// TxQuery runs operations inside transaction and commits|rollbacks on Done.
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 7ad900b80..ed122e21a 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -219,6 +219,12 @@ func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, vir
return result, nil
}
+// StartHealthUpdate is a no-op for the in-memory implementation: health tracking makes no
+// sense there, as all information about events is lost after a restart anyway.
+func (s *memoryReplicationEventQueue) StartHealthUpdate(context.Context, <-chan time.Time, []ReplicationEvent) error {
+ return nil
+}
+
// remove deletes i-th element from the queue and from the in-flight tracking map.
// It doesn't check 'i' for the out of range and must be called with lock protection.
func (s *memoryReplicationEventQueue) remove(i int) {
@@ -240,6 +246,8 @@ type ReplicationEventQueueInterceptor interface {
OnDequeue(func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error))
// OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called.
OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error))
+ // OnStartHealthUpdate allows setting an action that is executed each time the `StartHealthUpdate` method is called.
+ OnStartHealthUpdate(func(context.Context, <-chan time.Time, []ReplicationEvent) error)
}
// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface.
@@ -249,9 +257,10 @@ func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) Replicatio
type replicationEventQueueInterceptor struct {
ReplicationEventQueue
- onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
- onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
- onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
+ onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)
+ onDequeue func(context.Context, string, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)
+ onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)
+ onStartHealthUpdate func(context.Context, <-chan time.Time, []ReplicationEvent) error
}
func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) {
@@ -266,6 +275,10 @@ func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Con
i.onAcknowledge = action
}
+func (i *replicationEventQueueInterceptor) OnStartHealthUpdate(action func(context.Context, <-chan time.Time, []ReplicationEvent) error) {
+ i.onStartHealthUpdate = action
+}
+
func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
if i.onEnqueue != nil {
return i.onEnqueue(ctx, event, i.ReplicationEventQueue)
@@ -286,3 +299,10 @@ func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, stat
}
return i.ReplicationEventQueue.Acknowledge(ctx, state, ids)
}
+
+func (i *replicationEventQueueInterceptor) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
+ if i.onStartHealthUpdate != nil {
+ return i.onStartHealthUpdate(ctx, trigger, events)
+ }
+ return i.ReplicationEventQueue.StartHealthUpdate(ctx, trigger, events)
+}
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index 7edc4b006..516573f26 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -5,9 +5,11 @@ import (
"database/sql"
"database/sql/driver"
"encoding/json"
+ "errors"
"fmt"
"time"
+ "github.com/lib/pq"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
)
@@ -28,6 +30,11 @@ type ReplicationEventQueue interface {
// GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state.
// It returns no results if there is no up to date storages or there were no replication events yet.
GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
+ // StartHealthUpdate starts periodic updates of the events' health identifiers.
+ // Events with a fresh health identifier are not considered stale.
+ // A health update is executed for each new entry received from the passed-in trigger channel.
+ // This is a blocking call whose lifetime is managed by the passed-in context.
+ StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
}
func allowToAck(state JobState) error {
@@ -374,3 +381,49 @@ func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context,
return storages.Values(), nil
}
+
+// StartHealthUpdate starts periodic updates of the events' health identifiers.
+// Events with a fresh health identifier are not considered stale.
+// A health update is executed for each new entry received from the passed-in trigger channel.
+// This is a blocking call whose lifetime is managed by the passed-in context.
+func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
+ if len(events) == 0 {
+ return nil
+ }
+
+ jobIDs := make(pq.Int64Array, len(events))
+ lockIDs := make(pq.StringArray, len(events))
+ for i := range events {
+ jobIDs[i] = int64(events[i].ID)
+ lockIDs[i] = events[i].LockID
+ }
+
+ query := `
+ UPDATE replication_queue_job_lock
+ SET triggered_at = NOW() AT TIME ZONE 'UTC'
+ WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))`
+
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-trigger:
+ res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs)
+ if err != nil {
+ if !(errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+ return err
+ }
+ return nil
+ }
+
+ affected, err := res.RowsAffected()
+ if err != nil {
+ return err
+ }
+
+ if affected == 0 {
+ return nil
+ }
+ }
+ }
+}
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 9f4c458d1..f63049b29 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -853,6 +854,138 @@ func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_StartHealthUpdate(t *testing.T) {
+ db := getDB(t)
+
+ eventType1 := ReplicationEvent{Job: ReplicationJob{
+ Change: UpdateRepo,
+ VirtualStorage: "vs-1",
+ TargetNodeStorage: "s-1",
+ SourceNodeStorage: "s-0",
+ RelativePath: "/path/1",
+ }}
+
+ eventType2 := eventType1
+ eventType2.Job.RelativePath = "/path/2"
+
+ eventType3 := eventType1
+ eventType3.Job.VirtualStorage = "vs-2"
+
+ eventType4 := eventType1
+ eventType4.Job.TargetNodeStorage = "s-2"
+
+ t.Run("no events is valid", func(t *testing.T) {
+ // 'qc' is not initialized, so the test will fail if any SQL operation is attempted
+ queue := PostgresReplicationEventQueue{}
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ require.NoError(t, queue.StartHealthUpdate(ctx, nil, nil))
+ })
+
+ t.Run("can be terminated by the passed in context", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ // 'qc' is not initialized, so the test will fail if any SQL operation is attempted
+ queue := PostgresReplicationEventQueue{}
+ cancel()
+ require.NoError(t, queue.StartHealthUpdate(ctx, nil, []ReplicationEvent{eventType1}))
+ })
+
+ t.Run("stops after first error", func(t *testing.T) {
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ qc, err := db.DB.BeginTx(ctx, nil)
+ require.NoError(t, err)
+ require.NoError(t, qc.Rollback())
+
+ // 'qc' is backed by an already-finished transaction, so any operation on it will fail
+ queue := PostgresReplicationEventQueue{qc: qc}
+
+ trigger := make(chan time.Time, 1)
+ trigger <- time.Time{}
+
+ require.Error(t, queue.StartHealthUpdate(ctx, trigger, []ReplicationEvent{eventType1}))
+ })
+
+ t.Run("stops if nothing to update (extended coverage)", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ done := make(chan struct{})
+ queue := PostgresReplicationEventQueue{qc: db}
+ go func() {
+ trigger := make(chan time.Time)
+ close(trigger)
+
+ defer close(done)
+ assert.NoError(t, queue.StartHealthUpdate(ctx, trigger, []ReplicationEvent{eventType1}))
+ }()
+
+ select {
+ case <-done:
+ return // happy path
+ case <-time.After(time.Second):
+ require.FailNow(t, "method should return almost immediately as there is nothing to process")
+ }
+ })
+
+ t.Run("triggers all passed in events", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ queue := PostgresReplicationEventQueue{qc: db}
+ events := []ReplicationEvent{eventType1, eventType2, eventType3, eventType4}
+ for i := range events {
+ var err error
+ events[i], err = queue.Enqueue(ctx, events[i])
+ require.NoError(t, err, "failed to fill in event queue")
+ }
+
+ dequeuedEventsToTrigger, err := queue.Dequeue(ctx, eventType1.Job.VirtualStorage, eventType1.Job.TargetNodeStorage, 10)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEventsToTrigger, 2, "eventType3 and eventType4 should not be fetched")
+ ids := []uint64{dequeuedEventsToTrigger[0].ID, dequeuedEventsToTrigger[1].ID}
+
+ dequeuedEventsUntriggered, err := queue.Dequeue(ctx, eventType3.Job.VirtualStorage, eventType3.Job.TargetNodeStorage, 10)
+ require.NoError(t, err)
+ require.Len(t, dequeuedEventsUntriggered, 1, "only eventType3 should be fetched")
+
+ initialJobLocks := fetchJobLocks(t, ctx, db)
+
+ trigger := make(chan time.Time, 1)
+ go func() {
+ trigger <- time.Time{}
+ assert.NoError(t, queue.StartHealthUpdate(ctx, trigger, dequeuedEventsToTrigger))
+ }()
+
+ time.Sleep(time.Millisecond) // sleep briefly, otherwise processing is so fast that the updated timestamps would not differ
+ trigger <- time.Time{} // once this value is consumed we know the previous update has been executed
+
+ updatedJobLocks := fetchJobLocks(t, ctx, db)
+ for i := range initialJobLocks {
+ if updatedJobLocks[i].JobID == dequeuedEventsUntriggered[0].ID {
+ require.Equal(t, initialJobLocks[i].TriggeredAt, updatedJobLocks[i].TriggeredAt, "no update expected as it was not submitted")
+ } else {
+ require.True(t, updatedJobLocks[i].TriggeredAt.After(initialJobLocks[i].TriggeredAt))
+ }
+ }
+
+ ackIDs, err := queue.Acknowledge(ctx, JobStateFailed, ids)
+ require.NoError(t, err)
+ require.ElementsMatch(t, ackIDs, ids)
+
+ require.Len(t, fetchJobLocks(t, ctx, db), 1, "bindings should be removed after acknowledgment")
+ })
+}
+
func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
t.Helper()
@@ -909,17 +1042,27 @@ type JobLockRow struct {
func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) {
t.Helper()
- sqlStmt := `SELECT job_id, lock_id FROM replication_queue_job_lock ORDER BY triggered_at`
+ actual := fetchJobLocks(t, ctx, db)
+ for i := range actual {
+ actual[i].TriggeredAt = time.Time{}
+ }
+ require.ElementsMatch(t, expected, actual)
+}
+
+func fetchJobLocks(t *testing.T, ctx context.Context, db glsql.DB) []JobLockRow {
+ t.Helper()
+ sqlStmt := `SELECT job_id, lock_id, triggered_at FROM replication_queue_job_lock ORDER BY job_id`
rows, err := db.QueryContext(ctx, sqlStmt)
require.NoError(t, err)
defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }()
- var actual []JobLockRow
+ var entries []JobLockRow
for rows.Next() {
var entry JobLockRow
- require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID), "failed to scan entry")
- actual = append(actual, entry)
+ require.NoError(t, rows.Scan(&entry.JobID, &entry.LockID, &entry.TriggeredAt), "failed to scan entry")
+ entries = append(entries, entry)
}
require.NoError(t, rows.Err(), "completion of result loop scan")
- require.ElementsMatch(t, expected, actual)
+
+ return entries
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index d6354161c..a4d2a54da 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -396,57 +396,17 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
var totalEvents int
shard, err := r.nodeManager.GetShard(virtualStorage)
- if err == nil {
+ if err != nil {
+ logger.WithError(err).Error("error when getting primary and secondaries")
+ } else {
targetNodes := shard.Secondaries
if shard.IsReadOnly {
targetNodes = append(targetNodes, shard.Primary)
}
for _, target := range targetNodes {
- events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
- if err != nil {
- logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
- continue
- }
-
- totalEvents += len(events)
-
- eventIDsByState := map[datastore.JobState][]uint64{}
- for _, event := range events {
- if err := r.processReplicationEvent(ctx, event, shard, target.GetConnection()); err != nil {
- logger.WithFields(logrus.Fields{
- logWithReplJobID: event.ID,
- logWithReplVirtual: event.Job.VirtualStorage,
- logWithReplTarget: event.Job.TargetNodeStorage,
- logWithReplSource: event.Job.SourceNodeStorage,
- logWithReplChange: event.Job.Change,
- logWithReplPath: event.Job.RelativePath,
- logWithCorrID: getCorrelationID(event.Meta),
- }).WithError(err).Error("replication job failed")
- if event.Attempt <= 0 {
- eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID)
- } else {
- eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID)
- }
- continue
- }
- eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID)
- }
- for state, eventIDs := range eventIDsByState {
- ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
- if err != nil {
- logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
- continue
- }
-
- notAckIDs := subtractUint64(ackIDs, eventIDs)
- if len(notAckIDs) > 0 {
- logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
- }
- }
+ totalEvents += r.handleNode(ctx, logger, shard, virtualStorage, target)
}
- } else {
- logger.WithError(err).Error("error when getting primary and secondaries")
}
if totalEvents == 0 {
@@ -463,6 +423,79 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora
}
}
+func (r ReplMgr) handleNode(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, virtualStorage string, target nodes.Node) int {
+ events, err := r.queue.Dequeue(ctx, virtualStorage, target.GetStorage(), 10)
+ if err != nil {
+ logger.WithField(logWithReplTarget, target.GetStorage()).WithError(err).Error("failed to dequeue replication events")
+ return 0
+ }
+
+ if len(events) == 0 {
+ return 0
+ }
+
+ stopHealthUpdate := r.startHealthUpdate(ctx, logger, events)
+ defer stopHealthUpdate()
+
+ eventIDsByState := map[datastore.JobState][]uint64{}
+ for _, event := range events {
+ state := r.handleNodeEvent(ctx, logger, shard, target, event)
+ eventIDsByState[state] = append(eventIDsByState[state], event.ID)
+ }
+
+ for state, eventIDs := range eventIDsByState {
+ ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
+ if err != nil {
+ logger.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events")
+ continue
+ }
+
+ notAckIDs := subtractUint64(ackIDs, eventIDs)
+ if len(notAckIDs) > 0 {
+ logger.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged")
+ }
+ }
+
+ return len(events)
+}
+
+func (r ReplMgr) startHealthUpdate(ctx context.Context, logger logrus.FieldLogger, events []datastore.ReplicationEvent) context.CancelFunc {
+ healthUpdateCtx, healthUpdateCancel := context.WithCancel(ctx)
+ go func() {
+ ticker := time.NewTicker(5 * time.Second)
+ defer ticker.Stop()
+
+ if err := r.queue.StartHealthUpdate(healthUpdateCtx, ticker.C, events); err != nil {
+ ids := make([]uint64, len(events))
+ for i, event := range events {
+ ids[i] = event.ID
+ }
+
+ logger.WithField("event_ids", ids).WithError(err).Error("health update loop")
+ }
+ }()
+
+ return healthUpdateCancel
+}
+
+func (r ReplMgr) handleNodeEvent(ctx context.Context, logger logrus.FieldLogger, shard nodes.Shard, target nodes.Node, event datastore.ReplicationEvent) datastore.JobState {
+ err := r.processReplicationEvent(ctx, event, shard, target.GetConnection())
+ if err == nil {
+ return datastore.JobStateCompleted
+ }
+
+ logger.WithFields(logrus.Fields{
+ logWithReplJobID: event.ID,
+ logWithCorrID: getCorrelationID(event.Meta),
+ }).WithError(err).Error("replication job failed")
+
+ if event.Attempt <= 0 {
+ return datastore.JobStateDead
+ }
+
+ return datastore.JobStateFailed
+}
+
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, shard nodes.Shard, targetCC *grpc.ClientConn) error {
source, err := shard.GetNode(event.Job.SourceNodeStorage)
if err != nil {
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2ae4dd1dc..93954bd3a 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -7,6 +7,7 @@ import (
"net"
"os"
"path/filepath"
+ "sync/atomic"
"testing"
"time"
@@ -572,6 +573,13 @@ func TestProcessBacklog_Success(t *testing.T) {
return ackIDs, err
})
+ var healthUpdated int32
+ queueInterceptor.OnStartHealthUpdate(func(ctx context.Context, trigger <-chan time.Time, events []datastore.ReplicationEvent) error {
+ require.Len(t, events, 4)
+ atomic.AddInt32(&healthUpdated, 1)
+ return nil
+ })
+
// Update replication job
eventType1 := datastore.ReplicationEvent{
Job: datastore.ReplicationJob{
@@ -636,6 +644,7 @@ func TestProcessBacklog_Success(t *testing.T) {
select {
case <-processed:
+ require.EqualValues(t, 1, atomic.LoadInt32(&healthUpdated), "health update should be called")
case <-time.After(30 * time.Second):
// strongly depends on the processing capacity
t.Fatal("time limit expired for job to complete")