Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2022-04-21 17:01:54 +0300
committerJohn Cai <jcai@gitlab.com>2022-04-21 17:01:54 +0300
commit1b1b95408d11a2532db5a44ffefd5cbab6e0effd (patch)
tree2eafcc63858de8c32cdbf5d53ee44f26908b5385
parentb95a243ee35835e5f904e8a42bbccb758d904b07 (diff)
parente3ec6e2922cea629ed900aeb9a868a19dfe89c65 (diff)
Merge branch 'smh-release-stale-leases' into 'master'
Release expired verification leases periodically See merge request gitlab-org/gitaly!4478
-rw-r--r--_support/praefect-schema.sql7
-rw-r--r--cmd/praefect/main.go29
-rw-r--r--internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go16
-rw-r--r--internal/praefect/verifier.go82
-rw-r--r--internal/praefect/verifier_test.go84
5 files changed, 207 insertions, 11 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql
index 78316e80c..813d4fa89 100644
--- a/_support/praefect-schema.sql
+++ b/_support/praefect-schema.sql
@@ -558,6 +558,13 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories
--
+-- Name: verification_leases; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX verification_leases ON public.storage_repositories USING btree (verification_leased_until) WHERE (verification_leased_until IS NOT NULL);
+
+
+--
-- Name: verification_queue; Type: INDEX; Schema: public; Owner: -
--
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 20f7b841f..d33183438 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -350,23 +350,30 @@ func run(
conf.DefaultReplicationFactors(),
)
- go func() {
- if conf.BackgroundVerification.VerificationInterval <= 0 {
- logger.Info("background verifier is disabled")
- return
- }
-
+ if conf.BackgroundVerification.VerificationInterval > 0 {
logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")
- if err := praefect.NewMetadataVerifier(
+ verifier := praefect.NewMetadataVerifier(
logger,
db,
nodeSet.Connections(),
hm,
conf.BackgroundVerification.VerificationInterval,
- ).Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
- logger.WithError(err).Error("metadata verifier finished")
- }
- }()
+ )
+
+ go func() {
+ if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
+ logger.WithError(err).Error("metadata verifier finished")
+ }
+ }()
+
+ go func() {
+ if err := verifier.RunExpiredLeaseReleaser(ctx, helper.NewTimerTicker(10*time.Second)); err != nil {
+ logger.WithError(err).Error("expired verification lease releaser finished")
+ }
+ }()
+ } else {
+ logger.Info("background verifier is disabled")
+ }
} else {
if conf.Failover.Enabled {
logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
diff --git a/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go b/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go
new file mode 100644
index 000000000..2fe967672
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220404131105_stale_verification_lease_index.go
@@ -0,0 +1,16 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20220404131105_stale_verification_lease_index",
+ Up: []string{`
+ CREATE INDEX verification_leases ON storage_repositories (verification_leased_until)
+ WHERE verification_leased_until IS NOT NULL
+ `},
+ Down: []string{"DROP INDEX verification_leases"},
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go
index 543d0d4c2..449e379ea 100644
--- a/internal/praefect/verifier.go
+++ b/internal/praefect/verifier.go
@@ -80,6 +80,88 @@ func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error
}
}
+// RunExpiredLeaseReleaser releases expired leases on every tick. It keeps running until the context is
+// canceled.
+func (v *MetadataVerifier) RunExpiredLeaseReleaser(ctx context.Context, ticker helper.Ticker) error {
+ defer ticker.Stop()
+
+ for {
+ ticker.Reset()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C():
+ if err := v.releaseExpiredLeases(ctx); err != nil {
+ v.log.WithError(err).Error("failed releasing expired leases")
+ }
+ }
+ }
+}
+
+func (v *MetadataVerifier) releaseExpiredLeases(ctx context.Context) error {
+ // The update is batched as there could potentially be a lot of stale leases. A long
+ // transaction could block other operational queries such as generation increments on
+ // write acknowledgement.
+ for {
+ rows, err := v.db.QueryContext(ctx, `
+ WITH to_release AS (
+ SELECT repository_id, repositories.virtual_storage, repositories.relative_path, storage
+ FROM storage_repositories
+ JOIN repositories USING (repository_id)
+ WHERE verification_leased_until < now() - $1 * interval '1 microsecond'
+ LIMIT $2
+ FOR NO KEY UPDATE SKIP LOCKED
+ ), release AS (
+ UPDATE storage_repositories
+ SET verification_leased_until = NULL
+ FROM to_release
+ WHERE storage_repositories.repository_id = to_release.repository_id
+ AND storage_repositories.storage = to_release.storage
+ )
+
+ SELECT virtual_storage, relative_path, storage
+ FROM to_release
+ `, v.leaseDuration.Microseconds(), v.batchSize)
+ if err != nil {
+ return fmt.Errorf("query execution: %w", err)
+ }
+ defer rows.Close()
+
+ released := map[string]map[string][]string{}
+ totalReleased := 0
+ for rows.Next() {
+ totalReleased++
+
+ var virtualStorage, relativePath, storage string
+ if err := rows.Scan(&virtualStorage, &relativePath, &storage); err != nil {
+ return fmt.Errorf("scan: %w", err)
+ }
+
+ if released[virtualStorage] == nil {
+ released[virtualStorage] = make(map[string][]string)
+ }
+
+ released[virtualStorage][relativePath] = append(released[virtualStorage][relativePath], storage)
+ }
+
+ if err := rows.Err(); err != nil {
+ return fmt.Errorf("rows: %w", err)
+ }
+
+ if totalReleased > 0 {
+ v.log.WithField("leases_released", released).Info("released stale verification leases")
+ }
+
+ // If fewer leases than the batch size were released, there's no more work for us
+ // and we can wait until the next tick. There could still be some given the query
+ // skips locked rows but these can be handled on the next run.
+ if totalReleased < v.batchSize {
+ return nil
+ }
+ }
+}
+
func (v *MetadataVerifier) run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, v.leaseDuration)
defer cancel()
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index f6d1f8187..bb2a099b9 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -8,6 +8,7 @@ import (
"testing"
"time"
+ "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
@@ -589,3 +590,86 @@ func getAllReplicas(ctx context.Context, t testing.TB, db glsql.Querier) map[str
return results
}
+
+func TestVerifier_runExpiredLeaseReleaser(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ db := testdb.New(t)
+
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+ require.NoError(t,
+ rs.CreateRepository(ctx, 1, "virtual-storage-1", "relative-path-1", "replica-path-1", "no-lease", []string{
+ "valid-lease", "expired-lease-1", "expired-lease-2", "expired-lease-3", "locked-lease",
+ }, nil, true, true),
+ )
+
+ // lock one lease record to ensure the worker doesn't block on it
+ result, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verification_leased_until = now() - interval '1 week'
+ WHERE storage = 'locked-lease'
+ `)
+ require.NoError(t, err)
+ locked, err := result.RowsAffected()
+ require.NoError(t, err)
+ require.Equal(t, int64(1), locked)
+
+ lockingTx := db.Begin(t)
+ defer lockingTx.Rollback(t)
+ _, err = lockingTx.ExecContext(ctx, "SELECT FROM storage_repositories WHERE storage = 'locked-lease' FOR UPDATE")
+ require.NoError(t, err)
+
+ tx := db.Begin(t)
+ defer tx.Rollback(t)
+
+ logger, hook := test.NewNullLogger()
+ verifier := NewMetadataVerifier(logrus.NewEntry(logger), tx, nil, nil, 0)
+ // set batch size lower than the number of locked leases to ensure the batching works
+ verifier.batchSize = 2
+
+ _, err = tx.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verification_leased_until = lease_time
+ FROM ( VALUES
+ ( 'valid-lease', now() - $1 * interval '1 microsecond' ),
+ ( 'expired-lease-1', now() - $1 * interval '1 microsecond' - interval '1 microsecond'),
+ ( 'expired-lease-2', now() - $1 * interval '2 microsecond'),
+ ( 'expired-lease-3', now() - $1 * interval '2 microsecond')
+ ) AS fixture (storage, lease_time)
+ WHERE storage_repositories.storage = fixture.storage
+ `, verifier.leaseDuration.Microseconds())
+ require.NoError(t, err)
+
+ runCtx, cancelRun := context.WithCancel(ctx)
+ ticker := helper.NewCountTicker(1, cancelRun)
+
+ err = verifier.RunExpiredLeaseReleaser(runCtx, ticker)
+ require.Equal(t, context.Canceled, err)
+
+ // actualReleased contains the released leases from the logs. It's keyed
+ // as virtual storage -> relative path -> storage.
+ actualReleased := map[string]map[string]map[string]struct{}{}
+ require.Equal(t, 2, len(hook.AllEntries()))
+ for i := range hook.Entries {
+ require.Equal(t, "released stale verification leases", hook.Entries[i].Message, hook.Entries[i].Data[logrus.ErrorKey])
+ for virtualStorage, relativePaths := range hook.Entries[i].Data["leases_released"].(map[string]map[string][]string) {
+ for relativePath, storages := range relativePaths {
+ for _, storage := range storages {
+ if actualReleased[virtualStorage] == nil {
+ actualReleased[virtualStorage] = map[string]map[string]struct{}{}
+ }
+
+ if actualReleased[virtualStorage][relativePath] == nil {
+ actualReleased[virtualStorage][relativePath] = map[string]struct{}{}
+ }
+
+ actualReleased[virtualStorage][relativePath][storage] = struct{}{}
+ }
+ }
+ }
+ }
+
+ require.Equal(t, map[string]map[string]map[string]struct{}{
+ "virtual-storage-1": {"relative-path-1": {"expired-lease-1": {}, "expired-lease-2": {}, "expired-lease-3": {}}},
+ }, actualReleased)
+}