diff options
author | John Cai <jcai@gitlab.com> | 2022-04-21 17:01:54 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2022-04-21 17:01:54 +0300 |
commit | 1b1b95408d11a2532db5a44ffefd5cbab6e0effd (patch) | |
tree | 2eafcc63858de8c32cdbf5d53ee44f26908b5385 | |
parent | b95a243ee35835e5f904e8a42bbccb758d904b07 (diff) | |
parent | e3ec6e2922cea629ed900aeb9a868a19dfe89c65 (diff) |
Merge branch 'smh-release-stale-leases' into 'master'
Release expired verification leases periodically
See merge request gitlab-org/gitaly!4478
-rw-r--r-- | _support/praefect-schema.sql | 7 |
-rw-r--r-- | cmd/praefect/main.go | 29 |
-rw-r--r-- | internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go | 16 |
-rw-r--r-- | internal/praefect/verifier.go | 82 |
-rw-r--r-- | internal/praefect/verifier_test.go | 84 |
5 files changed, 207 insertions, 11 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql index 78316e80c..813d4fa89 100644 --- a/_support/praefect-schema.sql +++ b/_support/praefect-schema.sql @@ -558,6 +558,13 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories -- +-- Name: verification_leases; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX verification_leases ON public.storage_repositories USING btree (verification_leased_until) WHERE (verification_leased_until IS NOT NULL); + + +-- -- Name: verification_queue; Type: INDEX; Schema: public; Owner: - -- diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 20f7b841f..d33183438 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -350,23 +350,30 @@ func run( conf.DefaultReplicationFactors(), ) - go func() { - if conf.BackgroundVerification.VerificationInterval <= 0 { - logger.Info("background verifier is disabled") - return - } - + if conf.BackgroundVerification.VerificationInterval > 0 { logger.WithField("config", conf.BackgroundVerification).Info("background verifier started") - if err := praefect.NewMetadataVerifier( + verifier := praefect.NewMetadataVerifier( logger, db, nodeSet.Connections(), hm, conf.BackgroundVerification.VerificationInterval, - ).Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil { - logger.WithError(err).Error("metadata verifier finished") - } - }() + ) + + go func() { + if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil { + logger.WithError(err).Error("metadata verifier finished") + } + }() + + go func() { + if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil { + logger.WithError(err).Error("expired verification lease releaser finished") + } + }() + } else { + logger.Info("background verifier is disabled") + } } else { if conf.Failover.Enabled { logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn( diff --git 
a/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go b/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go new file mode 100644 index 000000000..2fe967672 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go @@ -0,0 +1,16 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220404131105_stale_verification_lease_index", + Up: []string{` + CREATE INDEX verification_leases ON storage_repositories (verification_leased_until) + WHERE verification_leased_until IS NOT NULL + `}, + Down: []string{"DROP INDEX verification_leases"}, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go index 543d0d4c2..449e379ea 100644 --- a/internal/praefect/verifier.go +++ b/internal/praefect/verifier.go @@ -80,6 +80,88 @@ func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error } } +// RunExpiredLeaseReleaser releases expired leases on every tick. It keeps running until the context is +// canceled. +func (v *MetadataVerifier) RunExpiredLeaseReleaser(ctx context.Context, ticker helper.Ticker) error { + defer ticker.Stop() + + for { + ticker.Reset() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C(): + if err := v.releaseExpiredLeases(ctx); err != nil { + v.log.WithError(err).Error("failed releasing expired leases") + } + } + } +} + +func (v *MetadataVerifier) releaseExpiredLeases(ctx context.Context) error { + // The update is batched as there could potentially be a lot of stale leases. A long + // transaction could block other operational queries such as generation increments on + // write acknowledgement. 
+ for { + rows, err := v.db.QueryContext(ctx, ` + WITH to_release AS ( + SELECT repository_id, repositories.virtual_storage, repositories.relative_path, storage + FROM storage_repositories + JOIN repositories USING (repository_id) + WHERE verification_leased_until < now() - $1 * interval '1 microsecond' + LIMIT $2 + FOR NO KEY UPDATE SKIP LOCKED + ), release AS ( + UPDATE storage_repositories + SET verification_leased_until = NULL + FROM to_release + WHERE storage_repositories.repository_id = to_release.repository_id + AND storage_repositories.storage = to_release.storage + ) + + SELECT virtual_storage, relative_path, storage + FROM to_release + `, v.leaseDuration.Microseconds(), v.batchSize) + if err != nil { + return fmt.Errorf("query execution: %w", err) + } + defer rows.Close() + + released := map[string]map[string][]string{} + totalReleased := 0 + for rows.Next() { + totalReleased++ + + var virtualStorage, relativePath, storage string + if err := rows.Scan(&virtualStorage, &relativePath, &storage); err != nil { + return fmt.Errorf("scan: %w", err) + } + + if released[virtualStorage] == nil { + released[virtualStorage] = make(map[string][]string) + } + + released[virtualStorage][relativePath] = append(released[virtualStorage][relativePath], storage) + } + + if err := rows.Err(); err != nil { + return fmt.Errorf("rows: %w", err) + } + + if totalReleased > 0 { + v.log.WithField("leases_released", released).Info("released stale verification leases") + } + + // If fewer leases than the batch size were released, there's no more work for us + // and we can wait until the next tick. There could still be some given the query + // skips locked rows but these can be handled on the next run. 
+ if totalReleased < v.batchSize { + return nil + } + } +} + func (v *MetadataVerifier) run(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, v.leaseDuration) defer cancel() diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index f6d1f8187..bb2a099b9 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" @@ -589,3 +590,86 @@ func getAllReplicas(ctx context.Context, t testing.TB, db glsql.Querier) map[str return results } + +func TestVerifier_runExpiredLeaseReleaser(t *testing.T) { + ctx := testhelper.Context(t) + + db := testdb.New(t) + + rs := datastore.NewPostgresRepositoryStore(db, nil) + require.NoError(t, + rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "replica-path-1", "no-lease", []string{ + "valid-lease", "expired-lease-1", "expired-lease-2", "expired-lease-3", "locked-lease", + }, nil, true, true), + ) + + // lock one lease record to ensure the worker doesn't block on it + result, err := db.ExecContext(ctx, ` + UPDATE storage_repositories + SET verification_leased_until = now() - interval '1 week' + WHERE storage = 'locked-lease' + `) + require.NoError(t, err) + locked, err := result.RowsAffected() + require.NoError(t, err) + require.Equal(t, int64(1), locked) + + lockingTx := db.Begin(t) + defer lockingTx.Rollback(t) + _, err = lockingTx.ExecContext(ctx, "SELECT FROM storage_repositories WHERE storage = 'locked-lease' FOR UPDATE") + require.NoError(t, err) + + tx := db.Begin(t) + defer tx.Rollback(t) + + logger, hook := test.NewNullLogger() + verifier := NewMetadataVerifier(logrus.NewEntry(logger), tx, nil, nil, 0) + // set batch size lower than the number of locked leases to ensure the batching works + verifier.batchSize = 2 + + _, err = 
tx.ExecContext(ctx, ` + UPDATE storage_repositories + SET verification_leased_until = lease_time + FROM ( VALUES + ( 'valid-lease', now() - $1 * interval '1 microsecond' ), + ( 'expired-lease-1', now() - $1 * interval '1 microsecond' - interval '1 microsecond'), + ( 'expired-lease-2', now() - $1 * interval '2 microsecond'), + ( 'expired-lease-3', now() - $1 * interval '2 microsecond') + ) AS fixture (storage, lease_time) + WHERE storage_repositories.storage = fixture.storage + `, verifier.leaseDuration.Microseconds()) + require.NoError(t, err) + + runCtx, cancelRun := context.WithCancel(ctx) + ticker := helper.NewCountTicker(1, cancelRun) + + err = verifier.RunExpiredLeaseReleaser(runCtx, ticker) + require.Equal(t, context.Canceled, err) + + // actualReleased contains the released leases from the logs. It's keyed + // as virtual storage -> relative path -> storage. + actualReleased := map[string]map[string]map[string]struct{}{} + require.Equal(t, 2, len(hook.AllEntries())) + for i := range hook.Entries { + require.Equal(t, "released stale verification leases", hook.Entries[i].Message, hook.Entries[i].Data[logrus.ErrorKey]) + for virtualStorage, relativePaths := range hook.Entries[i].Data["leases_released"].(map[string]map[string][]string) { + for relativePath, storages := range relativePaths { + for _, storage := range storages { + if actualReleased[virtualStorage] == nil { + actualReleased[virtualStorage] = map[string]map[string]struct{}{} + } + + if actualReleased[virtualStorage][relativePath] == nil { + actualReleased[virtualStorage][relativePath] = map[string]struct{}{} + } + + actualReleased[virtualStorage][relativePath][storage] = struct{}{} + } + } + } + } + + require.Equal(t, map[string]map[string]map[string]struct{}{ + "virtual-storage-1": {"relative-path-1": {"expired-lease-1": {}, "expired-lease-2": {}, "expired-lease-3": {}}}, + }, actualReleased) +} |