gitlab.com/gitlab-org/gitaly.git
commit 9413ca591ebe30dcb133c86d0ec53f6bc2fc30bb
tree   b3cbb9214adbd7618ad44916fc004a70c1aaf280
parent 93153d53f1c77a28ef76ae9c5777ed5477835962
parent 85ace7cf4f63ab8d99372c74f4e7bcb09a2ac219
author    Sami Hiltunen <shiltunen@gitlab.com>  2022-04-13 12:41:00 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>  2022-04-13 12:41:00 +0300

Merge branch 'smh-background-verifier' into 'master'

Initial implementation of a metadata verifier

See merge request gitlab-org/gitaly!4459
 _support/praefect-schema.sql                                                              |  11
 cmd/praefect/main.go                                                                      |  18
 internal/praefect/config/config.go                                                        |  42
 internal/praefect/config/config_test.go                                                   |   5
 internal/praefect/config/testdata/config.toml                                             |   3
 internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go  |  23
 internal/praefect/verifier.go                                                             | 277
 internal/praefect/verifier_test.go                                                        | 591
 8 files changed, 955 insertions(+), 15 deletions(-)
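
Before the file-by-file diff, a minimal sketch of how the verifier introduced here is enabled and wired up, condensing the cmd/praefect/main.go change below into one helper. The function name startBackgroundVerifier and its signature are illustrative assumptions, not code from this merge request; the types, constructors and import paths are the ones the diff itself uses.

// Sketch only: a hypothetical wiring helper mirroring the change to cmd/praefect/main.go below.
package main

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
)

// startBackgroundVerifier starts the metadata verifier in the background when a
// positive verification_interval is configured; otherwise the feature stays disabled.
func startBackgroundVerifier(
	ctx context.Context,
	logger logrus.FieldLogger,
	db glsql.Querier,
	conns praefect.Connections,
	hm praefect.HealthChecker,
	conf config.Config,
) {
	if conf.BackgroundVerification.VerificationInterval <= 0 {
		logger.Info("background verifier is disabled")
		return
	}

	logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")
	go func() {
		verifier := praefect.NewMetadataVerifier(logger, db, conns, hm, conf.BackgroundVerification.VerificationInterval)
		// Run blocks until ctx is cancelled, picking and verifying one batch of replicas per tick.
		if err := verifier.Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
			logger.WithError(err).Error("metadata verifier finished")
		}
	}()
}
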
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql
index b69a2f0c6..78316e80c 100644
--- a/_support/praefect-schema.sql
+++ b/_support/praefect-schema.sql
@@ -266,7 +266,9 @@ CREATE TABLE public.storage_repositories (
relative_path text NOT NULL,
storage text NOT NULL,
generation bigint NOT NULL,
- repository_id bigint NOT NULL
+ repository_id bigint NOT NULL,
+ verified_at timestamp with time zone,
+ verification_leased_until timestamp with time zone
);
@@ -556,6 +558,13 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories
--
+-- Name: verification_queue; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX verification_queue ON public.storage_repositories USING btree (verified_at NULLS FIRST) WHERE (verification_leased_until IS NULL);
+
+
+--
-- Name: virtual_target_on_replication_queue_idx; Type: INDEX; Schema: public; Owner: -
--
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 7f072b0f3..20f7b841f 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -349,6 +349,24 @@ func run(
rs,
conf.DefaultReplicationFactors(),
)
+
+ go func() {
+ if conf.BackgroundVerification.VerificationInterval <= 0 {
+ logger.Info("background verifier is disabled")
+ return
+ }
+
+ logger.WithField("config", conf.BackgroundVerification).Info("background verifier started")
+ if err := praefect.NewMetadataVerifier(
+ logger,
+ db,
+ nodeSet.Connections(),
+ hm,
+ conf.BackgroundVerification.VerificationInterval,
+ ).Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil {
+ logger.WithError(err).Error("metadata verifier finished")
+ }
+ }()
} else {
if conf.Failover.Enabled {
logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn(
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 286577d6a..7c9a2791c 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -78,6 +78,18 @@ func (f Failover) ErrorThresholdsConfigured() (bool, error) {
return true, nil
}
+// BackgroundVerification contains configuration options for the repository background verification.
+type BackgroundVerification struct {
+ // VerificationInterval determines the duration after which a replica is due for reverification.
+ // The feature is disabled if the verification interval is 0 or below.
+ VerificationInterval time.Duration `toml:"verification_interval,omitempty"`
+}
+
+// DefaultBackgroundVerificationConfig returns the default background verification configuration.
+func DefaultBackgroundVerificationConfig() BackgroundVerification {
+ return BackgroundVerification{}
+}
+
// Reconciliation contains reconciliation specific configuration options.
type Reconciliation struct {
// SchedulingInterval the interval between each automatic reconciliation run. If set to 0,
@@ -112,17 +124,18 @@ func DefaultReplicationConfig() Replication {
// Config is a container for everything found in the TOML config file
type Config struct {
- AllowLegacyElectors bool `toml:"i_understand_my_election_strategy_is_unsupported_and_will_be_removed_without_warning,omitempty"`
- Reconciliation Reconciliation `toml:"reconciliation,omitempty"`
- Replication Replication `toml:"replication,omitempty"`
- ListenAddr string `toml:"listen_addr,omitempty"`
- TLSListenAddr string `toml:"tls_listen_addr,omitempty"`
- SocketPath string `toml:"socket_path,omitempty"`
- VirtualStorages []*VirtualStorage `toml:"virtual_storage,omitempty"`
- Logging log.Config `toml:"logging,omitempty"`
- Sentry sentry.Config `toml:"sentry,omitempty"`
- PrometheusListenAddr string `toml:"prometheus_listen_addr,omitempty"`
- Prometheus prometheus.Config `toml:"prometheus,omitempty"`
+ AllowLegacyElectors bool `toml:"i_understand_my_election_strategy_is_unsupported_and_will_be_removed_without_warning,omitempty"`
+ BackgroundVerification BackgroundVerification `toml:"background_verification,omitempty"`
+ Reconciliation Reconciliation `toml:"reconciliation,omitempty"`
+ Replication Replication `toml:"replication,omitempty"`
+ ListenAddr string `toml:"listen_addr,omitempty"`
+ TLSListenAddr string `toml:"tls_listen_addr,omitempty"`
+ SocketPath string `toml:"socket_path,omitempty"`
+ VirtualStorages []*VirtualStorage `toml:"virtual_storage,omitempty"`
+ Logging log.Config `toml:"logging,omitempty"`
+ Sentry sentry.Config `toml:"sentry,omitempty"`
+ PrometheusListenAddr string `toml:"prometheus_listen_addr,omitempty"`
+ Prometheus prometheus.Config `toml:"prometheus,omitempty"`
// PrometheusExcludeDatabaseFromDefaultMetrics excludes database-related metrics from the
// default metrics. If set to `false`, then database metrics will be available both via
// `/metrics` and `/db_metrics`. Otherwise, they will only be accessible via `/db_metrics`.
@@ -160,9 +173,10 @@ func FromFile(filePath string) (Config, error) {
}
conf := &Config{
- Reconciliation: DefaultReconciliationConfig(),
- Replication: DefaultReplicationConfig(),
- Prometheus: prometheus.DefaultConfig(),
+ BackgroundVerification: DefaultBackgroundVerificationConfig(),
+ Reconciliation: DefaultReconciliationConfig(),
+ Replication: DefaultReplicationConfig(),
+ Prometheus: prometheus.DefaultConfig(),
PrometheusExcludeDatabaseFromDefaultMetrics: true,
// Sets the default Failover, to be overwritten when deserializing the TOML
Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository},
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index d3c8e3cfb..aa0d0b3c5 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -335,6 +335,9 @@ func TestConfigParsing(t *testing.T) {
RunInterval: config.Duration(3 * time.Second),
RepositoriesInBatch: 10,
},
+ BackgroundVerification: BackgroundVerification{
+ VerificationInterval: 24 * time.Hour,
+ },
},
},
{
@@ -360,6 +363,7 @@ func TestConfigParsing(t *testing.T) {
RunInterval: config.Duration(4 * time.Second),
RepositoriesInBatch: 11,
},
+ BackgroundVerification: DefaultBackgroundVerificationConfig(),
},
},
{
@@ -382,6 +386,7 @@ func TestConfigParsing(t *testing.T) {
RunInterval: config.Duration(24 * time.Hour),
RepositoriesInBatch: 16,
},
+ BackgroundVerification: DefaultBackgroundVerificationConfig(),
},
},
{
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 839b807b0..3f2a370e2 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -6,6 +6,9 @@ prometheus_listen_addr = ""
memory_queue_enabled = true
graceful_stop_timeout = "30s"
+[background_verification]
+verification_interval = "24h"
+
[replication]
batch_size = 1
parallel_storage_processing_workers = 2
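
As a usage note for the new [background_verification] section: parsing a config file that contains it surfaces the interval on Config.BackgroundVerification, as the config_test.go change above asserts. A minimal sketch under that assumption; the helper loadVerificationInterval and its path argument are hypothetical.

// Sketch only: a hypothetical helper, not part of this merge request.
package main

import (
	"fmt"
	"time"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
)

// loadVerificationInterval reads a Praefect configuration file and returns the
// configured background verification interval.
func loadVerificationInterval(path string) (time.Duration, error) {
	conf, err := config.FromFile(path)
	if err != nil {
		return 0, fmt.Errorf("load config: %w", err)
	}

	// With `verification_interval = "24h"` in the file this returns 24 * time.Hour;
	// omitting the [background_verification] section yields the zero default, which
	// keeps the background verifier disabled.
	return conf.BackgroundVerification.VerificationInterval, nil
}
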
diff --git a/internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go b/internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go
new file mode 100644
index 000000000..578a86948
--- /dev/null
+++ b/internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go
@@ -0,0 +1,23 @@
+package migrations
+
+import migrate "github.com/rubenv/sql-migrate"
+
+func init() {
+ m := &migrate.Migration{
+ Id: "20220303105110_background_verification_columns",
+ Up: []string{
+ "ALTER TABLE storage_repositories ADD COLUMN verified_at TIMESTAMPTZ",
+ "ALTER TABLE storage_repositories ADD COLUMN verification_leased_until TIMESTAMPTZ",
+ `CREATE INDEX verification_queue ON storage_repositories ( verified_at NULLS FIRST )
+ WHERE verification_leased_until IS NULL
+ `,
+ },
+ Down: []string{
+ "DROP INDEX verification_queue",
+ "ALTER TABLE storage_repositories DROP COLUMN verification_leased_until",
+ "ALTER TABLE storage_repositories DROP COLUMN verified_at",
+ },
+ }
+
+ allMigrations = append(allMigrations, m)
+}
diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go
new file mode 100644
index 000000000..543d0d4c2
--- /dev/null
+++ b/internal/praefect/verifier.go
@@ -0,0 +1,277 @@
+package praefect
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// MetadataVerifier verifies the repository metadata against the actual replicas on the
+// Gitaly nodes. It queries the database for replicas that haven't been verified in a given
+// time and checks whether the Gitalys still have them. If a Gitaly doesn't have a replica,
+// the replica's metadata record is removed and the removal logged. The repository's record
+// is still left in place even if all of the replicas are lost to ensure the data loss doesn't
+// go unnoticed.
+type MetadataVerifier struct {
+ log logrus.FieldLogger
+ db glsql.Querier
+ conns Connections
+ batchSize int
+ leaseDuration time.Duration
+ healthChecker HealthChecker
+ verificationInterval time.Duration
+}
+
+// NewMetadataVerifier creates a new MetadataVerifier.
+func NewMetadataVerifier(
+ log logrus.FieldLogger,
+ db glsql.Querier,
+ conns Connections,
+ healthChecker HealthChecker,
+ verificationInterval time.Duration,
+) *MetadataVerifier {
+ return &MetadataVerifier{
+ log: log,
+ db: db,
+ conns: conns,
+ batchSize: 25,
+ leaseDuration: 30 * time.Second,
+ healthChecker: healthChecker,
+ verificationInterval: verificationInterval,
+ }
+}
+
+type verificationJob struct {
+ repositoryID int64
+ virtualStorage string
+ relativePath string
+ storage string
+ replicaPath string
+}
+
+type verificationResult struct {
+ job verificationJob
+ exists bool
+ error error
+}
+
+// Run runs the metadata verifier. It keeps running until the context is canceled.
+func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error {
+ defer ticker.Stop()
+
+ for {
+ ticker.Reset()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C():
+ if err := v.run(ctx); err != nil {
+ v.log.WithError(err).Error("failed a background verification run")
+ }
+ }
+ }
+}
+
+func (v *MetadataVerifier) run(ctx context.Context) error {
+ ctx, cancel := context.WithTimeout(ctx, v.leaseDuration)
+ defer cancel()
+
+ jobs, err := v.pickJobs(ctx)
+ if err != nil {
+ return fmt.Errorf("pick jobs: %w", err)
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(len(jobs))
+ results := make([]verificationResult, len(jobs))
+ for i, job := range jobs {
+ i, job := i, job
+ go func() {
+ defer wg.Done()
+
+ exists, err := v.verify(ctx, jobs[i])
+ results[i] = verificationResult{
+ job: job,
+ exists: exists,
+ error: err,
+ }
+ }()
+ }
+
+ wg.Wait()
+
+ return v.updateMetadata(ctx, results)
+}
+
+// logRecord is a helper type for gathering the removed replicas and logging them.
+type logRecord map[string]map[string][]string
+
+// markRemoved marks the given replica as removed.
+func (r logRecord) markRemoved(virtualStorage, relativePath, storage string) {
+ relativePaths, ok := r[virtualStorage]
+ if !ok {
+ relativePaths = map[string][]string{}
+ }
+
+ relativePaths[relativePath] = append(relativePaths[relativePath], storage)
+ r[virtualStorage] = relativePaths
+ sort.Strings(relativePaths[relativePath])
+}
+
+func (v *MetadataVerifier) updateMetadata(ctx context.Context, results []verificationResult) error {
+ repositoryIDs := make([]int64, len(results))
+ storages := make([]string, len(results))
+ successfullyVerifieds := make([]bool, len(results))
+ exists := make([]bool, len(results))
+
+ logRecords := logRecord{}
+ for i, result := range results {
+ repositoryIDs[i] = result.job.repositoryID
+ storages[i] = result.job.storage
+ exists[i] = result.exists
+ successfullyVerifieds[i] = result.error == nil
+
+ if result.error != nil {
+ v.log.WithFields(logrus.Fields{
+ "repository_id": result.job.repositoryID,
+ "replica_path": result.job.replicaPath,
+ "virtual_storage": result.job.virtualStorage,
+ "storage": result.job.storage,
+ "relative_path": result.job.relativePath,
+ logrus.ErrorKey: result.error,
+ }).Error("failed to verify replica's existence")
+ } else if !result.exists {
+ logRecords.markRemoved(result.job.virtualStorage, result.job.relativePath, result.job.storage)
+ }
+ }
+
+ if len(logRecords) > 0 {
+ v.log.WithField("replicas", logRecords).Info("removing metadata records of non-existent replicas")
+ }
+
+ _, err := v.db.ExecContext(ctx, `
+WITH results AS (
+ SELECT repository_id, storage, successfully_verified, exists
+ FROM (
+ SELECT unnest($1::bigint[]) AS repository_id,
+ unnest($2::text[]) AS storage,
+ unnest($3::bool[]) as successfully_verified,
+ unnest($4::bool[]) AS exists
+ ) AS results
+ JOIN (
+ SELECT repository_id
+ FROM repositories
+ WHERE repository_id = ANY($1::bigint[])
+ FOR UPDATE
+ ) AS lock_repositories USING (repository_id)
+),
+
+release_leases AS (
+ UPDATE storage_repositories
+ SET verification_leased_until = NULL,
+ verified_at = CASE WHEN successfully_verified THEN now() ELSE verified_at END
+ FROM results
+ WHERE storage_repositories.repository_id = results.repository_id
+ AND storage_repositories.storage = results.storage
+)
+
+DELETE FROM storage_repositories
+USING results
+WHERE storage_repositories.repository_id = results.repository_id
+AND storage_repositories.storage = results.storage
+AND successfully_verified AND NOT exists
+ `, repositoryIDs, storages, successfullyVerifieds, exists)
+ if err != nil {
+ return fmt.Errorf("query: %w", err)
+ }
+
+ return nil
+}
+
+func (v *MetadataVerifier) pickJobs(ctx context.Context) ([]verificationJob, error) {
+ var healthyVirtualStorages, healthyStorages []string
+ for virtualStorage, storages := range v.healthChecker.HealthyNodes() {
+ for _, storage := range storages {
+ healthyVirtualStorages = append(healthyVirtualStorages, virtualStorage)
+ healthyStorages = append(healthyStorages, storage)
+ }
+ }
+
+ rows, err := v.db.QueryContext(ctx, `
+WITH to_verify AS (
+ SELECT repository_id, relative_path, replica_path, virtual_storage, storage
+ FROM (
+ SELECT repository_id, storage
+ FROM storage_repositories
+ WHERE ( verified_at IS NULL OR verified_at < now() - $1 * '1 millisecond'::interval )
+ AND verification_leased_until IS NULL
+ ORDER BY verified_at NULLS FIRST
+ FOR NO KEY UPDATE SKIP LOCKED
+ ) AS need_verification
+ JOIN repositories USING (repository_id)
+ JOIN (
+ SELECT unnest($4::text[]) AS virtual_storage,
+ unnest($5::text[]) AS storage
+ ) AS healthy_storages USING (virtual_storage, storage)
+ LIMIT $2
+),
+
+acquire_leases AS (
+ UPDATE storage_repositories
+ SET verification_leased_until = now() + $3 * '1 millisecond'::interval
+ FROM to_verify
+ WHERE storage_repositories.repository_id = to_verify.repository_id
+ AND storage_repositories.storage = to_verify.storage
+)
+
+SELECT repository_id, replica_path, virtual_storage, relative_path, storage
+FROM to_verify
+ `, v.verificationInterval.Milliseconds(), v.batchSize, v.leaseDuration.Milliseconds(), healthyVirtualStorages, healthyStorages)
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+
+ var jobs []verificationJob
+ for rows.Next() {
+ var job verificationJob
+ if err := rows.Scan(&job.repositoryID, &job.replicaPath, &job.virtualStorage, &job.relativePath, &job.storage); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ jobs = append(jobs, job)
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows: %w", err)
+ }
+
+ return jobs, nil
+}
+
+func (v *MetadataVerifier) verify(ctx context.Context, job verificationJob) (bool, error) {
+ conn, ok := v.conns[job.virtualStorage][job.storage]
+ if !ok {
+ return false, fmt.Errorf("no connection to %q/%q", job.virtualStorage, job.storage)
+ }
+
+ resp, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: job.storage,
+ RelativePath: job.replicaPath,
+ },
+ })
+ if err != nil {
+ return false, err
+ }
+
+ return resp.Exists, nil
+}
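
A brief usage note on Run above: because the loop is driven through the helper.Ticker interface, a caller can execute a single, deterministic verification round with a counting ticker, which is how verifier_test.go below exercises the verifier. The helper runOneVerificationRound is a hypothetical illustration, not part of this change.

// Sketch only: drives exactly one verification round, mirroring the test below.
package praefect

import (
	"context"
	"errors"
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
)

// runOneVerificationRound runs a single verification round and treats the
// resulting context cancellation as a clean exit.
func runOneVerificationRound(ctx context.Context, verifier *MetadataVerifier) error {
	runCtx, cancelRun := context.WithCancel(ctx)
	defer cancelRun()

	// The counting ticker ticks once and then calls cancelRun (see the test below),
	// so Run returns context.Canceled after a single batch has been processed.
	if err := verifier.Run(runCtx, helper.NewCountTicker(1, cancelRun)); !errors.Is(err, context.Canceled) {
		return fmt.Errorf("verifier run: %w", err)
	}

	return nil
}
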
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
new file mode 100644
index 000000000..f6d1f8187
--- /dev/null
+++ b/internal/praefect/verifier_test.go
@@ -0,0 +1,591 @@
+package praefect
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ gitalyconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type erroringRepositoryService struct {
+ gitalypb.RepositoryServiceServer
+}
+
+func (svc erroringRepositoryService) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) {
+ return nil, errors.New("erroring repository exists")
+}
+
+func TestVerifier(t *testing.T) {
+ t.Parallel()
+
+ // replicas contains the replicas the test setup should create, keyed by
+ // virtual storage -> relative path -> storage -> exists.
+ type replicas map[string]map[string]map[string]struct {
+ // exists determines whether the replica exists on the gitaly or not. If false,
+ // the replica is deleted but the metadata record left in place.
+ exists bool
+ // lastVerified is the duration that has passed since the last verification.
+ lastVerified time.Duration
+ // isLeased determines whether the replica has a lease acquired during the test.
+ isLeased bool
+ }
+
+ const (
+ neverVerified = 0
+ recentlyVerified = time.Millisecond
+ pendingVerification = 30 * 24 * time.Hour
+ )
+
+ // these are the gitalys setup by the test setup
+ const (
+ gitaly1 = "gitaly-0"
+ gitaly2 = "gitaly-1"
+ gitaly3 = "gitaly-2"
+ )
+
+ type step struct {
+ expectedRemovals logRecord
+ expectedErrors map[string]map[string][]string
+ healthyStorages StaticHealthChecker
+ expectedReplicas map[string]map[string][]string
+ }
+
+ for _, tc := range []struct {
+ desc string
+ erroringGitalys map[string]bool
+ replicas replicas
+ batchSize int
+ steps []step
+ }{
+ {
+ desc: "all replicas exist",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: true, lastVerified: recentlyVerified},
+ gitaly3: {exists: true, lastVerified: pendingVerification},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "recently verified replicas are not picked",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: false, lastVerified: recentlyVerified},
+ gitaly2: {exists: false, lastVerified: recentlyVerified},
+ gitaly3: {exists: false, lastVerified: recentlyVerified},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "replicas on unhealthy storages are not picked",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: true, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: neverVerified},
+ },
+ },
+ },
+ steps: []step{
+ {
+ healthyStorages: StaticHealthChecker{"virtual-storage": {gitaly1, gitaly2}},
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "metadata not deleted for replicas which errored on verification",
+ erroringGitalys: map[string]bool{
+ gitaly3: true,
+ },
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: true, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: neverVerified},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedErrors: map[string]map[string][]string{
+ "virtual-storage": {"repository-1": {gitaly3}},
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "replicas with leases acquired are not picked",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified, isLeased: true},
+ gitaly2: {exists: false, lastVerified: neverVerified, isLeased: true},
+ gitaly3: {exists: false, lastVerified: pendingVerification, isLeased: true},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "replicas missing have their metadata records removed",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: false, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: pendingVerification},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly2, gitaly3},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "verification time is updated when repository exists",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: false, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: pendingVerification},
+ },
+ },
+ },
+ batchSize: 1,
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly2},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly3},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "all replicas are lost",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ // The verification should be prioritized so that never verified repos come
+ // first and repos that were verified a longer time ago come after them.
+ gitaly1: {exists: false, lastVerified: neverVerified},
+ gitaly2: {exists: false, lastVerified: pendingVerification + time.Hour},
+ gitaly3: {exists: false, lastVerified: pendingVerification},
+ },
+ },
+ },
+ batchSize: 1,
+ steps: []step{
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly1},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly2, gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly2},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly3},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{},
+ },
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {Name: "virtual-storage"},
+ },
+ Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository},
+ }
+
+ for i := 0; i < 3; i++ {
+ storageName := fmt.Sprintf("gitaly-%d", i)
+
+ registerFunc := setup.RegisterAll
+ if tc.erroringGitalys[storageName] {
+ registerFunc = func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, erroringRepositoryService{repository.NewServer(
+ deps.GetCfg(),
+ deps.GetRubyServer(),
+ deps.GetLocator(),
+ deps.GetTxManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetCatfileCache(),
+ deps.GetConnsPool(),
+ deps.GetGit2goExecutor(),
+ deps.GetHousekeepingManager(),
+ )})
+ }
+ }
+
+ cfg := testcfg.Build(t, testcfg.WithStorages(storageName))
+ cfg.SocketPath = testserver.RunGitalyServer(t, cfg, nil, registerFunc, testserver.WithDisablePraefect())
+ conf.VirtualStorages[0].Nodes = append(conf.VirtualStorages[0].Nodes, &config.Node{
+ Storage: storageName,
+ Address: cfg.SocketPath,
+ })
+ }
+
+ db := testdb.New(t)
+ discardLogger := testhelper.NewDiscardingLogEntry(t)
+ sidechannelRegistry := sidechannel.NewRegistry()
+ txManager := transactions.NewManager(config.Config{})
+ nodeSet, err := DialNodes(
+ ctx,
+ conf.VirtualStorages,
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ backchannel.NewClientHandshaker(
+ discardLogger,
+ NewBackchannelServerFactory(
+ discardLogger,
+ transaction.NewServer(txManager),
+ sidechannelRegistry,
+ ),
+ ),
+ sidechannelRegistry,
+ )
+ require.NoError(t, err)
+ t.Cleanup(nodeSet.Close)
+
+ tx := db.Begin(t)
+ t.Cleanup(func() { tx.Rollback(t) })
+ testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{
+ "praefect-0": conf.StorageNames(),
+ })
+ elector := nodes.NewPerRepositoryElector(tx)
+ conns := nodeSet.Connections()
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ conn, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
+ withRouter: NewPerRepositoryRouter(
+ conns,
+ elector,
+ StaticHealthChecker(conf.StorageNames()),
+ NewLockedRandom(rand.New(rand.NewSource(0))),
+ rs,
+ datastore.NewAssignmentStore(db, conf.StorageNames()),
+ rs,
+ conf.DefaultReplicationFactors(),
+ ),
+ withRepoStore: rs,
+ withTxMgr: txManager,
+ })
+ t.Cleanup(cleanup)
+
+ // Set up the test repositories.
+ for virtualStorage, relativePaths := range tc.replicas {
+ for relativePath, storages := range relativePaths {
+ // Create the expected repository. This creates all of the replicas transactionally.
+ gittest.CreateRepository(ctx, t,
+ gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: virtualStorage}}},
+ gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: relativePath},
+ )
+
+ // Now remove the replicas that were created in the transaction but the test case
+ // expects not to exist. We remove them directly from the Gitalys so the metadata
+ // records are left in place.
+ for storage, replica := range storages {
+ // Set the last verification time to what the test expects it to be.
+ if replica.lastVerified > 0 {
+ _, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verified_at = now() - $4 * '1 millisecond'::interval
+ FROM repositories
+ WHERE storage_repositories.repository_id = repositories.repository_id
+ AND repositories.virtual_storage = $1
+ AND repositories.relative_path = $2
+ AND storage = $3`,
+ virtualStorage, relativePath, storage, replica.lastVerified.Milliseconds())
+ require.NoError(t, err)
+ }
+
+ // Set a lease if the test expects the record to be leased.
+ if replica.isLeased {
+ _, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verification_leased_until = now()
+ FROM repositories
+ WHERE storage_repositories.repository_id = repositories.repository_id
+ AND repositories.virtual_storage = $1
+ AND repositories.relative_path = $2
+ AND storage = $3`,
+ virtualStorage, relativePath, storage)
+ require.NoError(t, err)
+ }
+
+ if replica.exists {
+ continue
+ }
+
+ _, err := gitalypb.NewRepositoryServiceClient(conns[virtualStorage][storage]).RemoveRepository(ctx,
+ &gitalypb.RemoveRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: storage,
+ RelativePath: relativePath,
+ },
+ },
+ )
+ require.NoError(t, err)
+ }
+ }
+ }
+
+ // Create a repository and lock its records to assert the dequeuer does not wait on row locks.
+ gittest.CreateRepository(ctx, t,
+ gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: "virtual-storage"}}},
+ gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: "locked-repository"},
+ )
+
+ rowLockTx := db.Begin(t)
+ defer rowLockTx.Rollback(t)
+
+ var lockedRows int
+ require.NoError(t, rowLockTx.QueryRowContext(ctx, `
+ WITH locked_repository AS (
+ SELECT repository_id
+ FROM repositories
+ WHERE repositories.virtual_storage = 'virtual-storage'
+ AND repositories.relative_path = 'locked-repository'
+ FOR UPDATE
+ ),
+
+ locked_replicas AS (
+ SELECT FROM storage_repositories
+ JOIN locked_repository USING (repository_id)
+ FOR UPDATE
+ )
+
+ SELECT count(*) FROM locked_replicas`,
+ ).Scan(&lockedRows))
+ require.Equal(t, 3, lockedRows)
+
+ for _, step := range tc.steps {
+ logger, hook := test.NewNullLogger()
+
+ healthyStorages := StaticHealthChecker{"virtual-storage": []string{gitaly1, gitaly2, gitaly3}}
+ if step.healthyStorages != nil {
+ healthyStorages = step.healthyStorages
+ }
+
+ verifier := NewMetadataVerifier(logger, db, conns, healthyStorages, 24*7*time.Hour)
+ if tc.batchSize > 0 {
+ verifier.batchSize = tc.batchSize
+ }
+
+ runCtx, cancelRun := context.WithCancel(ctx)
+ err = verifier.Run(runCtx, helper.NewCountTicker(1, cancelRun))
+ require.Equal(t, context.Canceled, err)
+
+ // Ensure the removals and errors are correctly logged
+ var actualRemovals logRecord
+ actualErrors := map[string]map[string][]string{}
+ for _, entry := range hook.Entries {
+ switch entry.Message {
+ case "removing metadata records of non-existent replicas":
+ if len(step.expectedRemovals) == 0 {
+ t.Fatalf("unexpected removals logged")
+ }
+
+ actualRemovals = entry.Data["replicas"].(logRecord)
+ case "failed to verify replica's existence":
+ if len(step.expectedErrors) == 0 {
+ t.Fatalf("unexpected errors logged")
+ }
+
+ virtualStorage := entry.Data["virtual_storage"].(string)
+ relativePath := entry.Data["relative_path"].(string)
+
+ if actualErrors[virtualStorage] == nil {
+ actualErrors[virtualStorage] = map[string][]string{}
+ }
+
+ actualErrors[virtualStorage][relativePath] = append(actualErrors[virtualStorage][relativePath], entry.Data["storage"].(string))
+ default:
+ t.Fatalf("unexpected log message")
+ }
+ }
+
+ if len(step.expectedErrors) > 0 {
+ require.Equal(t, step.expectedErrors, actualErrors)
+ }
+
+ if len(step.expectedRemovals) > 0 {
+ require.Equal(t, step.expectedRemovals, actualRemovals)
+ }
+
+ // The repository record should always be left in place. Otherwise data loss would go unnoticed
+ // when all replica records are lost.
+ exists, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "virtual-storage",
+ RelativePath: "repository-1",
+ },
+ })
+ require.NoError(t, err)
+ require.True(t, exists.GetExists())
+
+ // Ensure all the metadata still contains the expected replicas
+ require.Equal(t, step.expectedReplicas, getAllReplicas(ctx, t, db))
+ }
+ })
+ }
+}
+
+// getAllReplicas gets all replicas from the database except for the locked-repository which is created
+// by the test suite to ensure non-blocking queries.
+func getAllReplicas(ctx context.Context, t testing.TB, db glsql.Querier) map[string]map[string][]string {
+ rows, err := db.QueryContext(ctx, `
+ SELECT repositories.virtual_storage, repositories.relative_path, storage
+ FROM repositories
+ JOIN storage_repositories USING (repository_id)
+ WHERE repositories.relative_path != 'locked-repository'
+ ORDER BY virtual_storage, relative_path, storage
+ `)
+ require.NoError(t, err)
+ defer rows.Close()
+
+ results := map[string]map[string][]string{}
+ for rows.Next() {
+ var virtualStorage, relativePath, storage string
+ require.NoError(t, rows.Scan(&virtualStorage, &relativePath, &storage))
+
+ if results[virtualStorage] == nil {
+ results[virtualStorage] = map[string][]string{}
+ }
+
+ results[virtualStorage][relativePath] = append(results[virtualStorage][relativePath], storage)
+ }
+ require.NoError(t, rows.Err())
+
+ return results
+}