diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-17 15:41:04 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2021-09-22 15:32:42 +0300 |
commit | 165c10e958ff69198a6f569d696bf31d22c1d1cc (patch) | |
tree | 09262862b1475af38f081b6717f965b0a22cfdd8 | |
parent | 641cb8f9b6f86f4b51ce4316ce47655ef2213215 (diff) |
replication: Graceful stop of the replication processing loop
The loop for processing replication events uses a root context
of the application. Once the context is cancelled in-flight
operation would be cancelled as well (network call, db operation,
etc.). Instead we should allow operation to complete and
stop processing events on the next iteration. It makes execution
of the replication events more stable and deterministic.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2703
Changelog: Fixed
-rw-r--r-- | internal/praefect/replicator.go | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 8d3cbdf4f..b770cfef7 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -11,6 +11,7 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" @@ -569,10 +570,14 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora logger.Info("processing started") + // We should make a graceful shutdown of the processing loop and don't want to interrupt + // in-flight operations. That is why we suppress cancellation on the provided context. + appCtx := ctx + ctx = helper.SuppressCancellation(ctx) for { select { - case <-ctx.Done(): - logger.WithError(ctx.Err()).Info("processing stopped") + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") return // processing must be stopped default: // proceed with processing @@ -593,8 +598,8 @@ func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFunc, virtualStora select { case <-time.After(backoff()): continue - case <-ctx.Done(): - logger.WithError(ctx.Err()).Info("processing stopped") + case <-appCtx.Done(): + logger.WithError(appCtx.Err()).Info("processing stopped") return } } |