| author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-05-18 19:01:11 +0300 |
|---|---|---|
| committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-05-18 19:01:11 +0300 |
| commit | 9ac3d3e6dccbb33fd3c7fa18461022741f3f470e (patch) | |
| tree | c70c7c68f6bf464f17241e80e113e84a61b32109 | |
| parent | 88c81500d06b7e8cae5289b1a4cdbe24db59975a (diff) | |
| parent | ffc3443f89eb37ae132289601971d71f0346e411 (diff) | |
Merge branch 'ps-reads-distribution' into 'master'
Praefect: horizontal scaling of a single shard MVC
Closes #2650
See merge request gitlab-org/gitaly!2162
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | cmd/praefect/main.go | 19 |
| -rw-r--r-- | internal/praefect/auth_test.go | 4 |
| -rw-r--r-- | internal/praefect/config/config.go | 2 |
| -rw-r--r-- | internal/praefect/config/config_test.go | 3 |
| -rw-r--r-- | internal/praefect/config/testdata/config.toml | 7 |
| -rw-r--r-- | internal/praefect/coordinator.go | 41 |
| -rw-r--r-- | internal/praefect/coordinator_test.go | 24 |
| -rw-r--r-- | internal/praefect/dataloss_check_test.go | 2 |
| -rw-r--r-- | internal/praefect/datastore/glsql/postgres.go | 25 |
| -rw-r--r-- | internal/praefect/datastore/glsql/testing.go | 8 |
| -rw-r--r-- | internal/praefect/datastore/memory.go | 73 |
| -rw-r--r-- | internal/praefect/datastore/memory_test.go | 9 |
| -rw-r--r-- | internal/praefect/datastore/queue.go | 28 |
| -rw-r--r-- | internal/praefect/datastore/queue_test.go | 203 |
| -rw-r--r-- | internal/praefect/helper_test.go | 13 |
| -rw-r--r-- | internal/praefect/metrics/prometheus.go | 12 |
| -rw-r--r-- | internal/praefect/nodes/manager.go | 47 |
| -rw-r--r-- | internal/praefect/nodes/manager_test.go | 230 |
| -rw-r--r-- | internal/praefect/replicator_test.go | 16 |
| -rw-r--r-- | internal/praefect/server_test.go | 25 |
20 files changed, 714 insertions, 77 deletions
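What follows is the full diff. The core of the change sits in internal/praefect/coordinator.go and internal/praefect/nodes/manager.go: accessor (read-only) RPCs now go through Mgr.GetSyncedNode, which, when the new distributed_reads_enabled option is set, asks the replication event queue which storages are up to date for the target repository, picks one at random and matches it against the shard's secondaries, falling back to the primary on errors or when nothing matches. Each routed read also increments the new gitaly_praefect_read_distribution counter, labelled by virtual storage and storage. Below is a simplified, standalone sketch of that routing decision; Node, Shard and UpToDateStoragesFunc are stand-ins for the real nodes.Node, nodes.Shard and ReplicationEventQueue types from the diff, not the actual implementation.

```go
package routing

import (
	"context"
	"math/rand"
)

// Node and Shard are simplified stand-ins for Praefect's nodes.Node and nodes.Shard.
type Node struct {
	Storage string
	Address string
}

type Shard struct {
	Primary     Node
	Secondaries []Node
}

// UpToDateStoragesFunc stands in for ReplicationEventQueue.GetUpToDateStorages:
// it returns the storages whose latest replication job for the repository completed.
type UpToDateStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) ([]string, error)

// pickReadNode mirrors the shape of Mgr.GetSyncedNode from this MR: with
// distributed reads disabled (or on a lookup error) the primary serves the read;
// otherwise one of the up-to-date storages is chosen at random and matched
// against the secondaries, falling back to the primary when nothing matches.
func pickReadNode(ctx context.Context, shard Shard, distributeReads bool, upToDate UpToDateStoragesFunc, virtualStorage, relativePath string) Node {
	if !distributeReads {
		return shard.Primary
	}

	storages, err := upToDate(ctx, virtualStorage, relativePath)
	if err != nil || len(storages) == 0 {
		// recoverable: the real code logs the error and proceeds with the primary
		return shard.Primary
	}

	chosen := storages[rand.Intn(len(storages))]
	for _, secondary := range shard.Secondaries {
		if secondary.Storage == chosen {
			return secondary
		}
	}

	// the chosen storage is not one of the secondaries, e.g. after re-configuration
	return shard.Primary
}
```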
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 19d5e25fa..7c42fb18a 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -231,7 +231,14 @@ func run(cfgs []starter.Config, conf config.Config) error { db = dbConn } - nodeManager, err := nodes.NewManager(logger, conf, db, nodeLatencyHistogram) + ds := datastore.Datastore{ReplicasDatastore: datastore.NewInMemory(conf)} + if conf.PostgresQueueEnabled { + ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db) + } else { + ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue(conf) + } + + nodeManager, err := nodes.NewManager(logger, conf, db, ds, nodeLatencyHistogram) if err != nil { return err } @@ -257,16 +264,6 @@ func run(cfgs []starter.Config, conf config.Config) error { return err } - ds := datastore.Datastore{ - ReplicasDatastore: datastore.NewInMemory(conf), - } - - if conf.PostgresQueueEnabled { - ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db) - } else { - ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue() - } - var ( // top level server dependencies coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, registry) diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index a43e72e60..c64c9a468 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -171,10 +171,10 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func logEntry := testhelper.DiscardTestEntry(t) ds := datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf), } - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) txMgr := transactions.NewManager() diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 510eada28..6a0c842a6 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -36,6 +36,8 @@ type Config struct { // Keep for legacy reasons: remove after Omnibus has switched FailoverEnabled bool `toml:"failover_enabled"` PostgresQueueEnabled bool `toml:"postgres_queue_enabled"` + // DistributedReadsEnabled if enabled redirects accessor operations to up to date secondaries + DistributedReadsEnabled bool `toml:"distributed_reads_enabled"` } // VirtualStorage represents a set of nodes for a storage diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 0077e6094..4a9ae242a 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -257,7 +257,8 @@ func TestConfigParsing(t *testing.T) { SSLKey: "/path/to/key", SSLRootCert: "/path/to/root-cert", }, - PostgresQueueEnabled: true, + PostgresQueueEnabled: true, + DistributedReadsEnabled: true, }, }, } diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 330470c90..c871972a4 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -2,6 +2,7 @@ listen_addr = "" socket_path = "" prometheus_listen_addr = "" postgres_queue_enabled = true +distributed_reads_enabled = true [logging] format = "json" @@ -18,17 +19,17 @@ name = "praefect" address = 
"tcp://gitaly-internal-1.example.com" storage = "praefect-internal-1" primary = true - + [[virtual_storage.node]] address = "tcp://gitaly-internal-2.example.com" storage = "praefect-internal-2" - + [[virtual_storage.node]] address = "tcp://gitaly-internal-3.example.com" storage = "praefect-internal-3" [prometheus] - grpc_latency_buckets = [0.1, 0.2, 0.3] + grpc_latency_buckets = [0.1, 0.2, 0.3] [database] host = "1.2.3.4" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 977d66498..ea919d455 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -14,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" + "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" @@ -98,17 +99,25 @@ func NewCoordinator(l logrus.FieldLogger, ds datastore.Datastore, nodeMgr nodes. } func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { - ctx, err := metadata.InjectPraefectServer(ctx, c.conf) + targetRepo, err := call.methodInfo.TargetRepo(call.msg) if err != nil { + return nil, helper.ErrInvalidArgument(fmt.Errorf("repo scoped: %w", err)) + } + + if targetRepo.StorageName == "" || targetRepo.RelativePath == "" { + return nil, helper.ErrInvalidArgumentf("repo scoped: target repo is invalid") + } + + if ctx, err = metadata.InjectPraefectServer(ctx, c.conf); err != nil { return nil, fmt.Errorf("repo scoped: could not inject Praefect server: %w", err) } var ps *proxy.StreamParameters switch call.methodInfo.Operation { case protoregistry.OpAccessor: - ps, err = c.accessorStreamParameters(ctx, call) + ps, err = c.accessorStreamParameters(ctx, call, targetRepo) case protoregistry.OpMutator: - ps, err = c.mutatorStreamParameters(ctx, call) + ps, err = c.mutatorStreamParameters(ctx, call, targetRepo) default: err = fmt.Errorf("unknown operation type: %v", call.methodInfo.Operation) } @@ -120,23 +129,27 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr return ps, nil } -func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { - virtualStorage := call.targetRepo.StorageName +func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) { + repoPath := targetRepo.GetRelativePath() + virtualStorage := targetRepo.StorageName - shard, err := c.nodeMgr.GetShard(virtualStorage) + node, err := c.nodeMgr.GetSyncedNode(ctx, virtualStorage, repoPath) if err != nil { - return nil, fmt.Errorf("accessor call: get shard: %w", err) + return nil, fmt.Errorf("accessor call: get synced: %w", err) } - if err := c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, shard.Primary.GetStorage()); err != nil { + storage := node.GetStorage() + if err := c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, storage); err != nil { return nil, fmt.Errorf("accessor call: rewrite storage: %w", err) } - return proxy.NewStreamParameters(ctx, shard.Primary.GetConnection(), nil, nil), nil + metrics.ReadDistribution.WithLabelValues(virtualStorage, storage).Inc() + + return 
proxy.NewStreamParameters(ctx, node.GetConnection(), nil, nil), nil } -func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) { - virtualStorage := call.targetRepo.StorageName +func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) { + virtualStorage := targetRepo.StorageName shard, err := c.nodeMgr.GetShard(virtualStorage) if err != nil { @@ -217,14 +230,14 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string, return proxy.NewStreamParameters(ctx, shard.Primary.GetConnection(), func() {}, nil), nil } -func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, primaryStorage string) error { +func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, storage string) error { targetRepo, err := mi.TargetRepo(m) if err != nil { return helper.ErrInvalidArgument(err) } // rewrite storage name - targetRepo.StorageName = primaryStorage + targetRepo.StorageName = storage additionalRepo, ok, err := mi.AdditionalRepo(m) if err != nil { @@ -232,7 +245,7 @@ func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.Method } if ok { - additionalRepo.StorageName = primaryStorage + additionalRepo.StorageName = storage } b, err := proxy.NewCodec().Marshal(m) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index a713bb715..399a8d9ed 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -48,6 +48,10 @@ func (m *mockNodeManager) GetShard(storage string) (nodes.Shard, error) { func (m *mockNodeManager) EnableWrites(context.Context, string) error { panic("unimplemented") } +func (m *mockNodeManager) GetSyncedNode(context.Context, string, string) (nodes.Node, error) { + panic("unimplemented") +} + type mockNode struct { nodes.Node storageName string @@ -106,7 +110,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { const storageName = "test-storage" coordinator := NewCoordinator( testhelper.DiscardTestEntry(t), - datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue()}, + datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue(conf)}, &mockNodeManager{GetShardFunc: func(storage string) (nodes.Shard, error) { return nodes.Shard{ IsReadOnly: tc.readOnly, @@ -162,11 +166,12 @@ func TestStreamDirectorMutator(t *testing.T) { }}, }, }, + DistributedReadsEnabled: true, } var replEventWait sync.WaitGroup - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { defer replEventWait.Done() return queue.Enqueue(ctx, event) @@ -187,7 +192,7 @@ func TestStreamDirectorMutator(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) r := protoregistry.New() require.NoError(t, 
r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) @@ -280,11 +285,12 @@ func TestStreamDirectorAccessor(t *testing.T) { }}, }, }, + DistributedReadsEnabled: true, } ds := datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf), } targetRepo := gitalypb.Repository{ @@ -297,7 +303,7 @@ func TestStreamDirectorAccessor(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) r := protoregistry.New() require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) @@ -314,7 +320,7 @@ func TestStreamDirectorAccessor(t *testing.T) { peeker := &mockPeeker{frame: frame} streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) require.NoError(t, err) - require.Equal(t, primaryAddress, streamParams.Conn().Target()) + require.Equal(t, secondaryAddress, streamParams.Conn().Target()) md, ok := metadata.FromOutgoingContext(streamParams.Context()) require.True(t, ok) @@ -328,7 +334,7 @@ func TestStreamDirectorAccessor(t *testing.T) { rewrittenTargetRepo, err := mi.TargetRepo(m) require.NoError(t, err) - require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") + require.Equal(t, "praefect-internal-2", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") // must be invoked without issues streamParams.RequestFinalizer() @@ -376,7 +382,7 @@ func TestAbsentCorrelationID(t *testing.T) { var replEventWait sync.WaitGroup - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { defer replEventWait.Done() return queue.Enqueue(ctx, event) @@ -397,7 +403,7 @@ func TestAbsentCorrelationID(t *testing.T) { entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) txMgr := transactions.NewManager() diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go index 9d714b235..bc705b632 100644 --- a/internal/praefect/dataloss_check_test.go +++ b/internal/praefect/dataloss_check_test.go @@ -33,7 +33,7 @@ func TestDatalossCheck(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - rq := datastore.NewMemoryReplicationEventQueue() + rq := datastore.NewMemoryReplicationEventQueue(cfg) const targetNode = "test-node" killJobs := func(t *testing.T) { t.Helper() diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go index cee10faa1..6e3346a24 100644 --- a/internal/praefect/datastore/glsql/postgres.go +++ b/internal/praefect/datastore/glsql/postgres.go @@ -199,7 +199,7 @@ func ScanAll(rows *sql.Rows, in DestProvider) (err error) { return err } -// Uint64Provider 
allows to use it with ScanAll function to read all rows into it and return result aa a slice. +// Uint64Provider allows to use it with ScanAll function to read all rows into it and return result as a slice. type Uint64Provider []*uint64 // Values returns list of values read from *sql.Rows @@ -221,3 +221,26 @@ func (p *Uint64Provider) To() []interface{} { *p = append(*p, &d) return []interface{}{&d} } + +// StringProvider allows ScanAll to read all rows and return the result as a slice. +type StringProvider []*string + +// Values returns list of values read from *sql.Rows +func (p *StringProvider) Values() []string { + if len(*p) == 0 { + return nil + } + + r := make([]string, len(*p)) + for i, v := range *p { + r[i] = *v + } + return r +} + +// To returns a list of pointers that will be used as a destination for scan operation. +func (p *StringProvider) To() []interface{} { + var d string + *p = append(*p, &d) + return []interface{}{&d} +} diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go index 91efa6286..9de73d7b0 100644 --- a/internal/praefect/datastore/glsql/testing.go +++ b/internal/praefect/datastore/glsql/testing.go @@ -41,6 +41,7 @@ func (db DB) Truncate(t testing.TB, tables ...string) { require.NoError(t, err, "database truncation failed: %s", tables) } +// RequireRowsInTable verifies that `tname` table has `n` amount of rows in it. func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { t.Helper() @@ -49,6 +50,7 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) { require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n) } +// TruncateAll removes all data from known set of tables. func (db DB) TruncateAll(t testing.TB) { db.Truncate(t, "replication_queue_job_lock", @@ -59,6 +61,12 @@ func (db DB) TruncateAll(t testing.TB) { ) } +// MustExec executes `q` with `args` and verifies there are no errors. +func (db DB) MustExec(t testing.TB, q string, args ...interface{}) { + _, err := db.DB.Exec(q, args...) + require.NoError(t, err) +} + // Close removes schema if it was used and releases connection pool. func (db DB) Close() error { if err := db.DB.Close(); err != nil { diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go index 313fddc97..3a095c9c8 100644 --- a/internal/praefect/datastore/memory.go +++ b/internal/praefect/datastore/memory.go @@ -6,6 +6,8 @@ import ( "fmt" "sync" "time" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" ) var ( @@ -13,10 +15,20 @@ var ( ) // NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue. 
-func NewMemoryReplicationEventQueue() ReplicationEventQueue { +func NewMemoryReplicationEventQueue(conf config.Config) ReplicationEventQueue { + storageNamesByVirtualStorage := make(map[string][]string, len(conf.VirtualStorages)) + for _, vs := range conf.VirtualStorages { + storages := make([]string, len(vs.Nodes)) + for i, node := range vs.Nodes { + storages[i] = node.Storage + } + storageNamesByVirtualStorage[vs.Name] = storages + } return &memoryReplicationEventQueue{ - dequeued: map[uint64]struct{}{}, - maxDeadJobs: 1000, + dequeued: map[uint64]struct{}{}, + maxDeadJobs: 1000, + storageNamesByVirtualStorage: storageNamesByVirtualStorage, + lastEventByDest: map[eventDestination]ReplicationEvent{}, } } @@ -25,14 +37,20 @@ type deadJob struct { relativePath string } +type eventDestination struct { + virtual, storage, relativePath string +} + // memoryReplicationEventQueue implements queue interface with in-memory implementation of storage type memoryReplicationEventQueue struct { sync.RWMutex - seq uint64 // used to generate unique identifiers for events - queued []ReplicationEvent // all new events stored as queue - dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged - maxDeadJobs int // maximum amount of dead jobs to hold in memory - deadJobs []deadJob // dead jobs stored for reporting purposes + seq uint64 // used to generate unique identifiers for events + queued []ReplicationEvent // all new events stored as queue + dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged + maxDeadJobs int // maximum amount of dead jobs to hold in memory + deadJobs []deadJob // dead jobs stored for reporting purposes + storageNamesByVirtualStorage map[string][]string // bindings between virtual storage and storages behind them + lastEventByDest map[eventDestination]ReplicationEvent // contains 'virtual+storage+repo' => 'last even' mappings } // nextID returns a new sequential ID for new events. 
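The struct changes above give the in-memory queue the state it needs for read distribution: the storages configured per virtual storage, and, keyed by eventDestination (virtual storage, target storage, relative path), the most recent replication event per destination. As the hunks below show, Enqueue always records the event for its destination, while Dequeue and Acknowledge only refresh the entry when the event they update is still the latest one. A minimal, standalone sketch of that bookkeeping, with a trimmed-down event type standing in for datastore.ReplicationEvent:

```go
package memqueue

import "sync"

// eventDestination mirrors the key type added in the diff.
type eventDestination struct {
	virtual, storage, relativePath string
}

// event is a trimmed-down stand-in for datastore.ReplicationEvent.
type event struct {
	ID                             uint64
	Virtual, Storage, RelativePath string
	State                          string // ready, in_progress, completed, failed, dead
}

// lastEventTracker isolates the bookkeeping added to memoryReplicationEventQueue.
type lastEventTracker struct {
	mu         sync.Mutex
	lastByDest map[eventDestination]event
}

func newLastEventTracker() *lastEventTracker {
	return &lastEventTracker{lastByDest: map[eventDestination]event{}}
}

// observe records ev as the latest event for its destination. Enqueue calls it
// unconditionally; Dequeue and Acknowledge pass onlyIfLatest=true so that they
// only refresh the entry when the event they update is still the newest one.
func (t *lastEventTracker) observe(ev event, onlyIfLatest bool) {
	t.mu.Lock()
	defer t.mu.Unlock()

	dest := eventDestination{virtual: ev.Virtual, storage: ev.Storage, relativePath: ev.RelativePath}
	if onlyIfLatest {
		if last, found := t.lastByDest[dest]; !found || last.ID != ev.ID {
			return
		}
	}
	t.lastByDest[dest] = ev
}
```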
@@ -49,11 +67,13 @@ func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event Replicati // event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances // but must be filled out to produce same event as it done by SQL implementation event.LockID = event.Job.VirtualStorage + "|" + event.Job.TargetNodeStorage + "|" + event.Job.RelativePath + dest := s.defineDest(event) s.Lock() defer s.Unlock() event.ID = s.nextID() s.queued = append(s.queued, event) + s.lastEventByDest[dest] = event return event, nil } @@ -77,6 +97,10 @@ func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, virtualStorage, s.queued[i] = event s.dequeued[event.ID] = struct{}{} + eventDest := s.defineDest(event) + if last, found := s.lastEventByDest[eventDest]; found && last.ID == event.ID { + s.lastEventByDest[eventDest] = event + } result = append(result, event) if len(result) >= count { @@ -123,7 +147,10 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt updatedAt := time.Now().UTC() s.queued[i].State = state s.queued[i].UpdatedAt = &updatedAt - + eventDest := s.defineDest(s.queued[i]) + if last, found := s.lastEventByDest[eventDest]; found && last.ID == s.queued[i].ID { + s.lastEventByDest[eventDest] = s.queued[i] + } result = append(result, id) switch state { @@ -156,6 +183,30 @@ func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Conte return dead, nil } +func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string) ([]string, error) { + s.RLock() + dirtyStorages := make(map[string]struct{}) + for dst, event := range s.lastEventByDest { + if dst.virtual == virtualStorage && dst.relativePath == repoPath && event.State != JobStateCompleted { + dirtyStorages[event.Job.TargetNodeStorage] = struct{}{} + } + } + s.RUnlock() + + storageNames, found := s.storageNamesByVirtualStorage[virtualStorage] + if !found { + return nil, nil + } + + var result []string + for _, storage := range storageNames { + if _, found := dirtyStorages[storage]; !found { + result = append(result, storage) + } + } + return result, nil +} + // remove deletes i-th element from the queue and from the in-flight tracking map. // It doesn't check 'i' for the out of range and must be called with lock protection. // If state is JobStateDead, the event will be added to the dead job tracker. @@ -175,6 +226,10 @@ func (s *memoryReplicationEventQueue) remove(i int, state JobState) { s.queued = append(s.queued[:i], s.queued[i+1:]...) } +func (s *memoryReplicationEventQueue) defineDest(event ReplicationEvent) eventDestination { + return eventDestination{virtual: event.Job.VirtualStorage, storage: event.Job.TargetNodeStorage, relativePath: event.Job.RelativePath} +} + // ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface. type ReplicationEventQueueInterceptor interface { // ReplicationEventQueue actual implementation. 
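GetUpToDateStorages then derives its answer from that map: every storage configured for the virtual storage counts as up to date unless the latest tracked event targeting it for the given repository is in a state other than 'completed'. Note that a storage with no replication events at all counts as synced here, whereas the Postgres implementation further down only returns storages whose most recent job actually completed. A hedged sketch of the in-memory filtering, with plain values standing in for the queue's internal types:

```go
package memqueue

// dest identifies a (target storage, repository) pair within one virtual storage.
type dest struct {
	storage, relativePath string
}

// upToDateStorages mirrors memoryReplicationEventQueue.GetUpToDateStorages:
// configured lists the storages behind the virtual storage, lastState holds the
// state of the latest replication event per destination.
func upToDateStorages(configured []string, lastState map[dest]string, repoPath string) []string {
	dirty := make(map[string]struct{})
	for d, state := range lastState {
		if d.relativePath == repoPath && state != "completed" {
			dirty[d.storage] = struct{}{}
		}
	}

	var result []string
	for _, storage := range configured {
		if _, found := dirty[storage]; !found {
			result = append(result, storage)
		}
	}
	return result
}
```

For example, with configured storages gitaly-0, gitaly-1 and gitaly-2, and only gitaly-2's latest event for the repository in 'failed', the call returns gitaly-0 and gitaly-1.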
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go index 96a6790c4..bd9991969 100644 --- a/internal/praefect/datastore/memory_test.go +++ b/internal/praefect/datastore/memory_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/testhelper" ) @@ -105,11 +106,11 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue) } func TestMemoryCountDeadReplicationJobs(t *testing.T) { - ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue()) + ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue(config.Config{})) } func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) { - q := NewMemoryReplicationEventQueue().(*memoryReplicationEventQueue) + q := NewMemoryReplicationEventQueue(config.Config{}).(*memoryReplicationEventQueue) q.maxDeadJobs = 2 ctx, cancel := testhelper.Context() @@ -146,7 +147,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - queue := NewMemoryReplicationEventQueue() + queue := NewMemoryReplicationEventQueue(config.Config{}) noEvents, err := queue.Dequeue(ctx, "praefect", "storage-1", 100500) require.NoError(t, err) @@ -300,7 +301,7 @@ func TestMemoryReplicationEventQueue_ConcurrentAccess(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - queue := NewMemoryReplicationEventQueue() + queue := NewMemoryReplicationEventQueue(config.Config{}) job1 := ReplicationJob{ Change: UpdateRepo, diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index a3f0156b6..1785cc107 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -25,6 +25,9 @@ type ReplicationEventQueue interface { // CountDeadReplicationJobs returns the dead replication job counts by relative path within the // given timerange. The timerange beginning is inclusive and ending is exclusive. CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error) + // GetUpToDateStorages returns list of target storages where latest replication job is in 'completed' state. + // It returns no results if there is no up to date storages or there were no replication events yet. 
+ GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) } func allowToAck(state JobState) error { @@ -341,3 +344,28 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J return acknowledged.Values(), nil } + +func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) { + query := ` + SELECT storage + FROM ( + SELECT DISTINCT ON (job ->> 'target_node_storage') + job ->> 'target_node_storage' AS storage, + state + FROM replication_queue + WHERE job ->> 'virtual_storage' = $1 AND job ->> 'relative_path' = $2 + ORDER BY job ->> 'target_node_storage', updated_at DESC NULLS FIRST + ) t + WHERE state = 'completed'` + rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, repoPath) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + + var storages glsql.StringProvider + if err := glsql.ScanAll(rows, &storages); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + return storages.Values(), nil +} diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go index 62e969a21..75bbe608a 100644 --- a/internal/praefect/datastore/queue_test.go +++ b/internal/praefect/datastore/queue_test.go @@ -572,6 +572,209 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) { }) } +func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) { + db := getDB(t) + + ctx, cancel := testhelper.Context() + defer cancel() + + source := PostgresReplicationEventQueue{qc: db} + + t.Run("single 'ready' job for single storage", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{}, ss) + }) + + t.Run("single 'dead' job for single storage", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{}, ss) + }) + + t.Run("single 'failed' job for single storage", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{}, ss) + }) + + t.Run("single 'completed' job for single storage", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{"s1"}, ss) + }) + + t.Run("multiple 'completed' jobs for single storage but different repos", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO 
replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{"s1"}, ss) + }) + + t.Run("last jobs are 'completed' for multiple storages", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{"s1", "s2"}, ss) + }) + + t.Run("last jobs are 'completed' for multiple storages but different virtuals", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + ('{"virtual_storage": "vs2", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{"s1"}, ss) + }) + + t.Run("lasts are in 'completed' and 'in_progress' for different storages", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{"s1"}, ss) + }) + + t.Run("lasts are in 'dead', 'ready', 'failed' and 'in_progress' for different storages", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'), + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready'), + ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{}, ss) + }) + + t.Run("last is not 'completed'", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'), + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + + 
('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'ready'), + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{}, ss) + }) + + t.Run("multiple virtuals with multiple storages", func(t *testing.T) { + db.TruncateAll(t) + + db.MustExec(t, ` + INSERT INTO replication_queue + (job, updated_at, state) + VALUES + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'), + ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'), + + ('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'), + ('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-2"}', '2020-01-01 00:00:01', 'failed'), + ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed'), + + ('{"virtual_storage": "vs1", "target_node_storage": "s5", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`, + ) + + ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1") + require.NoError(t, err) + require.ElementsMatch(t, []string{"s2"}, ss) + }) +} + func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) { t.Helper() diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index f82a67dee..3d6f18f65 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -111,6 +111,9 @@ type nullNodeMgr struct{} func (nullNodeMgr) GetShard(virtualStorageName string) (nodes.Shard, error) { return nodes.Shard{}, nil } func (nullNodeMgr) EnableWrites(ctx context.Context, virtualStorageName string) error { return nil } +func (nullNodeMgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (nodes.Node, error) { + return nil, nil +} type buildOptions struct { withDatastore datastore.Datastore @@ -180,7 +183,7 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { ds := datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf), } return 
runPraefectServerWithGitalyWithDatastore(t, conf, ds) @@ -199,7 +202,7 @@ func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, func defaultDatastore(conf config.Config) datastore.Datastore { return datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf), } } @@ -207,8 +210,8 @@ func defaultTxMgr() *transactions.Manager { return transactions.NewManager() } -func defaultNodeMgr(t testing.TB, conf config.Config) nodes.Manager { - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec()) +func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) nodes.Manager { + nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) return nodeMgr @@ -241,7 +244,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp opt.withLogger = log.Default() } if opt.withNodeMgr == nil { - opt.withNodeMgr = defaultNodeMgr(t, conf) + opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withDatastore) } coordinator := NewCoordinator( diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go index f49dfb9ba..87dee00ca 100644 --- a/internal/praefect/metrics/prometheus.go +++ b/internal/praefect/metrics/prometheus.go @@ -127,11 +127,23 @@ var ChecksumMismatchCounter = prometheus.NewCounterVec( }, []string{"target", "source"}, ) +// ReadDistribution counts how many read operations was routed to each storage. +var ReadDistribution = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "gitaly", + Subsystem: "praefect", + Name: "read_distribution", + Help: "Counts read operations directed to the storages", + }, + []string{"virtual_storage", "storage"}, +) + func init() { prometheus.MustRegister( MethodTypeCounter, PrimaryGauge, ChecksumMismatchCounter, NodeLastHealthcheckGauge, + ReadDistribution, ) } diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index f2c041158..f7e94f28f 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -5,14 +5,17 @@ import ( "database/sql" "errors" "fmt" + "math/rand" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/sirupsen/logrus" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" @@ -38,6 +41,9 @@ type Manager interface { // ErrVirtualStorageNotExist if a virtual storage with the given name // does not exist. EnableWrites(ctx context.Context, virtualStorageName string) error + // GetSyncedNode returns a random storage node based on the state of the replication. + // It returns primary in case there are no up to date secondaries or error occurs. 
+ GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) } // Node represents some metadata of a node as well as a connection @@ -52,10 +58,14 @@ type Node interface { // Mgr is a concrete type that adheres to the Manager interface type Mgr struct { failoverEnabled bool - log *logrus.Entry + distributeReads bool + // log must be used only for non-request specific needs like bootstrapping, etc. + // for request related logging `ctxlogrus.Extract(ctx)` must be used. + log *logrus.Entry // strategies is a map of strategies keyed on virtual storage name strategies map[string]leaderElectionStrategy db *sql.DB + ds datastore.Datastore } // leaderElectionStrategy defines the interface by which primary and @@ -74,7 +84,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy") const dialTimeout = 10 * time.Second // NewManager creates a new NodeMgr based on virtual storage configs -func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) { +func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) { strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages)) ctx, cancel := context.WithTimeout(context.Background(), dialTimeout) @@ -123,7 +133,9 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, latencyHistogram log: log, db: db, failoverEnabled: c.Failover.Enabled, + distributeReads: c.DistributedReadsEnabled, strategies: strategies, + ds: ds, }, nil } @@ -173,6 +185,37 @@ func (n *Mgr) EnableWrites(ctx context.Context, virtualStorageName string) error return strategy.enableWrites(ctx) } +func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) { + shard, err := n.GetShard(virtualStorageName) + if err != nil { + return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err) + } + + var storages []string + if n.distributeReads { + if storages, err = n.ds.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil { + // this is recoverable error - proceed with primary node + ctxlogrus.Extract(ctx). + WithError(err). + WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath}). 
+ Error("get up to date secondaries") + } + } + + if len(storages) == 0 { + return shard.Primary, nil + } + + secondary := storages[rand.Intn(len(storages))] // randomly pick up one of the synced storages + for _, node := range shard.Secondaries { + if node.GetStorage() == secondary { + return node, nil + } + } + + return shard.Primary, nil // there is no matched secondaries, maybe because of re-configuration +} + func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus { return &nodeStatus{ Node: node, diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index 669b163e9..ca2986122 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -2,16 +2,19 @@ package nodes import ( "context" + "errors" "net" "testing" "time" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "google.golang.org/grpc" + "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -82,7 +85,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) { Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"}, VirtualStorages: []*config.VirtualStorage{virtualStorage}, } - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec()) + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec()) require.NoError(t, err) nm.Start(time.Millisecond, time.Millisecond) @@ -130,7 +133,7 @@ func TestPrimaryIsSecond(t *testing.T) { } mockHistogram := promtest.NewMockHistogramVec() - nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, mockHistogram) + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, mockHistogram) require.NoError(t, err) shard, err := nm.GetShard("virtual-storage-0") @@ -180,7 +183,7 @@ func TestBlockingDial(t *testing.T) { healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) }() - mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec()) + mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec()) require.NoError(t, err) mgr.Start(1*time.Millisecond, 1*time.Millisecond) @@ -225,11 +228,13 @@ func TestNodeManager(t *testing.T) { Failover: config.Failover{Enabled: false}, } + ds := datastore.Datastore{} + mockHistogram := promtest.NewMockHistogramVec() - nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, mockHistogram) + nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, ds, mockHistogram) require.NoError(t, err) - nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, mockHistogram) + nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, ds, mockHistogram) require.NoError(t, err) nm.Start(1*time.Millisecond, 5*time.Second) @@ -312,3 +317,218 @@ func TestNodeManager(t *testing.T) { require.Equal(t, ErrPrimaryNotHealthy, nm.EnableWrites(context.Background(), "virtual-storage-0"), "should not be able to enable writes with unhealthy master") } + +func 
TestMgr_GetSyncedNode(t *testing.T) { + var sockets [4]string + var srvs [4]*grpc.Server + var healthSrvs [4]*health.Server + for i := 0; i < 4; i++ { + sockets[i] = testhelper.GetTemporaryGitalySocketFileName() + srvs[i], healthSrvs[i] = testhelper.NewServerWithHealth(t, sockets[i]) + defer srvs[i].Stop() + } + + vs0Primary := "unix://" + sockets[0] + vs1Secondary := "unix://" + sockets[3] + + virtualStorages := []*config.VirtualStorage{ + { + Name: "virtual-storage-0", + Nodes: []*models.Node{ + { + Storage: "gitaly-0", + Address: vs0Primary, + DefaultPrimary: true, + }, + { + Storage: "gitaly-1", + Address: "unix://" + sockets[1], + }, + }, + }, + { + // second virtual storage needed to check there is no intersections between two even with same storage names + Name: "virtual-storage-1", + Nodes: []*models.Node{ + { + // same storage name as in other virtual storage is used intentionally + Storage: "gitaly-1", + Address: "unix://" + sockets[2], + DefaultPrimary: true, + }, + { + Storage: "gitaly-2", + Address: vs1Secondary, + }, + }, + }, + } + + conf := config.Config{ + VirtualStorages: virtualStorages, + Failover: config.Failover{Enabled: true}, + DistributedReadsEnabled: true, + } + + mockHistogram := promtest.NewMockHistogramVec() + + ctx, cancel := testhelper.Context() + defer cancel() + + ackEvent := func(ds datastore.Datastore, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent { + event := datastore.ReplicationEvent{Job: job} + + eevent, err := ds.Enqueue(ctx, event) + require.NoError(t, err) + + devents, err := ds.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2) + require.NoError(t, err) + require.Len(t, devents, 1) + + acks, err := ds.Acknowledge(ctx, state, []uint64{devents[0].ID}) + require.NoError(t, err) + require.Equal(t, []uint64{devents[0].ID}, acks) + return devents[0] + } + + verify := func(scenario func(t *testing.T, nm Manager, ds datastore.Datastore)) func(*testing.T) { + ds := datastore.Datastore{ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf)} + + nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, mockHistogram) + require.NoError(t, err) + + nm.Start(time.Duration(0), time.Hour) + + return func(t *testing.T) { scenario(t, nm, ds) } + } + + t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + _, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "") + require.True(t, errors.Is(err, ErrVirtualStorageNotExist)) + })) + + t.Run("no replication events", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "no/matter") + require.NoError(t, err) + require.Contains(t, []string{vs0Primary, "unix://" + sockets[1]}, node.GetAddress()) + })) + + t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + _, err := ds.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-0", + }, + }) + require.NoError(t, err) + + node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "path/1") + require.NoError(t, err) + require.Equal(t, vs0Primary, node.GetAddress()) + })) + + t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + vs0Event, err := ds.Enqueue(ctx, datastore.ReplicationEvent{ + Job: 
datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-0", + }, + }) + require.NoError(t, err) + + vs0Events, err := ds.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500) + require.NoError(t, err) + require.Len(t, vs0Events, 1) + + node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "path/1") + require.NoError(t, err) + require.Equal(t, vs0Primary, node.GetAddress()) + })) + + t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + vs0Event := ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-0", + }, datastore.JobStateFailed) + + node, err := nm.GetSyncedNode(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.RelativePath) + require.NoError(t, err) + require.Equal(t, vs0Primary, node.GetAddress()) + })) + + t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + vsEvent0 := ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-0", + }, datastore.JobStateCompleted) + + vsEvent1, err := ds.Enqueue(ctx, datastore.ReplicationEvent{ + Job: datastore.ReplicationJob{ + RelativePath: vsEvent0.Job.RelativePath, + TargetNodeStorage: vsEvent0.Job.TargetNodeStorage, + SourceNodeStorage: vsEvent0.Job.SourceNodeStorage, + VirtualStorage: vsEvent0.Job.VirtualStorage, + }, + }) + require.NoError(t, err) + + node, err := nm.GetSyncedNode(ctx, vsEvent1.Job.VirtualStorage, vsEvent1.Job.RelativePath) + require.NoError(t, err) + require.Equal(t, vs0Primary, node.GetAddress()) + })) + + t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + vs0Event := ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-0", + }, datastore.JobStateDead) + + ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: "gitaly-1", + VirtualStorage: "virtual-storage-1", + }, datastore.JobStateCompleted) + + node, err := nm.GetSyncedNode(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.RelativePath) + require.NoError(t, err) + require.Equal(t, vs0Primary, node.GetAddress()) + })) + + t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) { + ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-1", + SourceNodeStorage: "gitaly-0", + VirtualStorage: "virtual-storage-0", + }, datastore.JobStateCompleted) + + ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/1", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: "gitaly-1", + VirtualStorage: "virtual-storage-1", + }, datastore.JobStateCompleted) + + vs1Event := ackEvent(ds, datastore.ReplicationJob{ + RelativePath: "path/2", + TargetNodeStorage: "gitaly-2", + SourceNodeStorage: "gitaly-1", + VirtualStorage: "virtual-storage-1", + }, datastore.JobStateCompleted) + + node, err := nm.GetSyncedNode(ctx, vs1Event.Job.VirtualStorage, vs1Event.Job.RelativePath) + require.NoError(t, err) + require.Equal(t, vs1Secondary, 
node.GetAddress()) + })) +} diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index d496c4cd8..6aefbf14c 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -90,7 +90,7 @@ func TestProcessReplicationJob(t *testing.T) { ds := datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf), } // create object pool on the source @@ -149,7 +149,7 @@ func TestProcessReplicationJob(t *testing.T) { entry := testhelper.DiscardTestEntry(t) replicator.log = entry - nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) @@ -217,11 +217,11 @@ func TestPropagateReplicationJob(t *testing.T) { ds := datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf), } logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) @@ -457,7 +457,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one") - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) processed := make(chan struct{}) dequeues := 0 @@ -520,7 +520,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) replMgr := NewReplMgr(logEntry, ds, nodeMgr) @@ -584,7 +584,7 @@ func TestProcessBacklog_Success(t *testing.T) { }) require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one") - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) processed := make(chan struct{}) queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { @@ -658,7 +658,7 @@ func TestProcessBacklog_Success(t *testing.T) { logEntry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec()) + nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec()) require.NoError(t, err) replMgr := NewReplMgr(logEntry, ds, nodeMgr) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index ab4610a1d..c9cd4fdce 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ 
-433,7 +434,7 @@ func TestRepoRemoval(t *testing.T) { // TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports // graceful shutdown, we can remove this code that waits for jobs to be complete - queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()) + queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) ds := datastore.Datastore{ ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: queueInterceptor, @@ -524,6 +525,7 @@ func TestRepoRename(t *testing.T) { }, }, }, + DistributedReadsEnabled: true, } virtualStorage := conf.VirtualStorages[0] @@ -557,7 +559,21 @@ func TestRepoRename(t *testing.T) { _, path2, cleanup2 := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[2].Storage) defer cleanup2() - cc, _, cleanup := runPraefectServerWithGitaly(t, conf) + var canCheckRepo sync.WaitGroup + canCheckRepo.Add(2) + + evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf)) + evq.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) { + defer canCheckRepo.Done() + return queue.Acknowledge(ctx, state, ids) + }) + + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: evq, + } + + cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds) defer cleanup() ctx, cancel := testhelper.Context() @@ -597,6 +613,11 @@ func TestRepoRename(t *testing.T) { cpVirtualRepo := *virtualRepo renamedVirtualRepo := &cpVirtualRepo renamedVirtualRepo.RelativePath = newName + + // wait until replication jobs propagate changes to other storages + // as we don't know which one will be used to check because of read distribution + canCheckRepo.Wait() + resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ Repository: renamedVirtualRepo, }) |
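Because reads may now be served by any up-to-date node, the end of the diff also adjusts TestRepoRename: instead of assuming the primary already has the rename, the test hooks the replication queue and waits for the replication jobs to be acknowledged before calling RepositoryExists. A condensed sketch of that pattern follows; the helper name is illustrative, while the datastore calls are the ones exercised in the diff.

```go
package praefect_test

import (
	"context"
	"sync"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
)

// datastoreWithReplicationSignal (an illustrative helper name) builds a datastore
// whose queue calls wg.Done every time a replication job is acknowledged, so a
// test can wg.Wait() before reading through Praefect, since with distributed
// reads the follow-up read may be served by any storage, not just the primary.
func datastoreWithReplicationSignal(conf config.Config, wg *sync.WaitGroup) datastore.Datastore {
	evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
	evq.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
		defer wg.Done()
		return queue.Acknowledge(ctx, state, ids)
	})

	return datastore.Datastore{
		ReplicasDatastore:     datastore.NewInMemory(conf),
		ReplicationEventQueue: evq,
	}
}
```

In the test itself the wait group counter is set to 2, one per secondary that has to receive the renamed repository, and Wait() runs right before the existence check on the renamed repository.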