gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>  2022-04-04 15:23:47 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2022-04-20 13:37:25 +0300
commit    e3ec6e2922cea629ed900aeb9a868a19dfe89c65
tree      8a4dcf3f4b60526e328d6c5116193f898be579d1
parent    9413ca591ebe30dcb133c86d0ec53f6bc2fc30bb
Release expired verification leases periodically
The background verifier sets a lease on a replica when it picks it up for verification. If the worker dies for some reason, the lease remains in place and no other worker will pick up the replica for verification until the lease is cleared. The lease records the maximum time the worker would still be working on the replica, so once that time has passed it is safe for another worker to pick up the replica again.

This commit adds a background goroutine that periodically releases expired leases so other workers can take over the work if the original worker failed without releasing its lease. The 'verification_leases' partial index is added so the query can efficiently find the replicas with acquired leases when looking for stale ones.
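For orientation, here is a minimal sketch of the lease-release pattern the commit introduces, assuming a plain database/sql connection. The package name, function name, and the simplified single-statement query are illustrative only; the actual implementation is RunExpiredLeaseReleaser in internal/praefect/verifier.go below, which batches the update and logs which leases were released.

// Sketch of the janitor pattern, not Gitaly's actual API: a goroutine wakes up
// on a ticker and clears any lease whose deadline has passed so the replica
// becomes eligible for verification again.
package sketch

import (
	"context"
	"database/sql"
	"log"
	"time"
)

// releaseExpiredLeases clears leases older than leaseDuration on every tick
// until the context is canceled. The partial index on verification_leased_until
// keeps this lookup cheap even though most rows carry no lease.
func releaseExpiredLeases(ctx context.Context, db *sql.DB, leaseDuration, interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if _, err := db.ExecContext(ctx, `
				UPDATE storage_repositories
				SET verification_leased_until = NULL
				WHERE verification_leased_until < now() - $1 * interval '1 microsecond'
			`, leaseDuration.Microseconds()); err != nil {
				log.Printf("releasing expired leases: %v", err)
			}
		}
	}
}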
-rw-r--r--  _support/praefect-schema.sql | 7
-rw-r--r--  cmd/praefect/main.go | 29
-rw-r--r--  internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go | 16
-rw-r--r--  internal/praefect/verifier.go | 82
-rw-r--r--  internal/praefect/verifier_test.go | 84
5 files changed, 207 insertions(+), 11 deletions(-)
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql
index 78316e80c..813d4fa89 100644
--- a/_support/praefect-schema.sql
+++ b/_support/praefect-schema.sql
@@ -558,6 +558,13 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories
--
+-- Name: verification_leases; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX verification_leases ON public.storage_repositories USING btree (verification_leased_until) WHERE (verification_leased_until IS NOT NULL);
+
+
+--
-- Name: verification_queue; Type: INDEX; Schema: public; Owner: -
--
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 20f7b841f..d33183438 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -350,23 +350,30 @@ func run(
conf.DefaultReplicationFactors(),
)
- go func() {
- if conf.BackgroundVerification.VerificationInterval <= 0 {
- logger.Info("background verifier is disabled")
- return
- }
-
+ if conf.BackgroundVerification.VerificationInterval > 0 {
logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")
- if err := praefect.NewMetadataVerifier(
+ verifier := praefect.NewMetadataVerifier(
logger,
db,
nodeSet.Connections(),
hm,
conf.BackgroundVerification.VerificationInterval,
- ).Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
- logger.WithError(err).Error("metadata verifier finished")
- }
- }()
+ )
+
+ go func() {
+ if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
+ logger.WithError(err).Error("metadata verifier finished")
+ }
+ }()
+
+ go func() {
+ if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil {
+ logger.WithError(err).Error("expired verification lease releaser finished")
+ }
+ }()
+ } else {
+ logger.Info("background verifier is disabled")
+ }
} else {
if conf.Failover.Enabled {
logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
diff --git a/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go b/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go
new file mode 100644
index 000000000..2fe967672
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go
@@ -0,0 +1,16 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20220404131105_stale_verification_lease_index",
+ Up: []string{`
+ CREATE INDEX verification_leases ON storage_repositories (verification_leased_until)
+ WHERE verification_leased_until IS NOT NULL
+ `},
+ Down: []string{"DROP INDEX verification_leases"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go
index 543d0d4c2..449e379ea 100644
--- a/internal/praefect/verifier.go
+++ b/internal/praefect/verifier.go
@@ -80,6 +80,88 @@ func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error
}
}
+// RunExpiredLeaseReleaser releases expired leases on every tick. It keeps running until the context is
+// canceled.
+func (v *MetadataVerifier) RunExpiredLeaseReleaser(ctx context.Context, ticker helper.Ticker) error {
+ defer ticker.Stop()
+
+ for {
+ ticker.Reset()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C():
+ if err := v.releaseExpiredLeases(ctx); err != nil {
+ v.log.WithError(err).Error("failed releasing expired leases")
+ }
+ }
+ }
+}
+
+func (v *MetadataVerifier) releaseExpiredLeases(ctx context.Context) error {
+ // The update is batched as there could potentially be a lot of stale leases. A long
+ // transaction could block other operational queries such as generation increments on
+ // write acknowledgement.
+ for {
+ rows, err := v.db.QueryContext(ctx, `
+ WITH to_release AS (
+ SELECT repository_id, repositories.virtual_storage, repositories.relative_path, storage
+ FROM storage_repositories
+ JOIN repositories USING (repository_id)
+ WHERE verification_leased_until < now() - $1 * interval '1 microsecond'
+ LIMIT $2
+ FOR NO KEY UPDATE SKIP LOCKED
+ ), release AS (
+ UPDATE storage_repositories
+ SET verification_leased_until = NULL
+ FROM to_release
+ WHERE storage_repositories.repository_id = to_release.repository_id
+ AND storage_repositories.storage = to_release.storage
+ )
+
+ SELECT virtual_storage, relative_path, storage
+ FROM to_release
+ `, v.leaseDuration.Microseconds(), v.batchSize)
+ if err != nil {
+ return fmt.Errorf("query execution: %w", err)
+ }
+ defer rows.Close()
+
+ released := map[string]map[string][]string{}
+ totalReleased := 0
+ for rows.Next() {
+ totalReleased++
+
+ var virtualStorage, relativePath, storage string
+ if err := rows.Scan(&virtualStorage, &relativePath, &storage); err != nil {
+ return fmt.Errorf("scan: %w", err)
+ }
+
+ if released[virtualStorage] == nil {
+ released[virtualStorage] = make(map[string][]string)
+ }
+
+ released[virtualStorage][relativePath] = append(released[virtualStorage][relativePath], storage)
+ }
+
+ if err := rows.Err(); err != nil {
+ return fmt.Errorf("rows: %w", err)
+ }
+
+ if totalReleased > 0 {
+ v.log.WithField("leases_released", released).Info("released stale verification leases")
+ }
+
+ // If fewer leases than the batch size were released, there's no more work for us
+ // and we can wait until the next tick. There could still be some given the query
+ // skips locked rows but these can be handled on the next run.
+ if totalReleased < v.batchSize {
+ return nil
+ }
+ }
+}
+
func (v *MetadataVerifier) run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, v.leaseDuration)
defer cancel()
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index f6d1f8187..bb2a099b9 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"
+ "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
@@ -589,3 +590,86 @@ func getAllReplicas(ctx context.Context, t testing.TB, db glsql.Querier) map[str
return results
}
+
+func TestVerifier_runExpiredLeaseReleaser(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ db := testdb.New(t)
+
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+ require.NoError(t,
+ rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "replica-path-1", "no-lease", []string{
+ "valid-lease", "expired-lease-1", "expired-lease-2", "expired-lease-3", "locked-lease",
+ }, nil, true, true),
+ )
+
+ // lock one lease record to ensure the worker doesn't block on it
+ result, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verification_leased_until = now() - interval '1 week'
+ WHERE storage = 'locked-lease'
+ `)
+ require.NoError(t, err)
+ locked, err := result.RowsAffected()
+ require.NoError(t, err)
+ require.Equal(t, int64(1), locked)
+
+ lockingTx := db.Begin(t)
+ defer lockingTx.Rollback(t)
+ _, err = lockingTx.ExecContext(ctx, "SELECT FROM storage_repositories WHERE storage = 'locked-lease' FOR UPDATE")
+ require.NoError(t, err)
+
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
+
+ logger, hook := test.NewNullLogger()
+ verifier := NewMetadataVerifier(logrus.NewEntry(logger), tx, nil, nil, 0)
+ // set batch size lower than the number of expired leases to ensure the batching works
+ verifier.batchSize = 2
+
+ _, err = tx.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verification_leased_until = lease_time
+ FROM ( VALUES
+ ( 'valid-lease', now() - $1 * interval '1 microsecond' ),
+ ( 'expired-lease-1', now() - $1 * interval '1 microsecond' - interval '1 microsecond'),
+ ( 'expired-lease-2', now() - $1 * interval '2 microsecond'),
+ ( 'expired-lease-3', now() - $1 * interval '2 microsecond')
+ ) AS fixture (storage, lease_time)
+ WHERE storage_repositories.storage = fixture.storage
+ `, verifier.leaseDuration.Microseconds())
+ require.NoError(t, err)
+
+ runCtx, cancelRun := context.WithCancel(ctx)
+ ticker := helper.NewCountTicker(1, cancelRun)
+
+ err = verifier.RunExpiredLeaseReleaser(runCtx, ticker)
+ require.Equal(t, context.Canceled, err)
+
+ // actualReleased contains the released leases from the logs. It's keyed
+ // as virtual storage -> relative path -> storage.
+ actualReleased := map[string]map[string]map[string]struct{}{}
+ require.Equal(t, 2, len(hook.AllEntries()))
+ for i := range hook.Entries {
+ require.Equal(t, "released stale verification leases", hook.Entries[i].Message, hook.Entries[i].Data[logrus.ErrorKey])
+ for virtualStorage, relativePaths := range hook.Entries[i].Data["leases_released"].(map[string]map[string][]string) {
+ for relativePath, storages := range relativePaths {
+ for _, storage := range storages {
+ if actualReleased[virtualStorage] == nil {
+ actualReleased[virtualStorage] = map[string]map[string]struct{}{}
+ }
+
+ if actualReleased[virtualStorage][relativePath] == nil {
+ actualReleased[virtualStorage][relativePath] = map[string]struct{}{}
+ }
+
+ actualReleased[virtualStorage][relativePath][storage] = struct{}{}
+ }
+ }
+ }
+ }
+
+ require.Equal(t, map[string]map[string]map[string]struct{}{
+ "virtual-storage-1": {"relative-path-1": {"expired-lease-1": {}, "expired-lease-2": {}, "expired-lease-3": {}}},
+ }, actualReleased)
+}