diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-13 12:41:00 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-13 12:41:00 +0300 |
commit | 9413ca591ebe30dcb133c86d0ec53f6bc2fc30bb (patch) | |
tree | b3cbb9214adbd7618ad44916fc004a70c1aaf280 | |
parent | 93153d53f1c77a28ef76ae9c5777ed5477835962 (diff) | |
parent | 85ace7cf4f63ab8d99372c74f4e7bcb09a2ac219 (diff) |
Merge branch 'smh-background-verifier' into 'master'
Initial implementation of a metadata verifier
See merge request gitlab-org/gitaly!4459
-rw-r--r-- | _support/praefect-schema.sql | 11 | ||||
-rw-r--r-- | cmd/praefect/main.go | 18 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 42 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 3 | ||||
-rw-r--r-- | internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go | 23 | ||||
-rw-r--r-- | internal/praefect/verifier.go | 277 | ||||
-rw-r--r-- | internal/praefect/verifier_test.go | 591 |
8 files changed, 955 insertions, 15 deletions
diff --git a/_support/praefect-schema.sql b/_support/praefect-schema.sql index b69a2f0c6..78316e80c 100644 --- a/_support/praefect-schema.sql +++ b/_support/praefect-schema.sql @@ -266,7 +266,9 @@ CREATE TABLE public.storage_repositories ( relative_path text NOT NULL, storage text NOT NULL, generation bigint NOT NULL, - repository_id bigint NOT NULL + repository_id bigint NOT NULL, + verified_at timestamp with time zone, + verification_leased_until timestamp with time zone ); @@ -556,6 +558,13 @@ CREATE UNIQUE INDEX storage_repositories_new_pkey ON public.storage_repositories -- +-- Name: verification_queue; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX verification_queue ON public.storage_repositories USING btree (verified_at NULLS FIRST) WHERE (verification_leased_until IS NULL); + + +-- -- Name: virtual_target_on_replication_queue_idx; Type: INDEX; Schema: public; Owner: - -- diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 7f072b0f3..20f7b841f 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -349,6 +349,24 @@ func run( rs, conf.DefaultReplicationFactors(), ) + + go func() { + if conf.BackgroundVerification.VerificationInterval <= 0 { + logger.Info("background verifier is disabled") + return + } + + logger.WithField("config", conf.BackgroundVerification).Info("background verifier started") + if err := praefect.NewMetadataVerifier( + logger, + db, + nodeSet.Connections(), + hm, + conf.BackgroundVerification.VerificationInterval, + ).Run(ctx, helper.NewTimerTicker(2*time.Second)); err != nil { + logger.WithError(err).Error("metadata verifier finished") + } + }() } else { if conf.Failover.Enabled { logger.WithField("election_strategy", conf.Failover.ElectionStrategy).Warn( diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 286577d6a..7c9a2791c 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -78,6 +78,18 @@ func (f Failover) ErrorThresholdsConfigured() (bool, error) { return true, nil } +// BackgroundVerification contains configuration options for the repository background verification. +type BackgroundVerification struct { + // VerificationInterval determines the duration after a replica due for reverification. + // The feature is disabled if verification interval is 0 or below. + VerificationInterval time.Duration `toml:"verification_interval,omitempty"` +} + +// DefaultBackgroundVerificationConfig returns the default background verification configuration. +func DefaultBackgroundVerificationConfig() BackgroundVerification { + return BackgroundVerification{} +} + // Reconciliation contains reconciliation specific configuration options. type Reconciliation struct { // SchedulingInterval the interval between each automatic reconciliation run. If set to 0, @@ -112,17 +124,18 @@ func DefaultReplicationConfig() Replication { // Config is a container for everything found in the TOML config file type Config struct { - AllowLegacyElectors bool `toml:"i_understand_my_election_strategy_is_unsupported_and_will_be_removed_without_warning,omitempty"` - Reconciliation Reconciliation `toml:"reconciliation,omitempty"` - Replication Replication `toml:"replication,omitempty"` - ListenAddr string `toml:"listen_addr,omitempty"` - TLSListenAddr string `toml:"tls_listen_addr,omitempty"` - SocketPath string `toml:"socket_path,omitempty"` - VirtualStorages []*VirtualStorage `toml:"virtual_storage,omitempty"` - Logging log.Config `toml:"logging,omitempty"` - Sentry sentry.Config `toml:"sentry,omitempty"` - PrometheusListenAddr string `toml:"prometheus_listen_addr,omitempty"` - Prometheus prometheus.Config `toml:"prometheus,omitempty"` + AllowLegacyElectors bool `toml:"i_understand_my_election_strategy_is_unsupported_and_will_be_removed_without_warning,omitempty"` + BackgroundVerification BackgroundVerification `toml:"background_verification,omitempty"` + Reconciliation Reconciliation `toml:"reconciliation,omitempty"` + Replication Replication `toml:"replication,omitempty"` + ListenAddr string `toml:"listen_addr,omitempty"` + TLSListenAddr string `toml:"tls_listen_addr,omitempty"` + SocketPath string `toml:"socket_path,omitempty"` + VirtualStorages []*VirtualStorage `toml:"virtual_storage,omitempty"` + Logging log.Config `toml:"logging,omitempty"` + Sentry sentry.Config `toml:"sentry,omitempty"` + PrometheusListenAddr string `toml:"prometheus_listen_addr,omitempty"` + Prometheus prometheus.Config `toml:"prometheus,omitempty"` // PrometheusExcludeDatabaseFromDefaultMetrics excludes database-related metrics from the // default metrics. If set to `false`, then database metrics will be available both via // `/metrics` and `/db_metrics`. Otherwise, they will only be accessible via `/db_metrics`. @@ -160,9 +173,10 @@ func FromFile(filePath string) (Config, error) { } conf := &Config{ - Reconciliation: DefaultReconciliationConfig(), - Replication: DefaultReplicationConfig(), - Prometheus: prometheus.DefaultConfig(), + BackgroundVerification: DefaultBackgroundVerificationConfig(), + Reconciliation: DefaultReconciliationConfig(), + Replication: DefaultReplicationConfig(), + Prometheus: prometheus.DefaultConfig(), PrometheusExcludeDatabaseFromDefaultMetrics: true, // Sets the default Failover, to be overwritten when deserializing the TOML Failover: Failover{Enabled: true, ElectionStrategy: ElectionStrategyPerRepository}, diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index d3c8e3cfb..aa0d0b3c5 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -335,6 +335,9 @@ func TestConfigParsing(t *testing.T) { RunInterval: config.Duration(3 * time.Second), RepositoriesInBatch: 10, }, + BackgroundVerification: BackgroundVerification{ + VerificationInterval: 24 * time.Hour, + }, }, }, { @@ -360,6 +363,7 @@ func TestConfigParsing(t *testing.T) { RunInterval: config.Duration(4 * time.Second), RepositoriesInBatch: 11, }, + BackgroundVerification: DefaultBackgroundVerificationConfig(), }, }, { @@ -382,6 +386,7 @@ func TestConfigParsing(t *testing.T) { RunInterval: config.Duration(24 * time.Hour), RepositoriesInBatch: 16, }, + BackgroundVerification: DefaultBackgroundVerificationConfig(), }, }, { diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 839b807b0..3f2a370e2 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -6,6 +6,9 @@ prometheus_listen_addr = "" memory_queue_enabled = true graceful_stop_timeout = "30s" +[background_verification] +verification_interval = "24h" + [replication] batch_size = 1 parallel_storage_processing_workers = 2 diff --git a/internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go b/internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go new file mode 100644 index 000000000..578a86948 --- /dev/null +++ b/internal/praefect/datastore/migrations/20220303105110_background_verification_columns.go @@ -0,0 +1,23 @@ +package migrations + +import migrate "github.com/rubenv/sql-migrate" + +func init() { + m := &migrate.Migration{ + Id: "20220303105110_background_verification_columns", + Up: []string{ + "ALTER TABLE storage_repositories ADD COLUMN verified_at TIMESTAMPTZ", + "ALTER TABLE storage_repositories ADD COLUMN verification_leased_until TIMESTAMPTZ", + `CREATE INDEX verification_queue ON storage_repositories ( verified_at NULLS FIRST ) + WHERE verification_leased_until IS NULL + `, + }, + Down: []string{ + "DROP INDEX verification_queue", + "ALTER TABLE storage_repositories DROP COLUMN verification_leased_until", + "ALTER TABLE storage_repositories DROP COLUMN verified_at", + }, + } + + allMigrations = append(allMigrations, m) +} diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go new file mode 100644 index 000000000..543d0d4c2 --- /dev/null +++ b/internal/praefect/verifier.go @@ -0,0 +1,277 @@ +package praefect + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" +) + +// MetadataVerifier verifies the repository metadata against the actual replicas on the +// Gitaly nodes. It queries the database for replicas that haven't been verified in a given +// time and checks whether the Gitalys still have them. If a Gitaly doesn't have a replica, +// the replica's metadata record is removed and the removal logged. The repository's record +// is still left in place even if all of the replicas are lost to ensure the data loss doesn't +// go unnoticed. +type MetadataVerifier struct { + log logrus.FieldLogger + db glsql.Querier + conns Connections + batchSize int + leaseDuration time.Duration + healthChecker HealthChecker + verificationInterval time.Duration +} + +// NewMetadataVerifier creates a new MetadataVerifier. +func NewMetadataVerifier( + log logrus.FieldLogger, + db glsql.Querier, + conns Connections, + healthChecker HealthChecker, + verificationInterval time.Duration, +) *MetadataVerifier { + return &MetadataVerifier{ + log: log, + db: db, + conns: conns, + batchSize: 25, + leaseDuration: 30 * time.Second, + healthChecker: healthChecker, + verificationInterval: verificationInterval, + } +} + +type verificationJob struct { + repositoryID int64 + virtualStorage string + relativePath string + storage string + replicaPath string +} + +type verificationResult struct { + job verificationJob + exists bool + error error +} + +// Run runs the metadata verifier. It keeps running until the context is canceled. +func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error { + defer ticker.Stop() + + for { + ticker.Reset() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C(): + if err := v.run(ctx); err != nil { + v.log.WithError(err).Error("failed a background verification run") + } + } + } +} + +func (v *MetadataVerifier) run(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, v.leaseDuration) + defer cancel() + + jobs, err := v.pickJobs(ctx) + if err != nil { + return fmt.Errorf("pick jobs: %w", err) + } + + var wg sync.WaitGroup + wg.Add(len(jobs)) + results := make([]verificationResult, len(jobs)) + for i, job := range jobs { + i, job := i, job + go func() { + defer wg.Done() + + exists, err := v.verify(ctx, jobs[i]) + results[i] = verificationResult{ + job: job, + exists: exists, + error: err, + } + }() + } + + wg.Wait() + + return v.updateMetadata(ctx, results) +} + +// logRecord is a helper type for gathering the removed replicas and logging them. +type logRecord map[string]map[string][]string + +// markRemoved marks the given replica as removed. +func (r logRecord) markRemoved(virtualStorage, relativePath, storage string) { + relativePaths, ok := r[virtualStorage] + if !ok { + relativePaths = map[string][]string{} + } + + relativePaths[relativePath] = append(relativePaths[relativePath], storage) + r[virtualStorage] = relativePaths + sort.Strings(relativePaths[relativePath]) +} + +func (v *MetadataVerifier) updateMetadata(ctx context.Context, results []verificationResult) error { + repositoryIDs := make([]int64, len(results)) + storages := make([]string, len(results)) + successfullyVerifieds := make([]bool, len(results)) + exists := make([]bool, len(results)) + + logRecords := logRecord{} + for i, result := range results { + repositoryIDs[i] = result.job.repositoryID + storages[i] = result.job.storage + exists[i] = result.exists + successfullyVerifieds[i] = result.error == nil + + if result.error != nil { + v.log.WithFields(logrus.Fields{ + "repository_id": result.job.repositoryID, + "replica_path": result.job.replicaPath, + "virtual_storage": result.job.virtualStorage, + "storage": result.job.storage, + "relative_path": result.job.relativePath, + logrus.ErrorKey: result.error, + }).Error("failed to verify replica's existence") + } else if !result.exists { + logRecords.markRemoved(result.job.virtualStorage, result.job.relativePath, result.job.storage) + } + } + + if len(logRecords) > 0 { + v.log.WithField("replicas", logRecords).Info("removing metadata records of non-existent replicas") + } + + _, err := v.db.ExecContext(ctx, ` +WITH results AS ( + SELECT repository_id, storage, successfully_verified, exists + FROM ( + SELECT unnest($1::bigint[]) AS repository_id, + unnest($2::text[]) AS storage, + unnest($3::bool[]) as successfully_verified, + unnest($4::bool[]) AS exists + ) AS results + JOIN ( + SELECT repository_id + FROM repositories + WHERE repository_id = ANY($1::bigint[]) + FOR UPDATE + ) AS lock_repositories USING (repository_id) +), + +release_leases AS ( + UPDATE storage_repositories + SET verification_leased_until = NULL, + verified_at = CASE WHEN successfully_verified THEN now() ELSE verified_at END + FROM results + WHERE storage_repositories.repository_id = results.repository_id + AND storage_repositories.storage = results.storage +) + +DELETE FROM storage_repositories +USING results +WHERE storage_repositories.repository_id = results.repository_id +AND storage_repositories.storage = results.storage +AND successfully_verified AND NOT exists + `, repositoryIDs, storages, successfullyVerifieds, exists) + if err != nil { + return fmt.Errorf("query: %w", err) + } + + return nil +} + +func (v *MetadataVerifier) pickJobs(ctx context.Context) ([]verificationJob, error) { + var healthyVirtualStorages, healthyStorages []string + for virtualStorage, storages := range v.healthChecker.HealthyNodes() { + for _, storage := range storages { + healthyVirtualStorages = append(healthyVirtualStorages, virtualStorage) + healthyStorages = append(healthyStorages, storage) + } + } + + rows, err := v.db.QueryContext(ctx, ` +WITH to_verify AS ( + SELECT repository_id, relative_path, replica_path, virtual_storage, storage + FROM ( + SELECT repository_id, storage + FROM storage_repositories + WHERE ( verified_at IS NULL OR verified_at < now() - $1 * '1 millisecond'::interval ) + AND verification_leased_until IS NULL + ORDER BY verified_at NULLS FIRST + FOR NO KEY UPDATE SKIP LOCKED + ) AS need_verification + JOIN repositories USING (repository_id) + JOIN ( + SELECT unnest($4::text[]) AS virtual_storage, + unnest($5::text[]) AS storage + ) AS healthy_storages USING (virtual_storage, storage) + LIMIT $2 +), + +acquire_leases AS ( + UPDATE storage_repositories + SET verification_leased_until = now() + $3 * '1 millisecond'::interval + FROM to_verify + WHERE storage_repositories.repository_id = to_verify.repository_id + AND storage_repositories.storage = to_verify.storage +) + +SELECT repository_id, replica_path, virtual_storage, relative_path, storage +FROM to_verify + `, v.verificationInterval.Milliseconds(), v.batchSize, v.leaseDuration.Milliseconds(), healthyVirtualStorages, healthyStorages) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + + var jobs []verificationJob + for rows.Next() { + var job verificationJob + if err := rows.Scan(&job.repositoryID, &job.replicaPath, &job.virtualStorage, &job.relativePath, &job.storage); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + jobs = append(jobs, job) + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows: %w", err) + } + + return jobs, nil +} + +func (v *MetadataVerifier) verify(ctx context.Context, job verificationJob) (bool, error) { + conn, ok := v.conns[job.virtualStorage][job.storage] + if !ok { + return false, fmt.Errorf("no connection to %q/%q", job.virtualStorage, job.storage) + } + + resp, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: &gitalypb.Repository{ + StorageName: job.storage, + RelativePath: job.replicaPath, + }, + }) + if err != nil { + return false, err + } + + return resp.Exists, nil +} diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go new file mode 100644 index 000000000..f6d1f8187 --- /dev/null +++ b/internal/praefect/verifier_test.go @@ -0,0 +1,591 @@ +package praefect + +import ( + "context" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + gitalyconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" + "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" + "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type erroringRepositoryService struct { + gitalypb.RepositoryServiceServer +} + +func (svc erroringRepositoryService) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) { + return nil, errors.New("erroring repository exists") +} + +func TestVerifier(t *testing.T) { + t.Parallel() + + // replicas contains the replicas the test setup should create, keyed by + // virtual storage -> relative path -> storage -> exists. + type replicas map[string]map[string]map[string]struct { + // exists determines whether the replica exists on the gitaly or not. If false, + // the replica is deleted but the metadata record left in place. + exists bool + // lastVerified is duration that has passed since the last verification. + lastVerified time.Duration + // isLeased determines whether the replica has a lease acquired during the test. + isLeased bool + } + + const ( + neverVerified = 0 + recentlyVerified = time.Millisecond + pendingVerification = 30 * 24 * time.Hour + ) + + // these are the gitalys setup by the test setup + const ( + gitaly1 = "gitaly-0" + gitaly2 = "gitaly-1" + gitaly3 = "gitaly-2" + ) + + type step struct { + expectedRemovals logRecord + expectedErrors map[string]map[string][]string + healthyStorages StaticHealthChecker + expectedReplicas map[string]map[string][]string + } + + for _, tc := range []struct { + desc string + erroringGitalys map[string]bool + replicas replicas + batchSize int + steps []step + }{ + { + desc: "all replicas exist", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: true, lastVerified: neverVerified}, + gitaly2: {exists: true, lastVerified: recentlyVerified}, + gitaly3: {exists: true, lastVerified: pendingVerification}, + }, + }, + }, + steps: []step{ + { + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly2, gitaly3}, + }, + }, + }, + }, + }, + { + desc: "recently verified replicas are not picked", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: false, lastVerified: recentlyVerified}, + gitaly2: {exists: false, lastVerified: recentlyVerified}, + gitaly3: {exists: false, lastVerified: recentlyVerified}, + }, + }, + }, + steps: []step{ + { + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly2, gitaly3}, + }, + }, + }, + }, + }, + { + desc: "replicas on unhealthy storages are not picked", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: true, lastVerified: neverVerified}, + gitaly2: {exists: true, lastVerified: neverVerified}, + gitaly3: {exists: false, lastVerified: neverVerified}, + }, + }, + }, + steps: []step{ + { + healthyStorages: StaticHealthChecker{"virtual-storage": {gitaly1, gitaly2}}, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly2, gitaly3}, + }, + }, + }, + }, + }, + { + desc: "metadata not deleted for replicas which errored on verification", + erroringGitalys: map[string]bool{ + gitaly3: true, + }, + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: true, lastVerified: neverVerified}, + gitaly2: {exists: true, lastVerified: neverVerified}, + gitaly3: {exists: false, lastVerified: neverVerified}, + }, + }, + }, + steps: []step{ + { + expectedErrors: map[string]map[string][]string{ + "virtual-storage": {"repository-1": {gitaly3}}, + }, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly2, gitaly3}, + }, + }, + }, + }, + }, + { + desc: "replicas with leases acquired are not picked", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: true, lastVerified: neverVerified, isLeased: true}, + gitaly2: {exists: false, lastVerified: neverVerified, isLeased: true}, + gitaly3: {exists: false, lastVerified: pendingVerification, isLeased: true}, + }, + }, + }, + steps: []step{ + { + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly2, gitaly3}, + }, + }, + }, + }, + }, + { + desc: "replicas missing have their metadata records removed", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: true, lastVerified: neverVerified}, + gitaly2: {exists: false, lastVerified: neverVerified}, + gitaly3: {exists: false, lastVerified: pendingVerification}, + }, + }, + }, + steps: []step{ + { + expectedRemovals: logRecord{ + "virtual-storage": { + "repository-1": {gitaly2, gitaly3}, + }, + }, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1}, + }, + }, + }, + }, + }, + { + desc: "verification time is updated when repository exists", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + gitaly1: {exists: true, lastVerified: neverVerified}, + gitaly2: {exists: false, lastVerified: neverVerified}, + gitaly3: {exists: false, lastVerified: pendingVerification}, + }, + }, + }, + batchSize: 1, + steps: []step{ + { + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly2, gitaly3}, + }, + }, + }, + { + expectedRemovals: logRecord{ + "virtual-storage": { + "repository-1": {gitaly2}, + }, + }, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1, gitaly3}, + }, + }, + }, + { + expectedRemovals: logRecord{ + "virtual-storage": { + "repository-1": {gitaly3}, + }, + }, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly1}, + }, + }, + }, + }, + }, + { + desc: "all replicas are lost", + replicas: replicas{ + "virtual-storage": { + "repository-1": { + // The verification should be prioritized never verified repos + // are first and repos that were verified a longer time ago after them. + gitaly1: {exists: false, lastVerified: neverVerified}, + gitaly2: {exists: false, lastVerified: pendingVerification + time.Hour}, + gitaly3: {exists: false, lastVerified: pendingVerification}, + }, + }, + }, + batchSize: 1, + steps: []step{ + { + expectedRemovals: logRecord{ + "virtual-storage": { + "repository-1": {gitaly1}, + }, + }, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly2, gitaly3}, + }, + }, + }, + { + expectedRemovals: logRecord{ + "virtual-storage": { + "repository-1": {gitaly2}, + }, + }, + expectedReplicas: map[string]map[string][]string{ + "virtual-storage": { + "repository-1": {gitaly3}, + }, + }, + }, + { + expectedRemovals: logRecord{ + "virtual-storage": { + "repository-1": {gitaly3}, + }, + }, + expectedReplicas: map[string]map[string][]string{}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) + + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + {Name: "virtual-storage"}, + }, + Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}, + } + + for i := 0; i < 3; i++ { + storageName := fmt.Sprintf("gitaly-%d", i) + + registerFunc := setup.RegisterAll + if tc.erroringGitalys[storageName] { + registerFunc = func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterRepositoryServiceServer(srv, erroringRepositoryService{repository.NewServer( + deps.GetCfg(), + deps.GetRubyServer(), + deps.GetLocator(), + deps.GetTxManager(), + deps.GetGitCmdFactory(), + deps.GetCatfileCache(), + deps.GetConnsPool(), + deps.GetGit2goExecutor(), + deps.GetHousekeepingManager(), + )}) + } + } + + cfg := testcfg.Build(t, testcfg.WithStorages(storageName)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, nil, registerFunc, testserver.WithDisablePraefect()) + conf.VirtualStorages[0].Nodes = append(conf.VirtualStorages[0].Nodes, &config.Node{ + Storage: storageName, + Address: cfg.SocketPath, + }) + } + + db := testdb.New(t) + discardLogger := testhelper.NewDiscardingLogEntry(t) + sidechannelRegistry := sidechannel.NewRegistry() + txManager := transactions.NewManager(config.Config{}) + nodeSet, err := DialNodes( + ctx, + conf.VirtualStorages, + protoregistry.GitalyProtoPreregistered, + nil, + backchannel.NewClientHandshaker( + discardLogger, + NewBackchannelServerFactory( + discardLogger, + transaction.NewServer(txManager), + sidechannelRegistry, + ), + ), + sidechannelRegistry, + ) + require.NoError(t, err) + t.Cleanup(nodeSet.Close) + + tx := db.Begin(t) + t.Cleanup(func() { tx.Rollback(t) }) + testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{ + "praefect-0": conf.StorageNames(), + }) + elector := nodes.NewPerRepositoryElector(tx) + conns := nodeSet.Connections() + rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames()) + + conn, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ + withRouter: NewPerRepositoryRouter( + conns, + elector, + StaticHealthChecker(conf.StorageNames()), + NewLockedRandom(rand.New(rand.NewSource(0))), + rs, + datastore.NewAssignmentStore(db, conf.StorageNames()), + rs, + conf.DefaultReplicationFactors(), + ), + withRepoStore: rs, + withTxMgr: txManager, + }) + t.Cleanup(cleanup) + + // Set up the test repositories. + for virtualStorage, relativePaths := range tc.replicas { + for relativePath, storages := range relativePaths { + // Create the expected repository. This creates all of the replicas transactionally. + gittest.CreateRepository(ctx, t, + gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: virtualStorage}}}, + gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: relativePath}, + ) + + // Now remove the replicas that were created in the transaction but the test case + // expects not to exist. We remove them directly from the Gitalys so the metadata + // records are left in place. + for storage, replica := range storages { + // Set the last verification time to what the test expects it to be. + if replica.lastVerified > 0 { + _, err := db.ExecContext(ctx, ` + UPDATE storage_repositories + SET verified_at = now() - $4 * '1 millisecond'::interval + FROM repositories + WHERE storage_repositories.repository_id = repositories.repository_id + AND repositories.virtual_storage = $1 + AND repositories.relative_path = $2 + AND storage = $3`, + virtualStorage, relativePath, storage, replica.lastVerified.Milliseconds()) + require.NoError(t, err) + } + + // Set a lease if the test expects the record to be leased. + if replica.isLeased { + _, err := db.ExecContext(ctx, ` + UPDATE storage_repositories + SET verification_leased_until = now() + FROM repositories + WHERE storage_repositories.repository_id = repositories.repository_id + AND repositories.virtual_storage = $1 + AND repositories.relative_path = $2 + AND storage = $3`, + virtualStorage, relativePath, storage) + require.NoError(t, err) + } + + if replica.exists { + continue + } + + _, err := gitalypb.NewRepositoryServiceClient(conns[virtualStorage][storage]).RemoveRepository(ctx, + &gitalypb.RemoveRepositoryRequest{ + Repository: &gitalypb.Repository{ + StorageName: storage, + RelativePath: relativePath, + }, + }, + ) + require.NoError(t, err) + } + } + } + + // Create a repository and lock its records to assert the dequeuer does not wait on row locks. + gittest.CreateRepository(ctx, t, + gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: "virtual-storage"}}}, + gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: "locked-repository"}, + ) + + rowLockTx := db.Begin(t) + defer rowLockTx.Rollback(t) + + var lockedRows int + require.NoError(t, rowLockTx.QueryRowContext(ctx, ` + WITH locked_repository AS ( + SELECT repository_id + FROM repositories + WHERE repositories.virtual_storage = 'virtual-storage' + AND repositories.relative_path = 'locked-repository' + FOR UPDATE + ), + + locked_replicas AS ( + SELECT FROM storage_repositories + JOIN locked_repository USING (repository_id) + FOR UPDATE + ) + + SELECT count(*) FROM locked_replicas`, + ).Scan(&lockedRows)) + require.Equal(t, 3, lockedRows) + + for _, step := range tc.steps { + logger, hook := test.NewNullLogger() + + healthyStorages := StaticHealthChecker{"virtual-storage": []string{gitaly1, gitaly2, gitaly3}} + if step.healthyStorages != nil { + healthyStorages = step.healthyStorages + } + + verifier := NewMetadataVerifier(logger, db, conns, healthyStorages, 24*7*time.Hour) + if tc.batchSize > 0 { + verifier.batchSize = tc.batchSize + } + + runCtx, cancelRun := context.WithCancel(ctx) + err = verifier.Run(runCtx, helper.NewCountTicker(1, cancelRun)) + require.Equal(t, context.Canceled, err) + + // Ensure the removals and errors are correctly logged + var actualRemovals logRecord + actualErrors := map[string]map[string][]string{} + for _, entry := range hook.Entries { + switch entry.Message { + case "removing metadata records of non-existent replicas": + if len(step.expectedRemovals) == 0 { + t.Fatalf("unexpected removals logged") + } + + actualRemovals = entry.Data["replicas"].(logRecord) + case "failed to verify replica's existence": + if len(step.expectedErrors) == 0 { + t.Fatalf("unexpected errors logged") + } + + virtualStorage := entry.Data["virtual_storage"].(string) + relativePath := entry.Data["relative_path"].(string) + + if actualErrors[virtualStorage] == nil { + actualErrors[virtualStorage] = map[string][]string{} + } + + actualErrors[virtualStorage][relativePath] = append(actualErrors[virtualStorage][relativePath], entry.Data["storage"].(string)) + default: + t.Fatalf("unexpected log message") + } + } + + if len(step.expectedErrors) > 0 { + require.Equal(t, step.expectedErrors, actualErrors) + } + + if len(step.expectedRemovals) > 0 { + require.Equal(t, step.expectedRemovals, actualRemovals) + } + + // The repository record should always be left in place. Otherwise data loss go unnoticed + // when all replica records are lost. + exists, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: &gitalypb.Repository{ + StorageName: "virtual-storage", + RelativePath: "repository-1", + }, + }) + require.NoError(t, err) + require.True(t, exists.GetExists()) + + // Ensure all the metadata still contains the expected replicas + require.Equal(t, step.expectedReplicas, getAllReplicas(ctx, t, db)) + } + }) + } +} + +// getAllReplicas gets all replicas from the database except for the locked-repository which is created +// by the test suite to ensure non-blocking queries. +func getAllReplicas(ctx context.Context, t testing.TB, db glsql.Querier) map[string]map[string][]string { + rows, err := db.QueryContext(ctx, ` + SELECT repositories.virtual_storage, repositories.relative_path, storage + FROM repositories + JOIN storage_repositories USING (repository_id) + WHERE repositories.relative_path != 'locked-repository' + ORDER BY virtual_storage, relative_path, storage + `) + require.NoError(t, err) + defer rows.Close() + + results := map[string]map[string][]string{} + for rows.Next() { + var virtualStorage, relativePath, storage string + require.NoError(t, rows.Scan(&virtualStorage, &relativePath, &storage)) + + if results[virtualStorage] == nil { + results[virtualStorage] = map[string][]string{} + } + + results[virtualStorage][relativePath] = append(results[virtualStorage][relativePath], storage) + } + require.NoError(t, rows.Err()) + + return results +} |