gitlab.com/gitlab-org/gitaly.git
author    Sami Hiltunen <shiltunen@gitlab.com>   2022-04-01 16:52:21 +0300
committer Sami Hiltunen <shiltunen@gitlab.com>   2022-04-13 10:51:51 +0300
commit    6fce7a809a0c515209a2c5cb78d88f08a9fffe92 (patch)
tree      51c8655ae9b7347b08124121fdc2c58f3761fd09
parent    465af6714c19c7a7d0b38fd02b626b08d1b6f343 (diff)
Initial implementation of a metadata verifier
This commit adds an initial implementation of a metadata verifier to Praefect.

Praefect stores metadata of the repositories stored on the cluster in Postgres. These metadata records may become out of sync with the disks if changes occur on the disks without going through Praefect, for example due to disk failures or manual modifications. Right now, Praefect only contains some temporary logic to clean up invalid metadata records when replication is attempted using a non-existent source repository. This was mostly put in place to stop reconciliation loops where Praefect keeps scheduling replication jobs from the non-existent repository that will never succeed. While this performs some clean up, it's not sufficient to catch cases where something happens in the background without prompting replication.

The metadata verifier introduced in this commit aims to catch these issues by verifying the metadata against the state on the disks every now and then in the background. For now, only the existence of a replica is verified, not its actual contents by checksumming. Each replica has a 'verified_at' timestamp in the database that tells Praefect when the metadata record was last verified. If the time since the last verification exceeds a configurable threshold, the replica is considered due for reverification. Praefect then asks the Gitaly hosting the replica whether the replica still exists. If it doesn't, the invalid metadata record is deleted and the removal is logged.

To avoid multiple Praefects verifying the same replica concurrently, Praefect acquires a verification lease on the replica in the database prior to verifying the existence of the repository. The scheduling is fairly simplistic at the moment, with each Praefect acquiring a batch of work every two seconds. This also serves as a crude way to rate limit the background verification work and avoid consuming too many resources while doing it. This should be sufficient for now, although it could be improved later.

Praefect leaves the repository's record in place even if all of its replicas have been lost. This ensures no data loss goes unnoticed and that the loss needs to be acknowledged by removing the repository manually.

Changelog: added
-rw-r--r--  internal/praefect/verifier.go       277
-rw-r--r--  internal/praefect/verifier_test.go  591
2 files changed, 868 insertions(+), 0 deletions(-)
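The commit only adds the verifier and its tests; it is not yet wired into Praefect's startup. The following is a minimal sketch of how a Praefect node might run it as a background worker. runMetadataVerifier is a hypothetical helper that is not part of this change, helper.NewTimerTicker is assumed to provide a timer-based Ticker, and the logger, database handle, connections and health checker are assumed to come from Praefect's existing setup code.

package praefect

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
)

// runMetadataVerifier is a hypothetical helper sketching how the verifier could be started
// as a background worker alongside Praefect's other services.
func runMetadataVerifier(ctx context.Context, log logrus.FieldLogger, db glsql.Querier, conns Connections, hc HealthChecker) error {
	// Consider a replica due for reverification a week after it was last verified.
	verifier := NewMetadataVerifier(log, db, conns, hc, 7*24*time.Hour)

	// Acquire a batch of work every two seconds, as described in the commit message.
	// helper.NewTimerTicker is assumed here; any helper.Ticker implementation works.
	return verifier.Run(ctx, helper.NewTimerTicker(2*time.Second))
}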
diff --git a/internal/praefect/verifier.go b/internal/praefect/verifier.go
new file mode 100644
index 000000000..543d0d4c2
--- /dev/null
+++ b/internal/praefect/verifier.go
@@ -0,0 +1,277 @@
+package praefect
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+)
+
+// MetadataVerifier verifies the repository metadata against the actual replicas on the
+// Gitaly nodes. It queries the database for replicas that haven't been verified in a given
+// time and checks whether the Gitalys still have them. If a Gitaly doesn't have a replica,
+// the replica's metadata record is removed and the removal logged. The repository's record
+// is still left in place even if all of the replicas are lost to ensure the data loss doesn't
+// go unnoticed.
+type MetadataVerifier struct {
+ log logrus.FieldLogger
+ db glsql.Querier
+ conns Connections
+ batchSize int
+ leaseDuration time.Duration
+ healthChecker HealthChecker
+ verificationInterval time.Duration
+}
+
+// NewMetadataVerifier creates a new MetadataVerifier.
+func NewMetadataVerifier(
+ log logrus.FieldLogger,
+ db glsql.Querier,
+ conns Connections,
+ healthChecker HealthChecker,
+ verificationInterval time.Duration,
+) *MetadataVerifier {
+ return &MetadataVerifier{
+ log: log,
+ db: db,
+ conns: conns,
+ batchSize: 25,
+ leaseDuration: 30 * time.Second,
+ healthChecker: healthChecker,
+ verificationInterval: verificationInterval,
+ }
+}
+
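+// verificationJob identifies a replica whose existence should be verified on its Gitaly storage.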
+type verificationJob struct {
+ repositoryID int64
+ virtualStorage string
+ relativePath string
+ storage string
+ replicaPath string
+}
+
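+// verificationResult is the outcome of verifying a single replica's existence.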
+type verificationResult struct {
+ job verificationJob
+ exists bool
+ error error
+}
+
+// Run runs the metadata verifier. It keeps running until the context is canceled.
+func (v *MetadataVerifier) Run(ctx context.Context, ticker helper.Ticker) error {
+ defer ticker.Stop()
+
+ for {
+ ticker.Reset()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C():
+ if err := v.run(ctx); err != nil {
+ v.log.WithError(err).Error("failed a background verification run")
+ }
+ }
+ }
+}
+
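+// run performs a single verification run: it picks a batch of replicas that are due for
+// verification, checks their existence on the Gitaly nodes concurrently and updates the
+// metadata records based on the results.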
+func (v *MetadataVerifier) run(ctx context.Context) error {
+ ctx, cancel := context.WithTimeout(ctx, v.leaseDuration)
+ defer cancel()
+
+ jobs, err := v.pickJobs(ctx)
+ if err != nil {
+ return fmt.Errorf("pick jobs: %w", err)
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(len(jobs))
+ results := make([]verificationResult, len(jobs))
+ for i, job := range jobs {
+ i, job := i, job
+ go func() {
+ defer wg.Done()
+
+ exists, err := v.verify(ctx, jobs[i])
+ results[i] = verificationResult{
+ job: job,
+ exists: exists,
+ error: err,
+ }
+ }()
+ }
+
+ wg.Wait()
+
+ return v.updateMetadata(ctx, results)
+}
+
+// logRecord is a helper type for gathering the removed replicas and logging them.
+type logRecord map[string]map[string][]string
+
+// markRemoved marks the given replica as removed.
+func (r logRecord) markRemoved(virtualStorage, relativePath, storage string) {
+ relativePaths, ok := r[virtualStorage]
+ if !ok {
+ relativePaths = map[string][]string{}
+ }
+
+ relativePaths[relativePath] = append(relativePaths[relativePath], storage)
+ r[virtualStorage] = relativePaths
+ sort.Strings(relativePaths[relativePath])
+}
+
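+// updateMetadata records the verification results in the database. It releases the verification
+// leases, updates 'verified_at' for successfully verified replicas and deletes the metadata
+// records of replicas that were verified not to exist on their Gitaly.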
+func (v *MetadataVerifier) updateMetadata(ctx context.Context, results []verificationResult) error {
+ repositoryIDs := make([]int64, len(results))
+ storages := make([]string, len(results))
+ successfullyVerifieds := make([]bool, len(results))
+ exists := make([]bool, len(results))
+
+ logRecords := logRecord{}
+ for i, result := range results {
+ repositoryIDs[i] = result.job.repositoryID
+ storages[i] = result.job.storage
+ exists[i] = result.exists
+ successfullyVerifieds[i] = result.error == nil
+
+ if result.error != nil {
+ v.log.WithFields(logrus.Fields{
+ "repository_id": result.job.repositoryID,
+ "replica_path": result.job.replicaPath,
+ "virtual_storage": result.job.virtualStorage,
+ "storage": result.job.storage,
+ "relative_path": result.job.relativePath,
+ logrus.ErrorKey: result.error,
+ }).Error("failed to verify replica's existence")
+ } else if !result.exists {
+ logRecords.markRemoved(result.job.virtualStorage, result.job.relativePath, result.job.storage)
+ }
+ }
+
+ if len(logRecords) > 0 {
+ v.log.WithField("replicas", logRecords).Info("removing metadata records of non-existent replicas")
+ }
+
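+ // The query locks the affected repository records, releases the verification leases,
+ // updates 'verified_at' for successfully verified replicas and deletes the metadata
+ // records of replicas that were successfully verified not to exist.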
+ _, err := v.db.ExecContext(ctx, `
+WITH results AS (
+ SELECT repository_id, storage, successfully_verified, exists
+ FROM (
+ SELECT unnest($1::bigint[]) AS repository_id,
+ unnest($2::text[]) AS storage,
+ unnest($3::bool[]) as successfully_verified,
+ unnest($4::bool[]) AS exists
+ ) AS results
+ JOIN (
+ SELECT repository_id
+ FROM repositories
+ WHERE repository_id = ANY($1::bigint[])
+ FOR UPDATE
+ ) AS lock_repositories USING (repository_id)
+),
+
+release_leases AS (
+ UPDATE storage_repositories
+ SET verification_leased_until = NULL,
+ verified_at = CASE WHEN successfully_verified THEN now() ELSE verified_at END
+ FROM results
+ WHERE storage_repositories.repository_id = results.repository_id
+ AND storage_repositories.storage = results.storage
+)
+
+DELETE FROM storage_repositories
+USING results
+WHERE storage_repositories.repository_id = results.repository_id
+AND storage_repositories.storage = results.storage
+AND successfully_verified AND NOT exists
+ `, repositoryIDs, storages, successfullyVerifieds, exists)
+ if err != nil {
+ return fmt.Errorf("query: %w", err)
+ }
+
+ return nil
+}
+
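+// pickJobs picks a batch of replicas that are due for verification on healthy storages and
+// acquires verification leases on them so other Praefect nodes don't verify them concurrently.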
+func (v *MetadataVerifier) pickJobs(ctx context.Context) ([]verificationJob, error) {
+ var healthyVirtualStorages, healthyStorages []string
+ for virtualStorage, storages := range v.healthChecker.HealthyNodes() {
+ for _, storage := range storages {
+ healthyVirtualStorages = append(healthyVirtualStorages, virtualStorage)
+ healthyStorages = append(healthyStorages, storage)
+ }
+ }
+
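+ // Pick replicas that have never been verified or whose last verification is older than the
+ // verification interval. Only replicas on healthy storages are considered, already leased or
+ // row-locked replicas are skipped, and a verification lease is acquired on each picked replica.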
+ rows, err := v.db.QueryContext(ctx, `
+WITH to_verify AS (
+ SELECT repository_id, relative_path, replica_path, virtual_storage, storage
+ FROM (
+ SELECT repository_id, storage
+ FROM storage_repositories
+ WHERE ( verified_at IS NULL OR verified_at < now() - $1 * '1 millisecond'::interval )
+ AND verification_leased_until IS NULL
+ ORDER BY verified_at NULLS FIRST
+ FOR NO KEY UPDATE SKIP LOCKED
+ ) AS need_verification
+ JOIN repositories USING (repository_id)
+ JOIN (
+ SELECT unnest($4::text[]) AS virtual_storage,
+ unnest($5::text[]) AS storage
+ ) AS healthy_storages USING (virtual_storage, storage)
+ LIMIT $2
+),
+
+acquire_leases AS (
+ UPDATE storage_repositories
+ SET verification_leased_until = now() + $3 * '1 millisecond'::interval
+ FROM to_verify
+ WHERE storage_repositories.repository_id = to_verify.repository_id
+ AND storage_repositories.storage = to_verify.storage
+)
+
+SELECT repository_id, replica_path, virtual_storage, relative_path, storage
+FROM to_verify
+ `, v.verificationInterval.Milliseconds(), v.batchSize, v.leaseDuration.Milliseconds(), healthyVirtualStorages, healthyStorages)
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+ defer rows.Close()
+
+ var jobs []verificationJob
+ for rows.Next() {
+ var job verificationJob
+ if err := rows.Scan(&job.repositoryID, &job.replicaPath, &job.virtualStorage, &job.relativePath, &job.storage); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ jobs = append(jobs, job)
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows: %w", err)
+ }
+
+ return jobs, nil
+}
+
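+// verify asks the Gitaly hosting the replica whether the repository still exists on the disk.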
+func (v *MetadataVerifier) verify(ctx context.Context, job verificationJob) (bool, error) {
+ conn, ok := v.conns[job.virtualStorage][job.storage]
+ if !ok {
+ return false, fmt.Errorf("no connection to %q/%q", job.virtualStorage, job.storage)
+ }
+
+ resp, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: job.storage,
+ RelativePath: job.replicaPath,
+ },
+ })
+ if err != nil {
+ return false, err
+ }
+
+ return resp.Exists, nil
+}
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
new file mode 100644
index 000000000..f6d1f8187
--- /dev/null
+++ b/internal/praefect/verifier_test.go
@@ -0,0 +1,591 @@
+package praefect
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ gitalyconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type erroringRepositoryService struct {
+ gitalypb.RepositoryServiceServer
+}
+
+func (svc erroringRepositoryService) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) {
+ return nil, errors.New("erroring repository exists")
+}
+
+func TestVerifier(t *testing.T) {
+ t.Parallel()
+
+ // replicas contains the replicas the test setup should create, keyed by
+ // virtual storage -> relative path -> storage -> replica state.
+ type replicas map[string]map[string]map[string]struct {
+ // exists determines whether the replica exists on the gitaly or not. If false,
+ // the replica is deleted but the metadata record is left in place.
+ exists bool
+ // lastVerified is the duration that has passed since the last verification.
+ lastVerified time.Duration
+ // isLeased determines whether the replica has a lease acquired during the test.
+ isLeased bool
+ }
+
+ const (
+ neverVerified = 0
+ recentlyVerified = time.Millisecond
+ pendingVerification = 30 * 24 * time.Hour
+ )
+
+ // These are the Gitaly storages created by the test setup.
+ const (
+ gitaly1 = "gitaly-0"
+ gitaly2 = "gitaly-1"
+ gitaly3 = "gitaly-2"
+ )
+
+ type step struct {
+ expectedRemovals logRecord
+ expectedErrors map[string]map[string][]string
+ healthyStorages StaticHealthChecker
+ expectedReplicas map[string]map[string][]string
+ }
+
+ for _, tc := range []struct {
+ desc string
+ erroringGitalys map[string]bool
+ replicas replicas
+ batchSize int
+ steps []step
+ }{
+ {
+ desc: "all replicas exist",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: true, lastVerified: recentlyVerified},
+ gitaly3: {exists: true, lastVerified: pendingVerification},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "recently verified replicas are not picked",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: false, lastVerified: recentlyVerified},
+ gitaly2: {exists: false, lastVerified: recentlyVerified},
+ gitaly3: {exists: false, lastVerified: recentlyVerified},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "replicas on unhealthy storages are not picked",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: true, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: neverVerified},
+ },
+ },
+ },
+ steps: []step{
+ {
+ healthyStorages: StaticHealthChecker{"virtual-storage": {gitaly1, gitaly2}},
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "metadata not deleted for replicas which errored on verification",
+ erroringGitalys: map[string]bool{
+ gitaly3: true,
+ },
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: true, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: neverVerified},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedErrors: map[string]map[string][]string{
+ "virtual-storage": {"repository-1": {gitaly3}},
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "replicas with leases acquired are not picked",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified, isLeased: true},
+ gitaly2: {exists: false, lastVerified: neverVerified, isLeased: true},
+ gitaly3: {exists: false, lastVerified: pendingVerification, isLeased: true},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "replicas missing have their metadata records removed",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: false, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: pendingVerification},
+ },
+ },
+ },
+ steps: []step{
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly2, gitaly3},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "verification time is updated when repository exists",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ gitaly1: {exists: true, lastVerified: neverVerified},
+ gitaly2: {exists: false, lastVerified: neverVerified},
+ gitaly3: {exists: false, lastVerified: pendingVerification},
+ },
+ },
+ },
+ batchSize: 1,
+ steps: []step{
+ {
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly2, gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly2},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1, gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly3},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly1},
+ },
+ },
+ },
+ },
+ },
+ {
+ desc: "all replicas are lost",
+ replicas: replicas{
+ "virtual-storage": {
+ "repository-1": {
+ // Verification should be prioritized so that never-verified replicas come first,
+ // followed by replicas that were verified the longest time ago.
+ gitaly1: {exists: false, lastVerified: neverVerified},
+ gitaly2: {exists: false, lastVerified: pendingVerification + time.Hour},
+ gitaly3: {exists: false, lastVerified: pendingVerification},
+ },
+ },
+ },
+ batchSize: 1,
+ steps: []step{
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly1},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly2, gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly2},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{
+ "virtual-storage": {
+ "repository-1": {gitaly3},
+ },
+ },
+ },
+ {
+ expectedRemovals: logRecord{
+ "virtual-storage": {
+ "repository-1": {gitaly3},
+ },
+ },
+ expectedReplicas: map[string]map[string][]string{},
+ },
+ },
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
+
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {Name: "virtual-storage"},
+ },
+ Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository},
+ }
+
+ for i := 0; i < 3; i++ {
+ storageName := fmt.Sprintf("gitaly-%d", i)
+
+ registerFunc := setup.RegisterAll
+ if tc.erroringGitalys[storageName] {
+ registerFunc = func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterRepositoryServiceServer(srv, erroringRepositoryService{repository.NewServer(
+ deps.GetCfg(),
+ deps.GetRubyServer(),
+ deps.GetLocator(),
+ deps.GetTxManager(),
+ deps.GetGitCmdFactory(),
+ deps.GetCatfileCache(),
+ deps.GetConnsPool(),
+ deps.GetGit2goExecutor(),
+ deps.GetHousekeepingManager(),
+ )})
+ }
+ }
+
+ cfg := testcfg.Build(t, testcfg.WithStorages(storageName))
+ cfg.SocketPath = testserver.RunGitalyServer(t, cfg, nil, registerFunc, testserver.WithDisablePraefect())
+ conf.VirtualStorages[0].Nodes = append(conf.VirtualStorages[0].Nodes, &config.Node{
+ Storage: storageName,
+ Address: cfg.SocketPath,
+ })
+ }
+
+ db := testdb.New(t)
+ discardLogger := testhelper.NewDiscardingLogEntry(t)
+ sidechannelRegistry := sidechannel.NewRegistry()
+ txManager := transactions.NewManager(config.Config{})
+ nodeSet, err := DialNodes(
+ ctx,
+ conf.VirtualStorages,
+ protoregistry.GitalyProtoPreregistered,
+ nil,
+ backchannel.NewClientHandshaker(
+ discardLogger,
+ NewBackchannelServerFactory(
+ discardLogger,
+ transaction.NewServer(txManager),
+ sidechannelRegistry,
+ ),
+ ),
+ sidechannelRegistry,
+ )
+ require.NoError(t, err)
+ t.Cleanup(nodeSet.Close)
+
+ tx := db.Begin(t)
+ t.Cleanup(func() { tx.Rollback(t) })
+ testdb.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{
+ "praefect-0": conf.StorageNames(),
+ })
+ elector := nodes.NewPerRepositoryElector(tx)
+ conns := nodeSet.Connections()
+ rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
+
+ conn, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
+ withRouter: NewPerRepositoryRouter(
+ conns,
+ elector,
+ StaticHealthChecker(conf.StorageNames()),
+ NewLockedRandom(rand.New(rand.NewSource(0))),
+ rs,
+ datastore.NewAssignmentStore(db, conf.StorageNames()),
+ rs,
+ conf.DefaultReplicationFactors(),
+ ),
+ withRepoStore: rs,
+ withTxMgr: txManager,
+ })
+ t.Cleanup(cleanup)
+
+ // Set up the test repositories.
+ for virtualStorage, relativePaths := range tc.replicas {
+ for relativePath, storages := range relativePaths {
+ // Create the expected repository. This creates all of the replicas transactionally.
+ gittest.CreateRepository(ctx, t,
+ gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: virtualStorage}}},
+ gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: relativePath},
+ )
+
+ // Now remove the replicas that were created in the transaction but the test case
+ // expects not to exist. We remove them directly from the Gitalys so the metadata
+ // records are left in place.
+ for storage, replica := range storages {
+ // Set the last verification time to what the test expects it to be.
+ if replica.lastVerified > 0 {
+ _, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verified_at = now() - $4 * '1 millisecond'::interval
+ FROM repositories
+ WHERE storage_repositories.repository_id = repositories.repository_id
+ AND repositories.virtual_storage = $1
+ AND repositories.relative_path = $2
+ AND storage = $3`,
+ virtualStorage, relativePath, storage, replica.lastVerified.Milliseconds())
+ require.NoError(t, err)
+ }
+
+ // Set a lease if the test expects the record to be leased.
+ if replica.isLeased {
+ _, err := db.ExecContext(ctx, `
+ UPDATE storage_repositories
+ SET verification_leased_until = now()
+ FROM repositories
+ WHERE storage_repositories.repository_id = repositories.repository_id
+ AND repositories.virtual_storage = $1
+ AND repositories.relative_path = $2
+ AND storage = $3`,
+ virtualStorage, relativePath, storage)
+ require.NoError(t, err)
+ }
+
+ if replica.exists {
+ continue
+ }
+
+ _, err := gitalypb.NewRepositoryServiceClient(conns[virtualStorage][storage]).RemoveRepository(ctx,
+ &gitalypb.RemoveRepositoryRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: storage,
+ RelativePath: relativePath,
+ },
+ },
+ )
+ require.NoError(t, err)
+ }
+ }
+ }
+
+ // Create a repository and lock its records to assert the dequeuer does not wait on row locks.
+ gittest.CreateRepository(ctx, t,
+ gitalyconfig.Cfg{Storages: []gitalyconfig.Storage{{Name: "virtual-storage"}}},
+ gittest.CreateRepositoryConfig{ClientConn: conn, RelativePath: "locked-repository"},
+ )
+
+ rowLockTx := db.Begin(t)
+ defer rowLockTx.Rollback(t)
+
+ var lockedRows int
+ require.NoError(t, rowLockTx.QueryRowContext(ctx, `
+ WITH locked_repository AS (
+ SELECT repository_id
+ FROM repositories
+ WHERE repositories.virtual_storage = 'virtual-storage'
+ AND repositories.relative_path = 'locked-repository'
+ FOR UPDATE
+ ),
+
+ locked_replicas AS (
+ SELECT FROM storage_repositories
+ JOIN locked_repository USING (repository_id)
+ FOR UPDATE
+ )
+
+ SELECT count(*) FROM locked_replicas`,
+ ).Scan(&lockedRows))
+ require.Equal(t, 3, lockedRows)
+
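+ // Run the verifier once per step and assert that the logged removals and errors as well
+ // as the remaining metadata records match the expectations.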
+ for _, step := range tc.steps {
+ logger, hook := test.NewNullLogger()
+
+ healthyStorages := StaticHealthChecker{"virtual-storage": []string{gitaly1, gitaly2, gitaly3}}
+ if step.healthyStorages != nil {
+ healthyStorages = step.healthyStorages
+ }
+
+ verifier := NewMetadataVerifier(logger, db, conns, healthyStorages, 24*7*time.Hour)
+ if tc.batchSize > 0 {
+ verifier.batchSize = tc.batchSize
+ }
+
+ runCtx, cancelRun := context.WithCancel(ctx)
+ err = verifier.Run(runCtx, helper.NewCountTicker(1, cancelRun))
+ require.Equal(t, context.Canceled, err)
+
+ // Ensure the removals and errors are correctly logged
+ var actualRemovals logRecord
+ actualErrors := map[string]map[string][]string{}
+ for _, entry := range hook.Entries {
+ switch entry.Message {
+ case "removing metadata records of non-existent replicas":
+ if len(step.expectedRemovals) == 0 {
+ t.Fatalf("unexpected removals logged")
+ }
+
+ actualRemovals = entry.Data["replicas"].(logRecord)
+ case "failed to verify replica's existence":
+ if len(step.expectedErrors) == 0 {
+ t.Fatalf("unexpected errors logged")
+ }
+
+ virtualStorage := entry.Data["virtual_storage"].(string)
+ relativePath := entry.Data["relative_path"].(string)
+
+ if actualErrors[virtualStorage] == nil {
+ actualErrors[virtualStorage] = map[string][]string{}
+ }
+
+ actualErrors[virtualStorage][relativePath] = append(actualErrors[virtualStorage][relativePath], entry.Data["storage"].(string))
+ default:
+ t.Fatalf("unexpected log message")
+ }
+ }
+
+ if len(step.expectedErrors) > 0 {
+ require.Equal(t, step.expectedErrors, actualErrors)
+ }
+
+ if len(step.expectedRemovals) > 0 {
+ require.Equal(t, step.expectedRemovals, actualRemovals)
+ }
+
+ // The repository record should always be left in place. Otherwise data loss could go
+ // unnoticed when all replica records are lost.
+ exists, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: &gitalypb.Repository{
+ StorageName: "virtual-storage",
+ RelativePath: "repository-1",
+ },
+ })
+ require.NoError(t, err)
+ require.True(t, exists.GetExists())
+
+ // Ensure all the metadata still contains the expected replicas
+ require.Equal(t, step.expectedReplicas, getAllReplicas(ctx, t, db))
+ }
+ })
+ }
+}
+
+// getAllReplicas gets all replicas from the database, except for the 'locked-repository' that
+// the test suite creates to ensure the queries don't block on row locks.
+func getAllReplicas(ctx context.Context, t testing.TB, db glsql.Querier) map[string]map[string][]string {
+ rows, err := db.QueryContext(ctx, `
+ SELECT repositories.virtual_storage, repositories.relative_path, storage
+ FROM repositories
+ JOIN storage_repositories USING (repository_id)
+ WHERE repositories.relative_path != 'locked-repository'
+ ORDER BY virtual_storage, relative_path, storage
+ `)
+ require.NoError(t, err)
+ defer rows.Close()
+
+ results := map[string]map[string][]string{}
+ for rows.Next() {
+ var virtualStorage, relativePath, storage string
+ require.NoError(t, rows.Scan(&virtualStorage, &relativePath, &storage))
+
+ if results[virtualStorage] == nil {
+ results[virtualStorage] = map[string][]string{}
+ }
+
+ results[virtualStorage][relativePath] = append(results[virtualStorage][relativePath], storage)
+ }
+ require.NoError(t, rows.Err())
+
+ return results
+}