author    | Pavlo Strokov <pstrokov@gitlab.com> | 2020-03-27 07:25:52 +0300
committer | Paul Okstad <pokstad@gitlab.com>    | 2020-03-27 07:25:52 +0300
commit    | a98b52ec08cffd61cd41bcc53761d54ef360ecee (patch)
tree      | 914cad6d1e7fc7aa0d25ddc9d6e6c4ba448ba3e3
parent    | 5cad7eb778160c81aa9a8755ec991030266bb4d5 (diff)
Praefect: replication event queue as a primary storage of events
The replication storage interface is switched to `ReplicationEventQueue` (sketched below).
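For context, the interface that replaces `ReplJobsDatastore` looks roughly like this; the signatures are taken from the interceptor further down in this diff, while the comments are mine:

```go
// ReplicationEventQueue replaces ReplJobsDatastore (a sketch reconstructed
// from the interceptor signatures in this diff).
type ReplicationEventQueue interface {
	// Enqueue persists a new replication event and returns it with
	// ID, state, timestamps and lock ID filled in.
	Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
	// Dequeue fetches up to count events for the given target node storage
	// and moves them into the 'in_progress' state.
	Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error)
	// Acknowledge marks the events with the passed IDs with the given state
	// ('completed', 'failed', 'cancelled' or 'dead' per allowToAck below)
	// and returns the IDs that were actually acknowledged.
	Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
}
```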
The `gitaly_replication_queue` table is extended with a `meta` column
that serves as a container for metadata such as the correlation ID.
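The `meta` column round-trips through `database/sql` because `Params` now implements `sql.Scanner` and `driver.Valuer` (see `datastore.go` below). A producer attaches the correlation ID like this, taken from `coordinator.go` in this diff:

```go
event := datastore.ReplicationEvent{
	Job: datastore.ReplicationJob{
		Change:            change,
		RelativePath:      targetRepo.GetRelativePath(),
		SourceNodeStorage: primary.GetStorage(),
		TargetNodeStorage: secondary.GetStorage(),
		Params:            params,
	},
	// Meta carries request-scoped metadata; currently only the correlation ID.
	Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID},
}
```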
`memoryReplicationEventQueue` now populates the `LockID` field so that it
produces the same result as the SQL implementation.
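Concretely, the in-memory queue derives the lock ID the same way the SQL schema does (`$1 || '|' || $2`, i.e. target storage plus the repository's relative path); this line is from `memory.go` in this diff:

```go
// Mirror the lock ID the SQL implementation builds with `$1 || '|' || $2`.
event.LockID = event.Job.TargetNodeStorage + "|" + event.Job.RelativePath
// e.g. "praefect-internal-2|/path/to/hashed/storage"
```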
A `ReplicationEventQueueInterceptor` is introduced for testing purposes;
the same mechanism can host interceptors for metrics and the like.
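This is how the tests in this change use it: the interceptor wraps the in-memory queue and lets a test hook into `Enqueue` to synchronize with the asynchronous persist (a metrics implementation would hook in the same way). Taken from `coordinator_test.go` in this diff:

```go
// Count/synchronize enqueues in a test without touching production code.
var replEventWait sync.WaitGroup

queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
	defer replEventWait.Done()       // signal the test that the event was persisted
	return queue.Enqueue(ctx, event) // delegate to the wrapped queue
})

ds := datastore.MemoryQueue{
	MemoryDatastore:       datastore.NewInMemory(conf),
	ReplicationEventQueue: queueInterceptor,
}
```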
A `slice` package is created to gather common operations on different
kinds of slices (`Uint64` is the first one).
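The `slice` package itself is not visible in the diff excerpt below, but judging from `subtractUint64` in `replicator.go`, a `Uint64` helper could plausibly look like the following sketch; the type and method names here are my assumption, not the actual API:

```go
package slice

// Uint64 is a hypothetical helper type; the real API may differ.
type Uint64 []uint64

// Subtract returns the elements of s that are not present in other,
// mirroring the behavior of subtractUint64 in replicator.go.
func (s Uint64) Subtract(other Uint64) Uint64 {
	if len(s) == 0 {
		return nil
	}

	exclude := make(map[uint64]struct{}, len(other))
	for _, v := range other {
		exclude[v] = struct{}{}
	}

	var result Uint64
	for _, v := range s {
		if _, found := exclude[v]; !found {
			result = append(result, v)
		}
	}
	return result
}
```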
Part of: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
21 files changed, 666 insertions, 761 deletions
diff --git a/changelogs/unreleased/ps-queued-datastore.yml b/changelogs/unreleased/ps-queued-datastore.yml new file mode 100644 index 000000000..208414106 --- /dev/null +++ b/changelogs/unreleased/ps-queued-datastore.yml @@ -0,0 +1,5 @@ +--- +title: 'Praefect: replication event queue as a primary storage of events' +merge_request: 1948 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 194bb7fdf..2d98b05cd 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -183,7 +183,10 @@ func run(cfgs []starter.Config, conf config.Config) error { var ( // top level server dependencies - ds = datastore.NewInMemory(conf) + ds = datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + } coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, registry) repl = praefect.NewReplMgr( conf.VirtualStorages[0].Name, diff --git a/internal/middleware/metadatahandler/metadatahandler.go b/internal/middleware/metadatahandler/metadatahandler.go index 0581e82da..c8c2aaa9e 100644 --- a/internal/middleware/metadatahandler/metadatahandler.go +++ b/internal/middleware/metadatahandler/metadatahandler.go @@ -48,7 +48,8 @@ const AuthVersionKey = "grpc.meta.auth_version" // DeadlineTypeKey is the key used in ctx_tags to store the deadline type const DeadlineTypeKey = "grpc.meta.deadline_type" -const correlationIDKey = "correlation_id" +// CorrelationIDKey is the key used in ctx_tags to store the correlation ID +const CorrelationIDKey = "correlation_id" // Unknown client and feature. Matches the prometheus grpc unknown value const unknownValue = "unknown" @@ -112,7 +113,7 @@ func addMetadataTags(ctx context.Context) metadataTags { // This is a stop-gap approach to logging correlation_ids correlationID := correlation.ExtractFromContext(ctx) if correlationID != "" { - tags.Set(correlationIDKey, correlationID) + tags.Set(CorrelationIDKey, correlationID) } return metaTags diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 0229590a0..3831b60f8 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -189,7 +189,10 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func } logEntry := testhelper.DiscardTestEntry(t) - ds := datastore.NewInMemory(conf) + ds := datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + } nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec()) require.NoError(t, err) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 7295538b4..85e53345d 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -7,6 +7,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" @@ -96,9 +97,7 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, mi prot return nil, err } - if requestFinalizer, err = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change, params); err != nil { - return nil, err - } + requestFinalizer = c.createReplicaJobs(ctx, targetRepo, primary, secondaries, change, 
params) } return proxy.NewStreamParameters(ctx, primary.GetConnection(), requestFinalizer, nil), nil @@ -194,29 +193,38 @@ func (c *Coordinator) createReplicaJobs( secondaries []nodes.Node, change datastore.ChangeType, params datastore.Params, -) (func(), error) { - var secondaryStorages []string - for _, secondary := range secondaries { - secondaryStorages = append(secondaryStorages, secondary.GetStorage()) - } - - corrID := c.ensureCorrelationID(ctx, targetRepo) - - jobIDs, err := c.datastore.CreateReplicaReplJobs(corrID, targetRepo.RelativePath, primary.GetStorage(), secondaryStorages, change, params) - if err != nil { - return nil, err - } - +) func() { return func() { - for _, jobID := range jobIDs { - // TODO: in case of error the job remains in queue in 'pending' state and leads to: - // - additional memory consumption - // - stale state of one of the git data stores - if err := c.datastore.UpdateReplJobState(jobID, datastore.JobStateReady); err != nil { - c.log.WithField("job_id", jobID).WithError(err).Errorf("error when updating replication job to %q", datastore.JobStateReady) + correlationID := c.ensureCorrelationID(ctx, targetRepo) + + for _, secondary := range secondaries { + event := datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + Change: change, + RelativePath: targetRepo.GetRelativePath(), + SourceNodeStorage: primary.GetStorage(), + TargetNodeStorage: secondary.GetStorage(), + Params: params, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlationID}, } + + // TODO: it could happen that there won't be enough time to enqueue replication events + // do we need to create another ctx with another timeout? + // https://gitlab.com/gitlab-org/gitaly/-/issues/2586 + go func() { + _, err := c.datastore.Enqueue(ctx, event) + if err != nil { + c.log.WithFields(logrus.Fields{ + logWithReplSource: event.Job.SourceNodeStorage, + logWithReplTarget: event.Job.TargetNodeStorage, + logWithReplChange: event.Job.Change, + logWithReplPath: event.Job.RelativePath, + }).Error("failed to persist replication event") + } + }() } - }, nil + } } func (c *Coordinator) ensureCorrelationID(ctx context.Context, targetRepo *gitalypb.Repository) string { diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 362eb6787..9251d8010 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -1,12 +1,15 @@ package praefect import ( + "context" "io/ioutil" + "sync" "testing" "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" @@ -46,7 +49,19 @@ func TestStreamDirector(t *testing.T) { }, }, } - ds := datastore.NewInMemory(conf) + + var replEventWait sync.WaitGroup + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + defer replEventWait.Done() + return queue.Enqueue(ctx, event) + }) + + ds := datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: queueInterceptor, + } targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -95,36 +110,36 @@ func 
TestStreamDirector(t *testing.T) { require.NoError(t, err) require.Equal(t, "praefect-internal-1", rewrittenRepo.GetStorageName(), "stream director should have rewritten the storage name") - jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStatePending}, "praefect-internal-2", 10) - require.NoError(t, err) - require.Len(t, jobs, 1) + replEventWait.Add(1) // expected only one event to be created + // this call creates new events in the queue and simulates usual flow of the update operation + streamParams.RequestFinalizer() targetNode, err := ds.GetStorageNode("praefect-internal-2") require.NoError(t, err) sourceNode, err := ds.GetStorageNode("praefect-internal-1") - require.NoError(t, err) - expectedJob := datastore.ReplJob{ - Change: datastore.UpdateRepo, - ID: 1, - TargetNode: targetNode, - SourceNode: sourceNode, - State: datastore.JobStatePending, - RelativePath: targetRepo.RelativePath, - CorrelationID: "my-correlation-id", - } - - require.Equal(t, expectedJob, jobs[0], "ensure replication job created by stream director is correct") - - streamParams.RequestFinalizer() - - jobs, err = coordinator.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady}, "praefect-internal-2", 10) + replEventWait.Wait() // wait until event persisted (async operation) + events, err := ds.ReplicationEventQueue.Dequeue(ctx, "praefect-internal-2", 10) require.NoError(t, err) - require.Len(t, jobs, 1) - - expectedJob.State = datastore.JobStateReady - require.Equal(t, expectedJob, jobs[0], "ensure replication job's status has been updatd to JobStateReady") + require.Len(t, events, 1) + + expectedEvent := datastore.ReplicationEvent{ + ID: 1, + State: datastore.JobStateInProgress, + Attempt: 2, + LockID: "praefect-internal-2|/path/to/hashed/storage", + CreatedAt: events[0].CreatedAt, + UpdatedAt: events[0].UpdatedAt, + Job: datastore.ReplicationJob{ + Change: datastore.UpdateRepo, + RelativePath: targetRepo.RelativePath, + TargetNodeStorage: targetNode.Storage, + SourceNodeStorage: sourceNode.Storage, + }, + Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, + } + require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct") } type mockPeeker struct { @@ -159,7 +174,19 @@ func TestAbsentCorrelationID(t *testing.T) { }, }, } - ds := datastore.NewInMemory(conf) + + var replEventWait sync.WaitGroup + + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { + defer replEventWait.Done() + return queue.Enqueue(ctx, event) + }) + + ds := datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: queueInterceptor, + } targetRepo := gitalypb.Repository{ StorageName: "praefect", @@ -191,10 +218,15 @@ func TestAbsentCorrelationID(t *testing.T) { require.NoError(t, err) require.Equal(t, address, streamParams.Conn().Target()) - jobs, err := coordinator.datastore.GetJobs([]datastore.JobState{datastore.JobStatePending}, conf.VirtualStorages[0].Nodes[1].Storage, 1) + replEventWait.Add(1) // expected only one event to be created + // must be run as it adds replication events to the queue + streamParams.RequestFinalizer() + + replEventWait.Wait() // wait until event persisted (async operation) + jobs, err := coordinator.datastore.Dequeue(ctx, conf.VirtualStorages[0].Nodes[1].Storage, 1) 
require.NoError(t, err) require.Len(t, jobs, 1) - require.NotZero(t, jobs[0].CorrelationID, + require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey], "the coordinator should have generated a random ID") } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index ae084d1ad..0b2d91c60 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -6,9 +6,10 @@ package datastore import ( + "database/sql/driver" + "encoding/json" "errors" "fmt" - "sort" "sync" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" @@ -23,9 +24,6 @@ func (js JobState) String() string { } const ( - // JobStatePending is the initial job state when it is not yet ready to run - // and may indicate recovery from a failure prior to the ready-state. - JobStatePending = JobState("pending") // JobStateReady indicates the job is now ready to proceed. JobStateReady = JobState("ready") // JobStateInProgress indicates the job is being processed by a worker. @@ -62,6 +60,25 @@ func (ct ChangeType) String() string { // It must be JSON encodable/decodable to persist it without problems. type Params map[string]interface{} +// Scan assigns a value from a database driver. +func (p *Params) Scan(value interface{}) error { + if value == nil { + return nil + } + + d, ok := value.([]byte) + if !ok { + return fmt.Errorf("unexpected type received: %T", value) + } + + return json.Unmarshal(d, p) +} + +// Value returns a driver Value. +func (p Params) Value() (driver.Value, error) { + return json.Marshal(p) +} + // ReplJob is an instance of a queued replication job. A replication job is // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. @@ -76,22 +93,11 @@ type ReplJob struct { CorrelationID string // from original request } -// replJobs provides sort manipulation behavior -type replJobs []ReplJob - -func (rjs replJobs) Len() int { return len(rjs) } -func (rjs replJobs) Swap(i, j int) { rjs[i], rjs[j] = rjs[j], rjs[i] } - -// byJobID provides a comparator for sorting jobs -type byJobID struct{ replJobs } - -func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].ID } - // Datastore is a data persistence abstraction for all of Praefect's // persistence needs type Datastore interface { - ReplJobsDatastore ReplicasDatastore + ReplicationEventQueue } // ReplicasDatastore manages accessing and setting which secondary replicas @@ -108,45 +114,16 @@ type ReplicasDatastore interface { GetStorageNodes() ([]models.Node, error) } -// ReplJobsDatastore represents the behavior needed for fetching and updating -// replication jobs from the datastore -type ReplJobsDatastore interface { - // GetJobs fetches a list of chronologically ordered replication - // jobs for the given storage replica. The returned list will be at most - // count-length. - GetJobs(states []JobState, nodeStorage string, count int) ([]ReplJob, error) - - // CreateReplicaReplJobs will create replication jobs for each secondary - // replica of a repository known to the datastore. A set of replication job - // ID's for the created jobs will be returned upon success. 
- CreateReplicaReplJobs(correlationID, relativePath, primaryStorage string, secondaryStorages []string, change ChangeType, params Params) ([]uint64, error) - - // UpdateReplJobState updates the state of an existing replication job - UpdateReplJobState(jobID uint64, newState JobState) error - - IncrReplJobAttempts(jobID uint64) error -} - -type jobRecord struct { - change ChangeType - relativePath string // project's relative path - targetNodeStorage string - sourceNodeStorage string - state JobState - attempts int - params Params - correlationID string // from original request +// MemoryQueue is an intermediate struct used for introduction of ReplicationEventQueue into usage. +type MemoryQueue struct { + *MemoryDatastore + ReplicationEventQueue } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - jobs *struct { - sync.RWMutex - records map[uint64]jobRecord // all jobs indexed by ID - } - // storageNodes is read-only after initialization // if modification needed there must be synchronization for concurrent access to it storageNodes map[string]models.Node @@ -165,12 +142,6 @@ type MemoryDatastore struct { func NewInMemory(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ storageNodes: map[string]models.Node{}, - jobs: &struct { - sync.RWMutex - records map[uint64]jobRecord // all jobs indexed by ID - }{ - records: map[uint64]jobRecord{}, - }, repositories: &struct { sync.RWMutex m map[string]models.Repository @@ -255,134 +226,3 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) { return storageNodes, nil } - -// GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(states []JobState, targetNodeStorage string, count int) ([]ReplJob, error) { - statesSet := make(map[JobState]bool, len(states)) - for _, state := range states { - statesSet[state] = true - } - - md.jobs.RLock() - defer md.jobs.RUnlock() - - var results []ReplJob - - for i, record := range md.jobs.records { - // state is a bitmap that is a combination of one or more JobStates - if statesSet[record.state] && record.targetNodeStorage == targetNodeStorage { - job, err := md.replJobFromRecord(i, record) - if err != nil { - return nil, err - } - - results = append(results, job) - if len(results) >= count { - break - } - } - } - - sort.Sort(byJobID{results}) - - return results, nil -} - -// replJobFromRecord constructs a replication job from a record and by cross -// referencing the current repository for the project being replicated -func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { - sourceNode, err := md.GetStorageNode(record.sourceNodeStorage) - if err != nil { - return ReplJob{}, err - } - targetNode, err := md.GetStorageNode(record.targetNodeStorage) - if err != nil { - return ReplJob{}, err - } - - return ReplJob{ - Change: record.change, - ID: jobID, - RelativePath: record.relativePath, - SourceNode: sourceNode, - State: record.state, - TargetNode: targetNode, - Attempts: record.attempts, - Params: record.params, - CorrelationID: record.correlationID, - }, nil -} - -// CreateReplicaReplJobs creates a replication job for each secondary that -// backs the specified repository. Upon success, the job IDs will be returned. 
-func (md *MemoryDatastore) CreateReplicaReplJobs( - correlationID string, - relativePath, - primaryStorage string, - secondaryStorages []string, - change ChangeType, - params Params, -) ([]uint64, error) { - md.jobs.Lock() - defer md.jobs.Unlock() - - if relativePath == "" { - return nil, errors.New("invalid source repository") - } - - var jobIDs []uint64 - - for _, secondaryStorage := range secondaryStorages { - nextID := uint64(len(md.jobs.records) + 1) - - md.jobs.records[nextID] = jobRecord{ - change: change, - targetNodeStorage: secondaryStorage, - state: JobStatePending, - relativePath: relativePath, - sourceNodeStorage: primaryStorage, - params: params, - correlationID: correlationID, - } - - jobIDs = append(jobIDs, nextID) - } - - return jobIDs, nil -} - -// UpdateReplJobState updates an existing replication job's state -func (md *MemoryDatastore) UpdateReplJobState(jobID uint64, newState JobState) error { - md.jobs.Lock() - defer md.jobs.Unlock() - - job, ok := md.jobs.records[jobID] - if !ok { - return fmt.Errorf("job ID %d does not exist", jobID) - } - - if newState == JobStateCompleted || newState == JobStateCancelled { - // remove the job to avoid filling up memory with unneeded job records - delete(md.jobs.records, jobID) - return nil - } - - job.state = newState - md.jobs.records[jobID] = job - return nil -} - -// IncrReplJobAttempts updates an existing replication job's state -func (md *MemoryDatastore) IncrReplJobAttempts(jobID uint64) error { - md.jobs.Lock() - defer md.jobs.Unlock() - - job, ok := md.jobs.records[jobID] - if !ok { - return fmt.Errorf("job ID %d does not exist", jobID) - } - - job.attempts++ - md.jobs.records[jobID] = job - return nil -} diff --git a/internal/praefect/datastore/datastore_test.go b/internal/praefect/datastore/datastore_test.go deleted file mode 100644 index b3311d970..000000000 --- a/internal/praefect/datastore/datastore_test.go +++ /dev/null @@ -1,146 +0,0 @@ -// +build !postgres - -package datastore - -import ( - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" -) - -var ( - stor1 = models.Node{ - Address: "tcp://address-1", - Storage: "praefect-storage-1", - DefaultPrimary: true, - } - stor2 = models.Node{ - Address: "tcp://address-2", - Storage: "praefect-storage-2", - } - proj1 = "abcd1234" // imagine this is a legit project hash - correlationID = "my-correlation-id" -) - -var ( - repo1Repository = models.Repository{ - RelativePath: proj1, - } -) - -var operations = []struct { - desc string - opFn func(*testing.T, Datastore) -}{ - { - desc: "query an empty datastore", - opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs([]JobState{JobStatePending, JobStateReady}, stor1.Storage, 1) - require.NoError(t, err) - require.Len(t, jobs, 0) - }, - }, - { - desc: "insert replication job for Update", - opFn: func(t *testing.T, ds Datastore) { - _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, UpdateRepo, nil) - require.NoError(t, err) - }, - }, - { - desc: "insert replication job for Rename", - opFn: func(t *testing.T, ds Datastore) { - _, err := ds.CreateReplicaReplJobs(correlationID, repo1Repository.RelativePath, stor1.Storage, []string{stor2.Storage}, RenameRepo, Params{"RelativePath": "/data/dir/repo"}) - require.NoError(t, err) - }, - }, - { - desc: "fetch inserted replication jobs", - opFn: func(t *testing.T, ds Datastore) { - jobs, err 
:= ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 10) - require.NoError(t, err) - require.Len(t, jobs, 2) - - expectedJobs := []ReplJob{ - { - Change: UpdateRepo, - ID: 1, - RelativePath: repo1Repository.RelativePath, - SourceNode: stor1, - TargetNode: stor2, - State: JobStatePending, - Params: nil, - CorrelationID: correlationID, - }, - { - Change: RenameRepo, - ID: 2, - RelativePath: repo1Repository.RelativePath, - SourceNode: stor1, - TargetNode: stor2, - State: JobStatePending, - Params: Params{"RelativePath": "/data/dir/repo"}, - CorrelationID: correlationID, - }, - } - require.ElementsMatch(t, expectedJobs, jobs) - }, - }, - { - desc: "mark Update replication job as done", - opFn: func(t *testing.T, ds Datastore) { - err := ds.UpdateReplJobState(1, JobStateCompleted) - require.NoError(t, err) - }, - }, - { - desc: "try fetching pending replication jobs", - opFn: func(t *testing.T, ds Datastore) { - jobs, err := ds.GetJobs([]JobState{JobStatePending}, stor2.Storage, 1) - require.NoError(t, err) - require.Len(t, jobs, 1) - - completed := ReplJob{ - Change: RenameRepo, - ID: 2, - RelativePath: repo1Repository.RelativePath, - SourceNode: stor1, - TargetNode: stor2, - State: JobStatePending, - Params: Params{"RelativePath": "/data/dir/repo"}, - CorrelationID: correlationID, - } - require.Equal(t, completed, jobs[0]) - }, - }, -} - -// TODO: add SQL datastore flavor -var flavors = map[string]func() Datastore{ - "in-memory-datastore": func() Datastore { - return NewInMemory(config.Config{ - VirtualStorages: []*config.VirtualStorage{ - &config.VirtualStorage{ - Nodes: []*models.Node{&stor1, &stor2}, - }, - }, - }) - }, -} - -// TestDatastoreInterface will verify that every implementation or "flavor" of -// datastore interface (in-Memory or SQL) behaves consistently given the same -// series of operations -func TestDatastoreInterface(t *testing.T) { - for name, dsFactory := range flavors { - t.Run(name, func(t *testing.T) { - ds := dsFactory() - for i, op := range operations { - t.Logf("operation %d: %s", i+1, op.desc) - op.opFn(t, ds) - } - }) - } -} diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index 455282af6..ee837e86a 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -216,7 +216,7 @@ func (p *Uint64Provider) Values() []uint64 { // To returns a list of pointers that will be used as a destination for scan operation. func (p *Uint64Provider) To() []interface{} { - var d = new(uint64) - *p = append(*p, d) - return []interface{}{d} + var d uint64 + *p = append(*p, &d) + return []interface{}{&d} } diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index eceddf891..8bff18850 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -51,9 +51,9 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { func (db DB) TruncateAll(t testing.TB) { db.Truncate(t, - "gitaly_replication_queue_job_lock", - "gitaly_replication_queue", - "gitaly_replication_queue_lock", + "replication_queue_job_lock", + "replication_queue", + "replication_queue_lock", ) } @@ -67,7 +67,7 @@ func (db DB) Close() error { // GetDB returns a wrapper around the database connection pool. // Must be used only for testing. -// The new database 'gitaly_test' will be re-created for each package that uses this function. 
+// The new `database` will be re-created for each package that uses this function. // Each call will also truncate all tables with their identities restarted if any. // The best place to call it is in individual testing functions. // It uses env vars: @@ -121,7 +121,7 @@ func initGitalyTestDB(t testing.TB, database string) *sql.DB { require.NoError(t, rows.Close()) _, dErr := postgresDB.Exec("DROP DATABASE IF EXISTS " + database) - require.NoError(t, dErr, "failed to drop 'gitaly_test' database") + require.NoErrorf(t, dErr, "failed to drop %q database", database) _, cErr := postgresDB.Exec("CREATE DATABASE " + database + " WITH ENCODING 'UTF8'") require.NoErrorf(t, cErr, "failed to create %q database", database) diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index e5408e6f0..4f6f72938 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -32,6 +32,8 @@ func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event Replicati event.State = JobStateReady event.CreatedAt = time.Now().UTC() // event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances + // but must be filled out to produce same event as it done by SQL implementation + event.LockID = event.Job.TargetNodeStorage + "|" + event.Job.RelativePath s.Lock() defer s.Unlock() @@ -106,7 +108,7 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt result = append(result, id) switch state { - case JobStateCompleted: + case JobStateCompleted, JobStateCancelled, JobStateDead: // this event is fully processed and could be removed s.remove(i) case JobStateFailed: @@ -114,9 +116,6 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt // out of luck for this replication event, remove from queue as no more attempts available s.remove(i) } - case JobStateCancelled: - // out of luck for this replication event, remove from queue as no more attempts available - s.remove(i) } break } @@ -131,3 +130,60 @@ func (s *memoryReplicationEventQueue) remove(i int) { delete(s.dequeued, s.queued[i].ID) s.queued = append(s.queued[:i], s.queued[i+1:]...) } + +// ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface. +type ReplicationEventQueueInterceptor interface { + // ReplicationEventQueue actual implementation. + ReplicationEventQueue + // OnEnqueue allows to set action that would be executed each time when `Enqueue` method called. + OnEnqueue(func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) + // OnDequeue allows to set action that would be executed each time when `Dequeue` method called. + OnDequeue(func(context.Context, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) + // OnAcknowledge allows to set action that would be executed each time when `Acknowledge` method called. + OnAcknowledge(func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) +} + +// NewReplicationEventQueueInterceptor returns interception over `ReplicationEventQueue` interface. 
+func NewReplicationEventQueueInterceptor(queue ReplicationEventQueue) ReplicationEventQueueInterceptor { + return &replicationEventQueueInterceptor{ReplicationEventQueue: queue} +} + +type replicationEventQueueInterceptor struct { + ReplicationEventQueue + onEnqueue func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error) + onDequeue func(context.Context, string, int, ReplicationEventQueue) ([]ReplicationEvent, error) + onAcknowledge func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error) +} + +func (i *replicationEventQueueInterceptor) OnEnqueue(action func(context.Context, ReplicationEvent, ReplicationEventQueue) (ReplicationEvent, error)) { + i.onEnqueue = action +} + +func (i *replicationEventQueueInterceptor) OnDequeue(action func(context.Context, string, int, ReplicationEventQueue) ([]ReplicationEvent, error)) { + i.onDequeue = action +} + +func (i *replicationEventQueueInterceptor) OnAcknowledge(action func(context.Context, JobState, []uint64, ReplicationEventQueue) ([]uint64, error)) { + i.onAcknowledge = action +} + +func (i *replicationEventQueueInterceptor) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { + if i.onEnqueue != nil { + return i.onEnqueue(ctx, event, i.ReplicationEventQueue) + } + return i.ReplicationEventQueue.Enqueue(ctx, event) +} + +func (i *replicationEventQueueInterceptor) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) { + if i.onDequeue != nil { + return i.onDequeue(ctx, nodeStorage, count, i.ReplicationEventQueue) + } + return i.ReplicationEventQueue.Dequeue(ctx, nodeStorage, count) +} + +func (i *replicationEventQueueInterceptor) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { + if i.onAcknowledge != nil { + return i.onAcknowledge(ctx, state, ids, i.ReplicationEventQueue) + } + return i.ReplicationEventQueue.Acknowledge(ctx, state, ids) +} diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go index c55f9df50..1f33ff200 100644 --- a/internal/praefect/datastore/memory_test.go +++ b/internal/praefect/datastore/memory_test.go @@ -39,7 +39,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ID: 1, State: JobStateReady, Attempt: 3, - LockID: "", + LockID: "storage-1|/project/path-1", CreatedAt: event1.CreatedAt, // it is a hack to have same time for both Job: job1, } @@ -65,7 +65,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ID: 2, State: JobStateReady, Attempt: 3, - LockID: "", + LockID: "storage-2|/project/path-1", CreatedAt: event2.CreatedAt, // it is a hack to have same time for both Job: job2, } @@ -79,7 +79,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ID: 1, State: JobStateInProgress, Attempt: 2, - LockID: "", + LockID: "storage-1|/project/path-1", CreatedAt: event1.CreatedAt, // it is a hack to have same time for both UpdatedAt: dequeuedAttempt1[0].UpdatedAt, // it is a hack to have same time for both Job: job1, @@ -98,7 +98,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ID: 1, State: JobStateInProgress, Attempt: 1, - LockID: "", + LockID: "storage-1|/project/path-1", CreatedAt: event1.CreatedAt, // it is a hack to have same time for both UpdatedAt: dequeuedAttempt2[0].UpdatedAt, // it is a hack to have same time for both Job: job1, @@ -117,7 +117,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ID: 1, State: JobStateInProgress, Attempt: 0, - LockID: "", + LockID: "storage-1|/project/path-1", 
CreatedAt: event1.CreatedAt, // it is a hack to have same time for both UpdatedAt: dequeuedAttempt3[0].UpdatedAt, // it is a hack to have same time for both Job: job1, @@ -140,14 +140,14 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ID: 2, State: JobStateInProgress, Attempt: 2, - LockID: "", + LockID: "storage-2|/project/path-1", CreatedAt: event2.CreatedAt, // it is a hack to have same time for both UpdatedAt: dequeuedAttempt5[0].UpdatedAt, // it is a hack to have same time for both Job: job2, } require.Equal(t, expAttempt5, dequeuedAttempt5[0]) - acknowledgedAttempt5, err := queue.Acknowledge(ctx, JobStateCompleted, []uint64{event2.ID}) + acknowledgedAttempt5, err := queue.Acknowledge(ctx, JobStateDead, []uint64{event2.ID}) require.NoError(t, err) require.Equal(t, []uint64{event2.ID}, acknowledgedAttempt5, "one event must be acknowledged") diff --git a/internal/praefect/datastore/migrations/20200224220728_job_queue.go b/internal/praefect/datastore/migrations/20200224220728_job_queue.go index e63116354..cab5a5494 100644 --- a/internal/praefect/datastore/migrations/20200224220728_job_queue.go +++ b/internal/praefect/datastore/migrations/20200224220728_job_queue.go @@ -5,40 +5,33 @@ import migrate "github.com/rubenv/sql-migrate" func init() { m := &migrate.Migration{ Id: "20200224220728_job_queue", - Up: []string{` -CREATE TYPE GITALY_REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed') -`, ` -CREATE TABLE gitaly_replication_queue_lock -( - id TEXT PRIMARY KEY - , acquired BOOLEAN NOT NULL DEFAULT FALSE -) -`, ` -CREATE TABLE gitaly_replication_queue -( - id BIGSERIAL PRIMARY KEY - , state GITALY_REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready' - , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC') - , updated_at TIMESTAMP WITHOUT TIME ZONE - , attempt INTEGER NOT NULL DEFAULT 3 - , lock_id TEXT - , job JSONB -)`, ` -CREATE TABLE gitaly_replication_queue_job_lock -( - job_id BIGINT REFERENCES gitaly_replication_queue(id) - , lock_id TEXT REFERENCES gitaly_replication_queue_lock(id) - , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC') - , CONSTRAINT gitaly_replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id) -)`, + Up: []string{ + `CREATE TYPE REPLICATION_JOB_STATE AS ENUM('ready', 'in_progress', 'completed', 'cancelled', 'failed', 'dead')`, + `CREATE TABLE replication_queue_lock ( + id TEXT PRIMARY KEY + , acquired BOOLEAN NOT NULL DEFAULT FALSE + )`, + `CREATE TABLE replication_queue ( + id BIGSERIAL PRIMARY KEY + , state REPLICATION_JOB_STATE NOT NULL DEFAULT 'ready' + , created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC') + , updated_at TIMESTAMP WITHOUT TIME ZONE + , attempt INTEGER NOT NULL DEFAULT 3 + , lock_id TEXT + , job JSONB + , meta JSONB + )`, + `CREATE TABLE replication_queue_job_lock ( + job_id BIGINT REFERENCES replication_queue(id) + , lock_id TEXT REFERENCES replication_queue_lock(id) + , triggered_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC') + , CONSTRAINT replication_queue_job_lock_pk PRIMARY KEY (job_id, lock_id) + )`, }, - Down: []string{` -DROP TABLE IF EXISTS gitaly_replication_queue_job_lock CASCADE -`, ` -DROP TABLE IF EXISTS gitaly_replication_queue CASCADE -`, ` -DROP TABLE IF EXISTS gitaly_replication_queue_lock CASCADE -`, + Down: []string{ + `DROP TABLE IF EXISTS replication_queue_job_lock CASCADE`, + `DROP TABLE IF EXISTS replication_queue CASCADE`, + `DROP TABLE IF EXISTS 
replication_queue_lock CASCADE`, }, } diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index 42453f3a4..e15ac48a6 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -25,7 +25,7 @@ type ReplicationEventQueue interface { func allowToAck(state JobState) error { switch state { - case JobStateCompleted, JobStateFailed, JobStateCancelled: + case JobStateCompleted, JobStateFailed, JobStateCancelled, JobStateDead: return nil default: return fmt.Errorf("event state is not supported: %q", state) @@ -67,6 +67,7 @@ type ReplicationEvent struct { CreatedAt time.Time UpdatedAt *time.Time Job ReplicationJob + Meta Params } // Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases. @@ -88,6 +89,8 @@ func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) mapping = append(mapping, &event.LockID) case "job": mapping = append(mapping, &event.Job) + case "meta": + mapping = append(mapping, &event.Meta) default: return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column) } @@ -138,18 +141,18 @@ type PostgresReplicationEventQueue struct { func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { query := ` -WITH insert_lock AS ( - INSERT INTO gitaly_replication_queue_lock(id) - VALUES ($1 || '|' || $2) - ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id - RETURNING id -) -INSERT INTO gitaly_replication_queue(lock_id, job) -SELECT insert_lock.id, $3 -FROM insert_lock -RETURNING id, state, created_at, updated_at, lock_id, attempt, job` + WITH insert_lock AS ( + INSERT INTO replication_queue_lock(id) + VALUES ($1 || '|' || $2) + ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id + RETURNING id + ) + INSERT INTO replication_queue(lock_id, job, meta) + SELECT insert_lock.id, $3, $4 + FROM insert_lock + RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` // this will always return a single row result (because of lock uniqueness) or an error - rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job) + rows, err := rq.qc.QueryContext(ctx, query, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) if err != nil { return ReplicationEvent{}, err } @@ -164,49 +167,48 @@ RETURNING id, state, created_at, updated_at, lock_id, attempt, job` func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, nodeStorage string, count int) ([]ReplicationEvent, error) { query := ` -WITH to_lock AS ( - SELECT id - FROM gitaly_replication_queue_lock AS repo_lock - WHERE repo_lock.acquired = FALSE AND repo_lock.id IN ( - SELECT lock_id - FROM gitaly_replication_queue - WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1 - ORDER BY created_at - LIMIT $2 FOR UPDATE - ) - FOR UPDATE SKIP LOCKED -) -, jobs AS ( - UPDATE gitaly_replication_queue AS queue - SET attempt = queue.attempt - 1, - state = 'in_progress', - updated_at = NOW() AT TIME ZONE 'UTC' - FROM to_lock - WHERE queue.lock_id IN (SELECT id FROM to_lock) - AND state NOT IN ('in_progress', 'cancelled', 'completed') - AND queue.id IN ( - SELECT id - FROM gitaly_replication_queue - WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1 - ORDER BY created_at - LIMIT $2 - ) - RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, 
queue.job -) -, track_job_lock AS ( - INSERT INTO gitaly_replication_queue_job_lock (job_id, lock_id, triggered_at) - SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs - RETURNING lock_id -) -, do_lock AS ( - UPDATE gitaly_replication_queue_lock - SET acquired = TRUE - WHERE id IN (SELECT lock_id FROM track_job_lock) -) -SELECT id, state, created_at, updated_at, lock_id, attempt, job -FROM jobs -ORDER BY id -` + WITH to_lock AS ( + SELECT id + FROM replication_queue_lock AS repo_lock + WHERE repo_lock.acquired = FALSE AND repo_lock.id IN ( + SELECT lock_id + FROM replication_queue + WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1 + ORDER BY created_at + LIMIT $2 FOR UPDATE + ) + FOR UPDATE SKIP LOCKED + ) + , jobs AS ( + UPDATE replication_queue AS queue + SET attempt = queue.attempt - 1 + , state = 'in_progress' + , updated_at = NOW() AT TIME ZONE 'UTC' + FROM to_lock + WHERE queue.lock_id IN (SELECT id FROM to_lock) + AND state NOT IN ('in_progress', 'cancelled', 'completed') + AND queue.id IN ( + SELECT id + FROM replication_queue + WHERE attempt > 0 AND state IN ('ready', 'failed') AND job->>'target_node_storage' = $1 + ORDER BY created_at + LIMIT $2 + ) + RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta + ) + , track_job_lock AS ( + INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) + SELECT jobs.id, jobs.lock_id, NOW() AT TIME ZONE 'UTC' FROM jobs + RETURNING lock_id + ) + , do_lock AS ( + UPDATE replication_queue_lock + SET acquired = TRUE + WHERE id IN (SELECT lock_id FROM track_job_lock) + ) + SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta + FROM jobs + ORDER BY id` rows, err := rq.qc.QueryContext(ctx, query, nodeStorage, count) if err != nil { return nil, err @@ -229,45 +231,42 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J params := glsql.NewParamsAssembler() query := ` -WITH existing AS ( - SELECT id, lock_id - FROM gitaly_replication_queue - WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `) - AND state = 'in_progress' - FOR UPDATE -) -, to_release AS ( - UPDATE gitaly_replication_queue AS queue - SET state = ` + params.AddParam(state) + ` - FROM existing - WHERE existing.id = queue.id - RETURNING queue.id, queue.lock_id -) -, removed_job_lock AS ( - DELETE FROM gitaly_replication_queue_job_lock AS job_lock - USING to_release AS job_failed - WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id - RETURNING job_failed.lock_id -) -, release AS ( - UPDATE gitaly_replication_queue_lock - SET acquired = FALSE - WHERE id IN ( - SELECT existing.lock_id - FROM ( - SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id - ) AS removed - JOIN ( - SELECT lock_id, COUNT(*) AS amount - FROM gitaly_replication_queue_job_lock - WHERE lock_id IN (SELECT lock_id FROM removed_job_lock) - GROUP BY lock_id - ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount - ) -) -SELECT id -FROM existing -` + WITH existing AS ( + SELECT id, lock_id + FROM replication_queue + WHERE id IN (` + params.AddParams(glsql.Uint64sToInterfaces(ids...)) + `) + AND state = 'in_progress' + FOR UPDATE + ) + , to_release AS ( + UPDATE replication_queue AS queue + SET state = ` + params.AddParam(state) + ` + FROM existing + WHERE existing.id = queue.id + RETURNING queue.id, queue.lock_id + ) + , removed_job_lock AS ( + 
DELETE FROM replication_queue_job_lock AS job_lock + USING to_release AS job_failed + WHERE job_lock.job_id = job_failed.id AND job_lock.lock_id = job_failed.lock_id + RETURNING job_failed.lock_id + ) + , release AS ( + UPDATE replication_queue_lock + SET acquired = FALSE + WHERE id IN ( + SELECT existing.lock_id + FROM (SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id) AS removed + JOIN ( + SELECT lock_id, COUNT(*) AS amount + FROM replication_queue_job_lock + WHERE lock_id IN (SELECT lock_id FROM removed_job_lock) + GROUP BY lock_id + ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount + ) + ) + SELECT id + FROM existing` rows, err := rq.qc.QueryContext(ctx, query, params.Params()...) if err != nil { return nil, err diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 749c45ca8..56010f86c 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -53,7 +53,7 @@ func TestPostgresReplicationEventQueue_Enqueue(t *testing.T) { require.Equal(t, expEvent, actualEvent) requireEvents(t, ctx, db, []ReplicationEvent{expEvent}) requireLocks(t, ctx, db, []LockRow{expLock}) // expected a new lock for new event - db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) + db.RequireRowsInTable(t, "replication_queue_job_lock", 0) } func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { @@ -117,7 +117,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireEvents(t, ctx, db, []ReplicationEvent{expEvent1}) requireLocks(t, ctx, db, []LockRow{expLock1}) // expected a new lock for new event - db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) + db.RequireRowsInTable(t, "replication_queue_job_lock", 0) event2, err := queue.Enqueue(ctx, eventType1) // repeat of the same event require.NoError(t, err) @@ -179,7 +179,7 @@ func TestPostgresReplicationEventQueue_EnqueueMultiple(t *testing.T) { requireEvents(t, ctx, db, []ReplicationEvent{expEvent1, expEvent2, expEvent3, expEvent4}) requireLocks(t, ctx, db, []LockRow{expLock1, expLock2, expLock3}) // the new lock for same target but for another repo - db.RequireRowsInTable(t, "gitaly_replication_queue_job_lock", 0) // there is no fetches it must be empty + db.RequireRowsInTable(t, "replication_queue_job_lock", 0) // there is no fetches it must be empty } func TestPostgresReplicationEventQueue_Dequeue(t *testing.T) { @@ -536,7 +536,7 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { require.NoError(t, err) require.Equal(t, ([]uint64)(nil), acknowledge4) // event that was not dequeued can't be acknowledged var newEventState string - require.NoError(t, db.QueryRow("SELECT state FROM gitaly_replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState)) + require.NoError(t, db.QueryRow("SELECT state FROM replication_queue WHERE id = $1", newEvent.ID).Scan(&newEventState)) require.Equal(t, "ready", newEventState, "no way to acknowledge event that is not in in_progress state(was not dequeued)") requireLocks(t, ctx, db, []LockRow{ {ID: "gitaly-1|/project/path-1", Acquired: true}, @@ -563,7 +563,7 @@ func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []Re exp[i].UpdatedAt = nil } - sqlStmt := `SELECT id, state, attempt, lock_id, job FROM gitaly_replication_queue ORDER BY id` + sqlStmt := `SELECT id, state, attempt, lock_id, job FROM replication_queue ORDER BY id` rows, err := 
db.QueryContext(ctx, sqlStmt) require.NoError(t, err) @@ -572,7 +572,7 @@ func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []Re require.Equal(t, exp, actual) } -// LockRow exists only for testing purposes and represents entries from gitaly_replication_queue_lock table. +// LockRow exists only for testing purposes and represents entries from replication_queue_lock table. type LockRow struct { ID string Acquired bool @@ -581,7 +581,7 @@ type LockRow struct { func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []LockRow) { t.Helper() - sqlStmt := `SELECT id, acquired FROM gitaly_replication_queue_lock` + sqlStmt := `SELECT id, acquired FROM replication_queue_lock` rows, err := db.QueryContext(ctx, sqlStmt) require.NoError(t, err) defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }() @@ -596,7 +596,7 @@ func requireLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []Loc require.ElementsMatch(t, expected, actual) } -// JobLockRow exists only for testing purposes and represents entries from gitaly_replication_queue_job_lock table. +// JobLockRow exists only for testing purposes and represents entries from replication_queue_job_lock table. type JobLockRow struct { JobID uint64 LockID string @@ -606,7 +606,7 @@ type JobLockRow struct { func requireJobLocks(t *testing.T, ctx context.Context, db glsql.DB, expected []JobLockRow) { t.Helper() - sqlStmt := `SELECT job_id, lock_id FROM gitaly_replication_queue_job_lock ORDER BY triggered_at` + sqlStmt := `SELECT job_id, lock_id FROM replication_queue_job_lock ORDER BY triggered_at` rows, err := db.QueryContext(ctx, sqlStmt) require.NoError(t, err) defer func() { require.NoError(t, rows.Close(), "completion of result fetching") }() diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 930451144..22c09f75d 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -75,11 +75,12 @@ func testConfig(backends int) config.Config { // setupServer wires all praefect dependencies together via dependency // injection -func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) (*datastore.MemoryDatastore, *Server) { - var ( - ds = datastore.NewInMemory(conf) - coordinator = NewCoordinator(l, ds, nodeMgr, conf, r) - ) +func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) *Server { + ds := datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + } + coordinator := NewCoordinator(l, ds, nodeMgr, conf, r) var defaultNode *models.Node for _, n := range conf.VirtualStorages[0].Nodes { @@ -91,7 +92,7 @@ func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *log server := NewServer(coordinator.StreamDirector, l, r, conf) - return ds, server + return server } // runPraefectServer runs a praefect server with the provided mock servers. 
@@ -124,7 +125,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st r := protoregistry.New() require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t))) - _, prf := setupServer(t, conf, nodeMgr, log.Default(), r) + prf := setupServer(t, conf, nodeMgr, log.Default(), r) listener, port := listenAvailPort(t) t.Logf("praefect listening on port %d", port) @@ -175,7 +176,10 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client conf.VirtualStorages[0].Nodes[i] = node } - ds := datastore.NewInMemory(conf) + ds := datastore.MemoryQueue{ + MemoryDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + } logEntry := log.Default() nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, promtest.NewMockHistogramVec()) diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 9fa23eeb6..d1afaf8a5 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -250,6 +251,8 @@ const ( logWithReplJobID = "replication_job_id" logWithReplSource = "replication_job_source" logWithReplTarget = "replication_job_target" + logWithReplChange = "replication_job_change" + logWithReplPath = "replication_job_path" logWithCorrID = "replication_correlation_id" ) @@ -279,10 +282,6 @@ func ExpBackoffFunc(start time.Duration, max time.Duration) BackoffFunc { } } -const ( - maxAttempts = 3 -) - func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []nodes.Node, err error) { shard, err := r.nodeManager.GetShard(r.virtualStorage) if err != nil { @@ -302,78 +301,99 @@ func (r ReplMgr) getPrimaryAndSecondaries() (primary nodes.Node, secondaries []n return primary, secondaries, nil } +// createReplJob converts `ReplicationEvent` into `ReplJob`. +// It is intermediate solution until `ReplJob` removed and code not adopted to `ReplicationEvent`. +func (r ReplMgr) createReplJob(event datastore.ReplicationEvent) (datastore.ReplJob, error) { + targetNode, err := r.datastore.GetStorageNode(event.Job.TargetNodeStorage) + if err != nil { + return datastore.ReplJob{}, err + } + + sourceNode, err := r.datastore.GetStorageNode(event.Job.SourceNodeStorage) + if err != nil { + return datastore.ReplJob{}, err + } + + var correlationID string + if val, found := event.Meta[metadatahandler.CorrelationIDKey]; found { + correlationID, _ = val.(string) + } + + replJob := datastore.ReplJob{ + Attempts: event.Attempt, + Change: event.Job.Change, + ID: event.ID, + TargetNode: targetNode, + SourceNode: sourceNode, + RelativePath: event.Job.RelativePath, + Params: event.Job.Params, + CorrelationID: correlationID, + } + + return replJob, nil +} + // ProcessBacklog will process queued jobs. It will block while processing jobs. 
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFunc) error { backoff, reset := b() for { - var totalJobs int + var totalEvents int primary, secondaries, err := r.getPrimaryAndSecondaries() if err == nil { for _, secondary := range secondaries { - jobs, err := r.datastore.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStateFailed}, secondary.GetStorage(), 10) + events, err := r.datastore.Dequeue(ctx, secondary.GetStorage(), 10) if err != nil { - return err + r.log.WithField(logWithReplTarget, secondary.GetStorage()).WithError(err).Error("failed to dequeue replication events") + continue } - totalJobs += len(jobs) + totalEvents += len(events) - type replicatedKey struct { - change datastore.ChangeType - repoPath, source, target string - } - reposReplicated := make(map[replicatedKey]struct{}) - - for _, job := range jobs { - if job.Attempts >= maxAttempts { - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateDead); err != nil { - r.log.WithError(err).Error("error when updating replication job status to cancelled") - } + eventIDsByState := map[datastore.JobState][]uint64{} + for _, event := range events { + job, err := r.createReplJob(event) + if err != nil { + r.log.WithField("event", event).WithError(err).Error("failed to restore replication job") + eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) continue } - - var replicationKey replicatedKey - switch job.Change { - // this optimization could be done only for Update and Delete replication jobs as we treat them as idempotent - // Update - there is no much profit from executing multiple fetches for the same target from the same source one by one - // Delete - there is no way how we could remove already removed repository - // that is why those Jobs needs to be tracked and marked as Cancelled (removed from queue without execution). 
- case datastore.UpdateRepo, datastore.DeleteRepo: - replicationKey = replicatedKey{ - change: job.Change, - repoPath: job.RelativePath, - source: job.SourceNode.Storage, - target: job.TargetNode.Storage, - } - - if _, ok := reposReplicated[replicationKey]; ok { - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCancelled); err != nil { - r.log.WithError(err).Error("error when updating replication job status to cancelled") - } - continue - } - } - - if err = r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil { + if err := r.processReplJob(ctx, job, primary.GetConnection(), secondary.GetConnection()); err != nil { r.log.WithFields(logrus.Fields{ - logWithReplJobID: job.ID, - "from_storage": job.SourceNode.Storage, - "to_storage": job.TargetNode.Storage, + logWithReplJobID: job.ID, + logWithReplTarget: job.TargetNode.Storage, + logWithReplSource: job.SourceNode.Storage, + logWithReplChange: job.Change, + logWithReplPath: job.RelativePath, + logWithCorrID: job.CorrelationID, }).WithError(err).Error("replication job failed") - if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateFailed); err != nil { - r.log.WithError(err).Error("error when updating replication job status to failed") + if job.Attempts == 0 { + eventIDsByState[datastore.JobStateDead] = append(eventIDsByState[datastore.JobStateDead], event.ID) + } else { + eventIDsByState[datastore.JobStateFailed] = append(eventIDsByState[datastore.JobStateFailed], event.ID) } continue } + eventIDsByState[datastore.JobStateCompleted] = append(eventIDsByState[datastore.JobStateCompleted], event.ID) + } + for state, eventIDs := range eventIDsByState { + ackIDs, err := r.datastore.Acknowledge(ctx, state, eventIDs) + if err != nil { + r.log.WithField("state", state).WithField("event_ids", eventIDs).WithError(err).Error("failed to acknowledge replication events") + continue + } - reposReplicated[replicationKey] = struct{}{} + notAckIDs := subtractUint64(ackIDs, eventIDs) + if len(notAckIDs) > 0 { + r.log.WithField("state", state).WithField("event_ids", notAckIDs).WithError(err).Error("replication events were not acknowledged") + } } } } else { r.log.WithError(err).WithField("virtual_storage", r.virtualStorage).Error("error when getting primary and secondaries") } - if totalJobs == 0 { + if totalEvents == 0 { select { case <-time.After(backoff()): continue @@ -396,18 +416,10 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour WithField(logWithReplJobID, job.ID). WithField(logWithReplSource, job.SourceNode). WithField(logWithReplTarget, job.TargetNode). + WithField(logWithReplPath, job.RelativePath). + WithField(logWithReplChange, job.Change). 
@@ -396,18 +416,10 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
 		WithField(logWithReplJobID, job.ID).
 		WithField(logWithReplSource, job.SourceNode).
 		WithField(logWithReplTarget, job.TargetNode).
+		WithField(logWithReplPath, job.RelativePath).
+		WithField(logWithReplChange, job.Change).
 		WithField(logWithCorrID, job.CorrelationID)
 
-	if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateInProgress); err != nil {
-		l.WithError(err).Error("unable to update replication job to in progress")
-		return err
-	}
-
-	if err := r.datastore.IncrReplJobAttempts(job.ID); err != nil {
-		l.WithError(err).Error("unable to increment replication job attempts")
-		return err
-	}
-
 	var replCtx context.Context
 	var cancel func()
@@ -451,9 +463,32 @@ func (r ReplMgr) processReplJob(ctx context.Context, job datastore.ReplJob, sour
 	replDuration := time.Since(replStart)
 	r.replLatencyMetric.Observe(float64(replDuration) / float64(time.Second))
 
-	if err := r.datastore.UpdateReplJobState(job.ID, datastore.JobStateCompleted); err != nil {
-		r.log.WithError(err).Error("error when updating replication job status to complete")
+	return nil
+}
+
+// subtractUint64 returns a new slice with all elements of left that do not exist in right.
+func subtractUint64(l, r []uint64) []uint64 {
+	if len(l) == 0 {
+		return nil
 	}
-	return nil
+	if len(r) == 0 {
+		result := make([]uint64, len(l))
+		copy(result, l)
+		return result
+	}
+
+	excludeSet := make(map[uint64]struct{}, len(r))
+	for _, v := range r {
+		excludeSet[v] = struct{}{}
+	}
+
+	var result []uint64
+	for _, v := range l {
+		if _, found := excludeSet[v]; !found {
+			result = append(result, v)
+		}
+	}
+
+	return result
 }
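subtractUint64 treats the right operand purely as an exclusion set and keeps duplicates from the left operand, which TestSubtractUint64 further below pins down. A quick illustrative use with made-up IDs:

func exampleSubtract() []uint64 {
	requested := []uint64{1, 1, 2, 3}
	acked := []uint64{2, 3}
	// duplicates on the left survive, and the left side's ordering is preserved
	return subtractUint64(requested, acked) // -> [1, 1]
}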
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 2dfe83467..3689f868f 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -24,6 +24,7 @@ import (
 	objectpoolservice "gitlab.com/gitlab-org/gitaly/internal/service/objectpool"
 	"gitlab.com/gitlab-org/gitaly/internal/service/remote"
 	"gitlab.com/gitlab-org/gitaly/internal/service/repository"
+	"gitlab.com/gitlab-org/gitaly/internal/service/ssh"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper"
 	"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -32,8 +33,6 @@ import (
 	"google.golang.org/grpc/reflection"
 )
 
-const correlationID = "my-correlation-id"
-
 func TestProcessReplicationJob(t *testing.T) {
 	srv, srvSocketPath := runFullGitalyServer(t)
 	defer srv.Stop()
@@ -65,7 +64,7 @@ func TestProcessReplicationJob(t *testing.T) {
 		},
 	)
 
-	config := config.Config{
+	conf := config.Config{
 		VirtualStorages: []*config.VirtualStorage{
 			&config.VirtualStorage{
 				Name: "default",
@@ -86,7 +85,10 @@ func TestProcessReplicationJob(t *testing.T) {
 		},
 	}
 
-	ds := datastore.NewInMemory(config)
+	ds := datastore.MemoryQueue{
+		MemoryDatastore:       datastore.NewInMemory(conf),
+		ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+	}
 
 	// create object pool on the source
 	objectPoolPath := testhelper.NewTestObjectPoolName(t)
@@ -117,20 +119,23 @@ func TestProcessReplicationJob(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	primary, err := ds.GetPrimary(config.VirtualStorages[0].Name)
+	primary, err := ds.GetPrimary(conf.VirtualStorages[0].Name)
 	require.NoError(t, err)
-	secondaries, err := ds.GetSecondaries(config.VirtualStorages[0].Name)
+	secondaries, err := ds.GetSecondaries(conf.VirtualStorages[0].Name)
 	require.NoError(t, err)
 
-	var secondaryStorages []string
+	var jobs []datastore.ReplJob
 	for _, secondary := range secondaries {
-		secondaryStorages = append(secondaryStorages, secondary.Storage)
+		jobs = append(jobs, datastore.ReplJob{
+			Change:        datastore.UpdateRepo,
+			TargetNode:    secondary,
+			SourceNode:    primary,
+			RelativePath:  testRepo.GetRelativePath(),
+			State:         datastore.JobStateReady,
+			Attempts:      3,
+			CorrelationID: "correlation-id",
+		})
 	}
-	_, err = ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, secondaryStorages, datastore.UpdateRepo, nil)
-	require.NoError(t, err)
-
-	jobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateReady, datastore.JobStatePending}, backupStorageName, 1)
-	require.NoError(t, err)
 	require.Len(t, jobs, 1)
 
 	commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
@@ -141,7 +146,7 @@ func TestProcessReplicationJob(t *testing.T) {
 	entry := testhelper.DiscardTestEntry(t)
 	replicator.log = entry
 
-	nodeMgr, err := nodes.NewManager(entry, config, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(entry, conf, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
 	nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -151,7 +156,7 @@ func TestProcessReplicationJob(t *testing.T) {
 	replMgr := NewReplMgr("", testhelper.DiscardTestEntry(t), ds, nodeMgr, WithLatencyMetric(&mockReplicationHistogram), WithQueueMetric(&mockReplicationGauge))
 	replMgr.replicator = replicator
 
-	shard, err := nodeMgr.GetShard(config.VirtualStorages[0].Name)
+	shard, err := nodeMgr.GetShard(conf.VirtualStorages[0].Name)
 	require.NoError(t, err)
 	primaryNode, err := shard.GetPrimary()
 	require.NoError(t, err)
@@ -216,7 +221,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
 	defer primarySvr.Stop()
 
 	backupSvr, backupSocket := newReplicationService(t)
-	backupSvr.Stop()
+	defer backupSvr.Stop()
 
 	internalListener, err := net.Listen("unix", gitaly_config.GitalyInternalSocketPath())
 	require.NoError(t, err)
@@ -243,7 +248,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
 		Address: "unix://" + backupSocket,
 	}
 
-	config := config.Config{
+	conf := config.Config{
 		VirtualStorages: []*config.VirtualStorage{
 			{
 				Name: "default",
@@ -256,58 +261,99 @@
 	}
 
 	ctx, cancel := testhelper.Context()
-	defer func(oldStorages []gitaly_config.Storage) {
-		gitaly_config.Config.Storages = oldStorages
-		cancel()
-	}(gitaly_config.Config.Storages)
+	defer cancel()
+
+	defer func(oldStorages []gitaly_config.Storage) { gitaly_config.Config.Storages = oldStorages }(gitaly_config.Config.Storages)
 
 	gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
 		Name: backupStorageName,
 		Path: backupDir,
-	},
-		gitaly_config.Storage{
-			Name: "default",
-			Path: testhelper.GitlabTestStoragePath(),
-		},
-	)
+	})
+
+	require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one")
+
+	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+	processed := make(chan struct{})
 
-	ds := datastore.NewInMemory(config)
-	ids, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+	dequeues := 0
+	queueInterceptor.OnDequeue(func(ctx context.Context, target string, count int, queue datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
+		events, err := queue.Dequeue(ctx, target, count)
+		if len(events) > 0 {
+			dequeues++
+		}
+		return events, err
+	})
+
+	completedAcks := 0
+	failedAcks := 0
+	deadAcks := 0
+
+	queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+		switch state {
+		case datastore.JobStateCompleted:
+			require.Equal(t, []uint64{1}, ids)
+			completedAcks++
+		case datastore.JobStateFailed:
+			require.Equal(t, []uint64{2}, ids)
+			failedAcks++
+		case datastore.JobStateDead:
+			require.Equal(t, []uint64{2}, ids)
+			deadAcks++
+		default:
+			require.FailNow(t, "acknowledge is not expected", state)
+		}
+		ackIDs, err := queue.Acknowledge(ctx, state, ids)
+		if completedAcks+failedAcks+deadAcks == 4 {
+			close(processed)
+		}
+		return ackIDs, err
+	})
+
+	ds := datastore.MemoryQueue{
+		ReplicationEventQueue: queueInterceptor,
+		MemoryDatastore:       datastore.NewInMemory(conf),
+	}
+
+	// this job exists to verify that replication works
+	okJob := datastore.ReplicationJob{
+		Change:            datastore.UpdateRepo,
+		RelativePath:      testRepo.RelativePath,
+		TargetNodeStorage: secondary.Storage,
+		SourceNodeStorage: primary.Storage,
+	}
+	event1, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: okJob})
 	require.NoError(t, err)
-	require.Len(t, ids, 1)
+	require.Equal(t, uint64(1), event1.ID)
 
-	entry := testhelper.DiscardTestEntry(t)
+	// this job checks the flow for a replication event that fails
+	failJob := okJob
+	failJob.RelativePath = "invalid path to fail the job"
+	event2, err := ds.ReplicationEventQueue.Enqueue(ctx, datastore.ReplicationEvent{Job: failJob})
+	require.NoError(t, err)
+	require.Equal(t, uint64(2), event2.ID)
 
-	require.NoError(t, ds.UpdateReplJobState(ids[0], datastore.JobStateReady))
+	logEntry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, config, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
 
-	replMgr := NewReplMgr("default", entry, ds, nodeMgr)
-	replMgr.replJobTimeout = 100 * time.Millisecond
-
-	go replMgr.ProcessBacklog(ctx, noopBackoffFunc)
-
-	timeLimit := time.NewTimer(5 * time.Second)
-	ticker := time.NewTicker(1 * time.Second)
-
-	// the job will fail to process because the client connection for "backup" is not registered. It should fail maxAttempts times
-	// and be marked as dead.
-TestJobGetsCancelled:
-	for {
-		select {
-		case <-ticker.C:
-			replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateDead}, "backup", 10)
-			require.NoError(t, err)
-			if len(replJobs) == 1 {
-				//success
-				timeLimit.Stop()
-				break TestJobGetsCancelled
-			}
-		case <-timeLimit.C:
-			t.Fatal("time limit expired for job to be deemed dead")
-		}
+	replMgr := NewReplMgr("default", logEntry, ds, nodeMgr)
+
+	go func() {
+		require.Equal(t, context.Canceled, replMgr.ProcessBacklog(ctx, noopBackoffFunc), "backlog processing failed")
+	}()
+
+	select {
+	case <-processed:
+	case <-time.After(60 * time.Second):
+		// the generous timeout depends strongly on the machine's processing capacity
+		t.Fatal("time limit expired for job to complete")
 	}
+
+	require.Equal(t, 3, dequeues, "expected 1 dequeue to get [okJob, failJob] and 2 more for [failJob] only")
+	require.Equal(t, 2, failedAcks)
+	require.Equal(t, 1, deadAcks)
+	require.Equal(t, 1, completedAcks)
 }
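The assertion counts above follow from the retry budget: assuming the in-memory queue grants each event three attempts (mirroring the Attempts: 3 literal in TestProcessReplicationJob) and consumes one per dequeue, the expected timeline is:

// Sketch of the expected timeline; event numbering matches event1/event2 above.
//
//	dequeue #1 -> [event1, event2]: event1 succeeds -> Acknowledge(completed, [1])
//	                                event2 fails    -> Acknowledge(failed, [2])
//	dequeue #2 -> [event2]:         event2 fails    -> Acknowledge(failed, [2])
//	dequeue #3 -> [event2]:         event2 fails with no attempts left
//	                                                -> Acknowledge(dead, [2])
//
// Hence dequeues == 3, failedAcks == 2, deadAcks == 1, completedAcks == 1,
// and the fourth acknowledgement closes the processed channel.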
 
 func TestProcessBacklog_Success(t *testing.T) {
@@ -342,7 +388,7 @@ func TestProcessBacklog_Success(t *testing.T) {
 		Address: "unix://" + backupSocket,
 	}
 
-	config := config.Config{
+	conf := config.Config{
 		VirtualStorages: []*config.VirtualStorage{
 			{
 				Name: "default",
@@ -355,36 +401,49 @@
 	}
 
 	ctx, cancel := testhelper.Context()
-	defer func(oldStorages []gitaly_config.Storage) {
-		gitaly_config.Config.Storages = oldStorages
-		cancel()
-	}(gitaly_config.Config.Storages)
+	defer cancel()
+
+	defer func(oldStorages []gitaly_config.Storage) { gitaly_config.Config.Storages = oldStorages }(gitaly_config.Config.Storages)
 
 	gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
 		Name: backupStorageName,
 		Path: backupDir,
-	},
-		gitaly_config.Storage{
-			Name: "default",
-			Path: testhelper.GitlabTestStoragePath(),
-		},
-	)
+	})
+	require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one")
+
+	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
 
-	ds := datastore.NewInMemory(config)
+	processed := make(chan struct{})
+	queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+		ackIDs, err := queue.Acknowledge(ctx, state, ids)
+		if len(ids) > 0 {
+			require.Equal(t, datastore.JobStateCompleted, state, "no fails expected")
+			require.Equal(t, []uint64{1, 2, 3, 4}, ids, "all jobs must be processed at once")
+			close(processed)
+		}
+		return ackIDs, err
+	})
 
-	var jobIDs []uint64
+	ds := datastore.MemoryQueue{
+		MemoryDatastore:       datastore.NewInMemory(conf),
+		ReplicationEventQueue: queueInterceptor,
+	}
 
 	// Update replication job
-	idsUpdate1, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+	eventType1 := datastore.ReplicationEvent{
+		Job: datastore.ReplicationJob{
+			Change:            datastore.UpdateRepo,
+			RelativePath:      testRepo.GetRelativePath(),
+			TargetNodeStorage: secondary.Storage,
+			SourceNodeStorage: primary.Storage,
+		},
+	}
+
+	_, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
 	require.NoError(t, err)
-	require.Len(t, idsUpdate1, 1)
-	jobIDs = append(jobIDs, idsUpdate1...)
 
 	// Update replication job
-	idsUpdate2, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.UpdateRepo, nil)
+	_, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType1)
 	require.NoError(t, err)
-	require.Len(t, idsUpdate2, 1)
-	jobIDs = append(jobIDs, idsUpdate2...)
 
 	renameTo1 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv1")
 	fullNewPath1 := filepath.Join(backupDir, renameTo1)
@@ -393,50 +452,50 @@ func TestProcessBacklog_Success(t *testing.T) {
 	renameTo2 := filepath.Join(testRepo.GetRelativePath(), "..", filepath.Base(testRepo.GetRelativePath())+"-mv2")
 	fullNewPath2 := filepath.Join(backupDir, renameTo2)
 
 	// Rename replication job
-	idsRename1, err := ds.CreateReplicaReplJobs(correlationID, testRepo.GetRelativePath(), primary.Storage, []string{secondary.Storage}, datastore.RenameRepo, datastore.Params{"RelativePath": renameTo1})
+	eventType2 := datastore.ReplicationEvent{
+		Job: datastore.ReplicationJob{
+			Change:            datastore.RenameRepo,
+			RelativePath:      testRepo.GetRelativePath(),
+			TargetNodeStorage: secondary.Storage,
+			SourceNodeStorage: primary.Storage,
+			Params:            datastore.Params{"RelativePath": renameTo1},
+		},
+	}
+
+	_, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType2)
 	require.NoError(t, err)
-	require.Len(t, idsRename1, 1)
-	jobIDs = append(jobIDs, idsRename1...)
 
 	// Rename replication job
-	idsRename2, err := ds.CreateReplicaReplJobs(correlationID, renameTo1, primary.Storage, []string{secondary.Storage}, datastore.RenameRepo, datastore.Params{"RelativePath": renameTo2})
+	eventType3 := datastore.ReplicationEvent{
+		Job: datastore.ReplicationJob{
+			Change:            datastore.RenameRepo,
+			RelativePath:      renameTo1,
+			TargetNodeStorage: secondary.Storage,
+			SourceNodeStorage: primary.Storage,
+			Params:            datastore.Params{"RelativePath": renameTo2},
+		},
+	}
 	require.NoError(t, err)
-	require.Len(t, idsRename2, 1)
-	jobIDs = append(jobIDs, idsRename2...)
 
-	entry := testhelper.DiscardTestEntry(t)
+	_, err = ds.ReplicationEventQueue.Enqueue(ctx, eventType3)
+	require.NoError(t, err)
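Rename events carry their payload in Job.Params, and a chained rename must enqueue the second event against the path produced by the first, exactly as eventType2 and eventType3 do above. A minimal sketch with illustrative storage names (not taken from this change):

func enqueueRenameChain(ctx context.Context, q datastore.ReplicationEventQueue, from, via, to string) error {
	// Two steps: from -> via, then via -> to. The second event's RelativePath
	// must be the first event's target path, or the replica is never found.
	for _, step := range []struct{ path, newPath string }{{from, via}, {via, to}} {
		_, err := q.Enqueue(ctx, datastore.ReplicationEvent{
			Job: datastore.ReplicationJob{
				Change:            datastore.RenameRepo,
				RelativePath:      step.path,
				SourceNodeStorage: "default", // illustrative
				TargetNodeStorage: "backup",  // illustrative
				Params:            datastore.Params{"RelativePath": step.newPath},
			},
		})
		if err != nil {
			return err
		}
	}
	return nil
}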
 
-	for _, id := range jobIDs {
-		require.NoError(t, ds.UpdateReplJobState(id, datastore.JobStateReady))
-	}
+	logEntry := testhelper.DiscardTestEntry(t)
 
-	nodeMgr, err := nodes.NewManager(entry, config, promtest.NewMockHistogramVec())
+	nodeMgr, err := nodes.NewManager(logEntry, conf, promtest.NewMockHistogramVec())
 	require.NoError(t, err)
 
-	replMgr := NewReplMgr("default", entry, ds, nodeMgr)
-	replMgr.replJobTimeout = 5 * time.Second
+	replMgr := NewReplMgr("default", logEntry, ds, nodeMgr)
 
 	go func() {
 		require.Equal(t, context.Canceled, replMgr.ProcessBacklog(ctx, noopBackoffFunc), "backlog processing failed")
 	}()
 
-	timeLimit := time.NewTimer(5 * time.Second)
-	ticker := time.NewTicker(1 * time.Second)
-
-	// Once the listener is serving and the job is retried, it should succeed
-TestJobSucceeds:
-	for {
-		select {
-		case <-ticker.C:
-			replJobs, err := ds.GetJobs([]datastore.JobState{datastore.JobStateFailed, datastore.JobStateInProgress, datastore.JobStateReady, datastore.JobStateDead}, "backup", 10)
-			require.NoError(t, err)
-			if len(replJobs) == 0 {
-				//success
-				break TestJobSucceeds
-			}
-		case <-timeLimit.C:
-			t.Fatal("time limit expired for job to complete")
-		}
+	select {
+	case <-processed:
+	case <-time.After(60 * time.Second):
+		// the generous timeout depends strongly on the machine's processing capacity
+		t.Fatal("time limit expired for job to complete")
 	}
 
 	_, serr := os.Stat(fullNewPath1)
@@ -483,7 +542,7 @@ func runFullGitalyServer(t *testing.T) (*grpc.Server, string) {
 	return server, "unix://" + serverSocketPath
 }
 
-// newReplicationService is a grpc service that has the Repository, Remote and ObjectPool services, which
+// newReplicationService is a grpc service that has the SSH, Repository, Remote and ObjectPool services, which
 // are the only ones needed for replication
 func newReplicationService(tb testing.TB) (*grpc.Server, string) {
 	socketName := testhelper.GetTemporaryGitalySocketFileName()
@@ -493,6 +552,7 @@ func newReplicationService(tb testing.TB) (*grpc.Server, string) {
 	gitalypb.RegisterRepositoryServiceServer(svr, repository.NewServer(&rubyserver.Server{}, gitaly_config.GitalyInternalSocketPath()))
 	gitalypb.RegisterObjectPoolServiceServer(svr, objectpoolservice.NewServer())
 	gitalypb.RegisterRemoteServiceServer(svr, remote.NewServer(&rubyserver.Server{}))
+	gitalypb.RegisterSSHServiceServer(svr, ssh.NewServer())
 	reflection.Register(svr)
 
 	listener, err := net.Listen("unix", socketName)
@@ -542,3 +602,24 @@ func testMain(m *testing.M) int {
 
 	return m.Run()
 }
+
+func TestSubtractUint64(t *testing.T) {
+	testCases := []struct {
+		desc  string
+		left  []uint64
+		right []uint64
+		exp   []uint64
+	}{
+		{desc: "empty left", left: nil, right: []uint64{1, 2}, exp: nil},
+		{desc: "empty right", left: []uint64{1, 2}, right: []uint64{}, exp: []uint64{1, 2}},
+		{desc: "some exist", left: []uint64{1, 2, 3, 4, 5}, right: []uint64{2, 4, 5}, exp: []uint64{1, 3}},
+		{desc: "nothing exists", left: []uint64{10, 20}, right: []uint64{100, 200}, exp: []uint64{10, 20}},
+		{desc: "duplicates exist", left: []uint64{1, 1, 2, 3, 3, 4, 4, 5}, right: []uint64{3, 4, 4, 5}, exp: []uint64{1, 1, 2}},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.desc, func(t *testing.T) {
+			require.Equal(t, testCase.exp, subtractUint64(testCase.left, testCase.right))
+		})
+	}
+}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 1cdee452a..3c3e0c9f7 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -139,7 +139,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
 	registry := protoregistry.New()
 	require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
 
-	_, srv := setupServer(t, conf, nodeMgr, entry, registry)
+	srv := setupServer(t, conf, nodeMgr, entry, registry)
 
 	listener, port := listenAvailPort(t)
 	go func() {
diff --git a/internal/praefect/service/info/consistencycheck.go b/internal/praefect/service/info/consistencycheck.go
index 64e3d2cfb..5f5b97fb7 100644
--- a/internal/praefect/service/info/consistencycheck.go
+++ b/internal/praefect/service/info/consistencycheck.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 
+	"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -113,8 +114,6 @@ func walkRepos(ctx context.Context, walkerQ chan<- string, reference nodes.Node)
 		}
 		walkerQ <- resp.GetRelativePath()
 	}
-
-	return nil
 }
 
 func checksumRepo(ctx context.Context, relpath string, node nodes.Node) (string, error) {
@@ -177,36 +176,27 @@ func checksumRepos(ctx context.Context, relpathQ <-chan string, checksumResultQ
 	return nil
 }
 
-func scheduleReplication(ctx context.Context, csr checksumResult, ds Datastore, resp *gitalypb.ConsistencyCheckResponse) error {
-	ids, err := ds.CreateReplicaReplJobs(
-		correlation.ExtractFromContext(ctx),
-		csr.relativePath,
-		csr.referenceStorage,
-		[]string{csr.targetStorage},
-		datastore.UpdateRepo,
-		nil,
-	)
+func scheduleReplication(ctx context.Context, csr checksumResult, q Queue, resp *gitalypb.ConsistencyCheckResponse) error {
+	event, err := q.Enqueue(ctx, datastore.ReplicationEvent{
+		Job: datastore.ReplicationJob{
+			Change:            datastore.UpdateRepo,
+			RelativePath:      csr.relativePath,
+			TargetNodeStorage: csr.targetStorage,
+			SourceNodeStorage: csr.referenceStorage,
+		},
+		Meta: datastore.Params{metadatahandler.CorrelationIDKey: correlation.ExtractFromContext(ctx)},
+	})
 	if err != nil {
 		return err
 	}
 
-	if len(ids) != 1 {
-		return status.Errorf(
-			codes.Internal,
-			"datastore unexpectedly returned %d job IDs",
-			len(ids),
-		)
-	}
-	resp.ReplJobId = ids[0]
-
-	if err := ds.UpdateReplJobState(resp.ReplJobId, datastore.JobStateReady); err != nil {
-		return err
-	}
+	resp.ReplJobId = event.ID
 
 	return nil
 }
 
-func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResult, ds Datastore, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
+func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResult, q Queue, stream gitalypb.PraefectInfoService_ConsistencyCheckServer) error {
 	for csr := range checksumResultQ {
 		select {
 		case <-ctx.Done():
@@ -222,7 +212,7 @@ func ensureConsistency(ctx context.Context, checksumResultQ <-chan checksumResul
 		}
 
 		if csr.reference != csr.target {
-			if err := scheduleReplication(ctx, csr, ds, resp); err != nil {
+			if err := scheduleReplication(ctx, csr, q, resp); err != nil {
 				return err
 			}
 		}
@@ -260,7 +250,7 @@ func (s *Server) ConsistencyCheck(req *gitalypb.ConsistencyCheckRequest, stream
 		return checksumRepos(ctx, walkerQ, checksumResultQ, target, reference)
 	})
 	g.Go(func() error {
-		return ensureConsistency(ctx, checksumResultQ, s.datastore, stream)
+		return ensureConsistency(ctx, checksumResultQ, s.queue, stream)
 	})
 
 	return g.Wait()
diff --git a/internal/praefect/service/info/server.go b/internal/praefect/service/info/server.go
index 31990a813..46702d80a 100644
--- a/internal/praefect/service/info/server.go
+++ b/internal/praefect/service/info/server.go
@@ -1,36 +1,37 @@
 package info
 
 import (
+	"context"
+
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
 )
 
-// Datastore is a subset of the datastore functionality needed by this service
-type Datastore interface {
-	CreateReplicaReplJobs(correlationID, relativePath, primaryStorage string, secondaryStorages []string, change datastore.ChangeType, params datastore.Params) ([]uint64, error)
-	UpdateReplJobState(jobID uint64, newState datastore.JobState) error
+// Queue is a subset of the datastore.ReplicationEventQueue functionality needed by this service
+type Queue interface {
+	Enqueue(ctx context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error)
 }
 
-// compile time assertion that Datastore is satisfied by
-// datastore.ReplJobsDatastore
-var _ Datastore = (datastore.ReplJobsDatastore)(nil)
+// compile time assertion that Queue is satisfied by
+// datastore.ReplicationEventQueue
+var _ Queue = (datastore.ReplicationEventQueue)(nil)
 
 // Server is an InfoService server
 type Server struct {
 	gitalypb.UnimplementedPraefectInfoServiceServer
-	nodeMgr   nodes.Manager
-	conf      config.Config
-	datastore Datastore
+	nodeMgr nodes.Manager
+	conf    config.Config
+	queue   Queue
 }
 
 // NewServer creates a new instance of a grpc InfoServiceServer
-func NewServer(nodeMgr nodes.Manager, conf config.Config, datastore Datastore) gitalypb.PraefectInfoServiceServer {
+func NewServer(nodeMgr nodes.Manager, conf config.Config, queue Queue) gitalypb.PraefectInfoServiceServer {
 	s := &Server{
-		nodeMgr:   nodeMgr,
-		conf:      conf,
-		datastore: datastore,
+		nodeMgr: nodeMgr,
+		conf:    conf,
+		queue:   queue,
 	}
 	return s
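Since the info service now depends only on this single-method Queue seam, a test can substitute a trivial double. A sketch under that assumption (illustrative code, not part of this change):

// fakeQueue records enqueued events and assigns sequential IDs, mimicking the
// numbering the in-memory queue produced in the tests above.
type fakeQueue struct {
	enqueued []datastore.ReplicationEvent
}

func (f *fakeQueue) Enqueue(_ context.Context, event datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
	event.ID = uint64(len(f.enqueued) + 1)
	f.enqueued = append(f.enqueued, event)
	return event, nil
}

// compile-time check, mirroring the assertion above
var _ Queue = (*fakeQueue)(nil)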