gitlab.com/gitlab-org/gitaly.git
author    Pavlo Strokov <pstrokov@gitlab.com>  2020-05-18 19:01:11 +0300
committer Pavlo Strokov <pstrokov@gitlab.com>  2020-05-18 19:01:11 +0300
commit    9ac3d3e6dccbb33fd3c7fa18461022741f3f470e (patch)
tree      c70c7c68f6bf464f17241e80e113e84a61b32109
parent    88c81500d06b7e8cae5289b1a4cdbe24db59975a (diff)
parent    ffc3443f89eb37ae132289601971d71f0346e411 (diff)

Merge branch 'ps-reads-distribution' into 'master'

Praefect: horizontal scaling of a single shard MVC

Closes #2650

See merge request gitlab-org/gitaly!2162
 cmd/praefect/main.go                           |  19
 internal/praefect/auth_test.go                 |   4
 internal/praefect/config/config.go             |   2
 internal/praefect/config/config_test.go        |   3
 internal/praefect/config/testdata/config.toml  |   7
 internal/praefect/coordinator.go               |  41
 internal/praefect/coordinator_test.go          |  24
 internal/praefect/dataloss_check_test.go       |   2
 internal/praefect/datastore/glsql/postgres.go  |  25
 internal/praefect/datastore/glsql/testing.go   |   8
 internal/praefect/datastore/memory.go          |  73
 internal/praefect/datastore/memory_test.go     |   9
 internal/praefect/datastore/queue.go           |  28
 internal/praefect/datastore/queue_test.go      | 203
 internal/praefect/helper_test.go               |  13
 internal/praefect/metrics/prometheus.go        |  12
 internal/praefect/nodes/manager.go             |  47
 internal/praefect/nodes/manager_test.go        | 230
 internal/praefect/replicator_test.go           |  16
 internal/praefect/server_test.go               |  25
 20 files changed, 714 insertions(+), 77 deletions(-)
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 19d5e25fa..7c42fb18a 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -231,7 +231,14 @@ func run(cfgs []starter.Config, conf config.Config) error {
db = dbConn
}
- nodeManager, err := nodes.NewManager(logger, conf, db, nodeLatencyHistogram)
+ ds := datastore.Datastore{ReplicasDatastore: datastore.NewInMemory(conf)}
+ if conf.PostgresQueueEnabled {
+ ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
+ } else {
+ ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue(conf)
+ }
+
+ nodeManager, err := nodes.NewManager(logger, conf, db, ds, nodeLatencyHistogram)
if err != nil {
return err
}
@@ -257,16 +264,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
return err
}
- ds := datastore.Datastore{
- ReplicasDatastore: datastore.NewInMemory(conf),
- }
-
- if conf.PostgresQueueEnabled {
- ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
- } else {
- ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue()
- }
-
var (
// top level server dependencies
coordinator = praefect.NewCoordinator(logger, ds, nodeManager, transactionManager, conf, registry)
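
The main.go change above is mostly a reordering: the replication event queue has to exist before the node manager is built, because the manager now receives the datastore and uses it to find up-to-date storages. Roughly, assuming conf, db, logger and nodeLatencyHistogram are already prepared earlier in run():

    // Sketch of the new wiring (excerpt-style, not a complete program).
    ds := datastore.Datastore{ReplicasDatastore: datastore.NewInMemory(conf)}
    if conf.PostgresQueueEnabled {
        // persistent queue shared between Praefect instances
        ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
    } else {
        // in-memory queue; it now takes the config to learn the storage topology
        ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue(conf)
    }

    // the node manager receives the datastore so that GetSyncedNode can ask
    // which storages have completed their replication jobs for a repository
    nodeManager, err := nodes.NewManager(logger, conf, db, ds, nodeLatencyHistogram)
    if err != nil {
        return err
    }

Read distribution itself is still gated by the new DistributedReadsEnabled config field (distributed_reads_enabled in the TOML file), so this wiring alone does not change routing behaviour.
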
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index a43e72e60..c64c9a468 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -171,10 +171,10 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
logEntry := testhelper.DiscardTestEntry(t)
ds := datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
}
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 510eada28..6a0c842a6 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -36,6 +36,8 @@ type Config struct {
// Keep for legacy reasons: remove after Omnibus has switched
FailoverEnabled bool `toml:"failover_enabled"`
PostgresQueueEnabled bool `toml:"postgres_queue_enabled"`
+ // DistributedReadsEnabled, if set, redirects accessor operations to up-to-date secondaries
+ DistributedReadsEnabled bool `toml:"distributed_reads_enabled"`
}
// VirtualStorage represents a set of nodes for a storage
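
The new field sits among the existing top-level Praefect settings: operators opt in with distributed_reads_enabled = true in the config file, next to the postgres_queue_enabled flag, as the testdata/config.toml change further down shows.
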
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 0077e6094..4a9ae242a 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -257,7 +257,8 @@ func TestConfigParsing(t *testing.T) {
SSLKey: "/path/to/key",
SSLRootCert: "/path/to/root-cert",
},
- PostgresQueueEnabled: true,
+ PostgresQueueEnabled: true,
+ DistributedReadsEnabled: true,
},
},
}
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 330470c90..c871972a4 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -2,6 +2,7 @@ listen_addr = ""
socket_path = ""
prometheus_listen_addr = ""
postgres_queue_enabled = true
+distributed_reads_enabled = true
[logging]
format = "json"
@@ -18,17 +19,17 @@ name = "praefect"
address = "tcp://gitaly-internal-1.example.com"
storage = "praefect-internal-1"
primary = true
-
+
[[virtual_storage.node]]
address = "tcp://gitaly-internal-2.example.com"
storage = "praefect-internal-2"
-
+
[[virtual_storage.node]]
address = "tcp://gitaly-internal-3.example.com"
storage = "praefect-internal-3"
[prometheus]
- grpc_latency_buckets = [0.1, 0.2, 0.3]
+ grpc_latency_buckets = [0.1, 0.2, 0.3]
[database]
host = "1.2.3.4"
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 977d66498..ea919d455 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -14,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metadata"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/praefect/transactions"
@@ -98,17 +99,25 @@ func NewCoordinator(l logrus.FieldLogger, ds datastore.Datastore, nodeMgr nodes.
}
func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
- ctx, err := metadata.InjectPraefectServer(ctx, c.conf)
+ targetRepo, err := call.methodInfo.TargetRepo(call.msg)
if err != nil {
+ return nil, helper.ErrInvalidArgument(fmt.Errorf("repo scoped: %w", err))
+ }
+
+ if targetRepo.StorageName == "" || targetRepo.RelativePath == "" {
+ return nil, helper.ErrInvalidArgumentf("repo scoped: target repo is invalid")
+ }
+
+ if ctx, err = metadata.InjectPraefectServer(ctx, c.conf); err != nil {
return nil, fmt.Errorf("repo scoped: could not inject Praefect server: %w", err)
}
var ps *proxy.StreamParameters
switch call.methodInfo.Operation {
case protoregistry.OpAccessor:
- ps, err = c.accessorStreamParameters(ctx, call)
+ ps, err = c.accessorStreamParameters(ctx, call, targetRepo)
case protoregistry.OpMutator:
- ps, err = c.mutatorStreamParameters(ctx, call)
+ ps, err = c.mutatorStreamParameters(ctx, call, targetRepo)
default:
err = fmt.Errorf("unknown operation type: %v", call.methodInfo.Operation)
}
@@ -120,23 +129,27 @@ func (c *Coordinator) directRepositoryScopedMessage(ctx context.Context, call gr
return ps, nil
}
-func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
- virtualStorage := call.targetRepo.StorageName
+func (c *Coordinator) accessorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) {
+ repoPath := targetRepo.GetRelativePath()
+ virtualStorage := targetRepo.StorageName
- shard, err := c.nodeMgr.GetShard(virtualStorage)
+ node, err := c.nodeMgr.GetSyncedNode(ctx, virtualStorage, repoPath)
if err != nil {
- return nil, fmt.Errorf("accessor call: get shard: %w", err)
+ return nil, fmt.Errorf("accessor call: get synced: %w", err)
}
- if err := c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, shard.Primary.GetStorage()); err != nil {
+ storage := node.GetStorage()
+ if err := c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, storage); err != nil {
return nil, fmt.Errorf("accessor call: rewrite storage: %w", err)
}
- return proxy.NewStreamParameters(ctx, shard.Primary.GetConnection(), nil, nil), nil
+ metrics.ReadDistribution.WithLabelValues(virtualStorage, storage).Inc()
+
+ return proxy.NewStreamParameters(ctx, node.GetConnection(), nil, nil), nil
}
-func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall) (*proxy.StreamParameters, error) {
- virtualStorage := call.targetRepo.StorageName
+func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) {
+ virtualStorage := targetRepo.StorageName
shard, err := c.nodeMgr.GetShard(virtualStorage)
if err != nil {
@@ -217,14 +230,14 @@ func (c *Coordinator) StreamDirector(ctx context.Context, fullMethodName string,
return proxy.NewStreamParameters(ctx, shard.Primary.GetConnection(), func() {}, nil), nil
}
-func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, primaryStorage string) error {
+func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier, storage string) error {
targetRepo, err := mi.TargetRepo(m)
if err != nil {
return helper.ErrInvalidArgument(err)
}
// rewrite storage name
- targetRepo.StorageName = primaryStorage
+ targetRepo.StorageName = storage
additionalRepo, ok, err := mi.AdditionalRepo(m)
if err != nil {
@@ -232,7 +245,7 @@ func (c *Coordinator) rewriteStorageForRepositoryMessage(mi protoregistry.Method
}
if ok {
- additionalRepo.StorageName = primaryStorage
+ additionalRepo.StorageName = storage
}
b, err := proxy.NewCodec().Marshal(m)
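
Pulled together from the hunks above, the accessor path now looks roughly like the sketch below (the routeAccessor name is invented for illustration; in the commit this logic lives in accessorStreamParameters, with target-repo validation hoisted into directRepositoryScopedMessage):

    // Sketch only: route a read to a node that is in sync for this repository.
    func (c *Coordinator) routeAccessor(ctx context.Context, call grpcCall, targetRepo *gitalypb.Repository) (*proxy.StreamParameters, error) {
        virtualStorage := targetRepo.StorageName

        // ask the node manager for a node with no pending replication jobs;
        // it falls back to the primary when nothing better is known
        node, err := c.nodeMgr.GetSyncedNode(ctx, virtualStorage, targetRepo.GetRelativePath())
        if err != nil {
            return nil, fmt.Errorf("accessor call: get synced: %w", err)
        }

        // rewrite the virtual storage name in the request to the physical storage
        if err := c.rewriteStorageForRepositoryMessage(call.methodInfo, call.msg, call.peeker, node.GetStorage()); err != nil {
            return nil, fmt.Errorf("accessor call: rewrite storage: %w", err)
        }

        // count where the read went so the distribution is observable
        metrics.ReadDistribution.WithLabelValues(virtualStorage, node.GetStorage()).Inc()

        return proxy.NewStreamParameters(ctx, node.GetConnection(), nil, nil), nil
    }

Mutators keep going to the shard primary via GetShard, so only accessor RPCs are spread across secondaries.
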
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index a713bb715..399a8d9ed 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -48,6 +48,10 @@ func (m *mockNodeManager) GetShard(storage string) (nodes.Shard, error) {
func (m *mockNodeManager) EnableWrites(context.Context, string) error { panic("unimplemented") }
+func (m *mockNodeManager) GetSyncedNode(context.Context, string, string) (nodes.Node, error) {
+ panic("unimplemented")
+}
+
type mockNode struct {
nodes.Node
storageName string
@@ -106,7 +110,7 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
const storageName = "test-storage"
coordinator := NewCoordinator(
testhelper.DiscardTestEntry(t),
- datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue()},
+ datastore.Datastore{datastore.NewInMemory(conf), datastore.NewMemoryReplicationEventQueue(conf)},
&mockNodeManager{GetShardFunc: func(storage string) (nodes.Shard, error) {
return nodes.Shard{
IsReadOnly: tc.readOnly,
@@ -162,11 +166,12 @@ func TestStreamDirectorMutator(t *testing.T) {
}},
},
},
+ DistributedReadsEnabled: true,
}
var replEventWait sync.WaitGroup
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
defer replEventWait.Done()
return queue.Enqueue(ctx, event)
@@ -187,7 +192,7 @@ func TestStreamDirectorMutator(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
r := protoregistry.New()
require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
@@ -280,11 +285,12 @@ func TestStreamDirectorAccessor(t *testing.T) {
}},
},
},
+ DistributedReadsEnabled: true,
}
ds := datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
}
targetRepo := gitalypb.Repository{
@@ -297,7 +303,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
r := protoregistry.New()
require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...))
@@ -314,7 +320,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
peeker := &mockPeeker{frame: frame}
streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
require.NoError(t, err)
- require.Equal(t, primaryAddress, streamParams.Conn().Target())
+ require.Equal(t, secondaryAddress, streamParams.Conn().Target())
md, ok := metadata.FromOutgoingContext(streamParams.Context())
require.True(t, ok)
@@ -328,7 +334,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
rewrittenTargetRepo, err := mi.TargetRepo(m)
require.NoError(t, err)
- require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
+ require.Equal(t, "praefect-internal-2", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
// must be invoked without issues
streamParams.RequestFinalizer()
@@ -376,7 +382,7 @@ func TestAbsentCorrelationID(t *testing.T) {
var replEventWait sync.WaitGroup
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
defer replEventWait.Done()
return queue.Enqueue(ctx, event)
@@ -397,7 +403,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
txMgr := transactions.NewManager()
diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go
index 9d714b235..bc705b632 100644
--- a/internal/praefect/dataloss_check_test.go
+++ b/internal/praefect/dataloss_check_test.go
@@ -33,7 +33,7 @@ func TestDatalossCheck(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- rq := datastore.NewMemoryReplicationEventQueue()
+ rq := datastore.NewMemoryReplicationEventQueue(cfg)
const targetNode = "test-node"
killJobs := func(t *testing.T) {
t.Helper()
diff --git a/internal/praefect/datastore/glsql/postgres.go b/internal/praefect/datastore/glsql/postgres.go
index cee10faa1..6e3346a24 100644
--- a/internal/praefect/datastore/glsql/postgres.go
+++ b/internal/praefect/datastore/glsql/postgres.go
@@ -199,7 +199,7 @@ func ScanAll(rows *sql.Rows, in DestProvider) (err error) {
return err
}
-// Uint64Provider allows to use it with ScanAll function to read all rows into it and return result aa a slice.
+// Uint64Provider allows to use it with ScanAll function to read all rows into it and return result as a slice.
type Uint64Provider []*uint64
// Values returns list of values read from *sql.Rows
@@ -221,3 +221,26 @@ func (p *Uint64Provider) To() []interface{} {
*p = append(*p, &d)
return []interface{}{&d}
}
+
+// StringProvider allows ScanAll to read all rows and return the result as a slice.
+type StringProvider []*string
+
+// Values returns list of values read from *sql.Rows
+func (p *StringProvider) Values() []string {
+ if len(*p) == 0 {
+ return nil
+ }
+
+ r := make([]string, len(*p))
+ for i, v := range *p {
+ r[i] = *v
+ }
+ return r
+}
+
+// To returns a list of pointers that will be used as a destination for scan operation.
+func (p *StringProvider) To() []interface{} {
+ var d string
+ *p = append(*p, &d)
+ return []interface{}{&d}
+}
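
StringProvider mirrors the existing Uint64Provider: every call to To appends a fresh destination pointer for rows.Scan, and Values dereferences the collected pointers afterwards. A hedged usage sketch (the query is illustrative, db is assumed to be a *sql.DB, and ScanAll is assumed to take ownership of the rows as it does in the callers above):

    // Sketch: collecting a single text column via ScanAll into a StringProvider.
    rows, err := db.QueryContext(ctx, `SELECT DISTINCT job ->> 'target_node_storage' FROM replication_queue`)
    if err != nil {
        return nil, fmt.Errorf("query: %w", err)
    }

    var storages glsql.StringProvider
    if err := glsql.ScanAll(rows, &storages); err != nil {
        return nil, fmt.Errorf("scan: %w", err)
    }

    return storages.Values(), nil // []string, one element per returned row
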
diff --git a/internal/praefect/datastore/glsql/testing.go b/internal/praefect/datastore/glsql/testing.go
index 91efa6286..9de73d7b0 100644
--- a/internal/praefect/datastore/glsql/testing.go
+++ b/internal/praefect/datastore/glsql/testing.go
@@ -41,6 +41,7 @@ func (db DB) Truncate(t testing.TB, tables ...string) {
require.NoError(t, err, "database truncation failed: %s", tables)
}
+// RequireRowsInTable verifies that the `tname` table has `n` rows in it.
func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
t.Helper()
@@ -49,6 +50,7 @@ func (db DB) RequireRowsInTable(t *testing.T, tname string, n int) {
require.Equal(t, n, count, "unexpected amount of rows in table: %d instead of %d", count, n)
}
+// TruncateAll removes all data from known set of tables.
func (db DB) TruncateAll(t testing.TB) {
db.Truncate(t,
"replication_queue_job_lock",
@@ -59,6 +61,12 @@ func (db DB) TruncateAll(t testing.TB) {
)
}
+// MustExec executes `q` with `args` and verifies there are no errors.
+func (db DB) MustExec(t testing.TB, q string, args ...interface{}) {
+ _, err := db.DB.Exec(q, args...)
+ require.NoError(t, err)
+}
+
// Close removes schema if it was used and releases connection pool.
func (db DB) Close() error {
if err := db.DB.Close(); err != nil {
diff --git a/internal/praefect/datastore/memory.go b/internal/praefect/datastore/memory.go
index 313fddc97..3a095c9c8 100644
--- a/internal/praefect/datastore/memory.go
+++ b/internal/praefect/datastore/memory.go
@@ -6,6 +6,8 @@ import (
"fmt"
"sync"
"time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
)
var (
@@ -13,10 +15,20 @@ var (
)
// NewMemoryReplicationEventQueue return in-memory implementation of the ReplicationEventQueue.
-func NewMemoryReplicationEventQueue() ReplicationEventQueue {
+func NewMemoryReplicationEventQueue(conf config.Config) ReplicationEventQueue {
+ storageNamesByVirtualStorage := make(map[string][]string, len(conf.VirtualStorages))
+ for _, vs := range conf.VirtualStorages {
+ storages := make([]string, len(vs.Nodes))
+ for i, node := range vs.Nodes {
+ storages[i] = node.Storage
+ }
+ storageNamesByVirtualStorage[vs.Name] = storages
+ }
return &memoryReplicationEventQueue{
- dequeued: map[uint64]struct{}{},
- maxDeadJobs: 1000,
+ dequeued: map[uint64]struct{}{},
+ maxDeadJobs: 1000,
+ storageNamesByVirtualStorage: storageNamesByVirtualStorage,
+ lastEventByDest: map[eventDestination]ReplicationEvent{},
}
}
@@ -25,14 +37,20 @@ type deadJob struct {
relativePath string
}
+type eventDestination struct {
+ virtual, storage, relativePath string
+}
+
// memoryReplicationEventQueue implements queue interface with in-memory implementation of storage
type memoryReplicationEventQueue struct {
sync.RWMutex
- seq uint64 // used to generate unique identifiers for events
- queued []ReplicationEvent // all new events stored as queue
- dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged
- maxDeadJobs int // maximum amount of dead jobs to hold in memory
- deadJobs []deadJob // dead jobs stored for reporting purposes
+ seq uint64 // used to generate unique identifiers for events
+ queued []ReplicationEvent // all new events stored as queue
+ dequeued map[uint64]struct{} // all events dequeued, but not yet acknowledged
+ maxDeadJobs int // maximum amount of dead jobs to hold in memory
+ deadJobs []deadJob // dead jobs stored for reporting purposes
+ storageNamesByVirtualStorage map[string][]string // bindings between virtual storage and storages behind them
+ lastEventByDest map[eventDestination]ReplicationEvent // contains 'virtual+storage+repo' => 'last event' mappings
}
// nextID returns a new sequential ID for new events.
@@ -49,11 +67,13 @@ func (s *memoryReplicationEventQueue) Enqueue(_ context.Context, event Replicati
// event.LockID is unnecessary with an in memory data store as it is intended to synchronize multiple praefect instances
// but must be filled out to produce same event as it done by SQL implementation
event.LockID = event.Job.VirtualStorage + "|" + event.Job.TargetNodeStorage + "|" + event.Job.RelativePath
+ dest := s.defineDest(event)
s.Lock()
defer s.Unlock()
event.ID = s.nextID()
s.queued = append(s.queued, event)
+ s.lastEventByDest[dest] = event
return event, nil
}
@@ -77,6 +97,10 @@ func (s *memoryReplicationEventQueue) Dequeue(_ context.Context, virtualStorage,
s.queued[i] = event
s.dequeued[event.ID] = struct{}{}
+ eventDest := s.defineDest(event)
+ if last, found := s.lastEventByDest[eventDest]; found && last.ID == event.ID {
+ s.lastEventByDest[eventDest] = event
+ }
result = append(result, event)
if len(result) >= count {
@@ -123,7 +147,10 @@ func (s *memoryReplicationEventQueue) Acknowledge(_ context.Context, state JobSt
updatedAt := time.Now().UTC()
s.queued[i].State = state
s.queued[i].UpdatedAt = &updatedAt
-
+ eventDest := s.defineDest(s.queued[i])
+ if last, found := s.lastEventByDest[eventDest]; found && last.ID == s.queued[i].ID {
+ s.lastEventByDest[eventDest] = s.queued[i]
+ }
result = append(result, id)
switch state {
@@ -156,6 +183,30 @@ func (s *memoryReplicationEventQueue) CountDeadReplicationJobs(ctx context.Conte
return dead, nil
}
+func (s *memoryReplicationEventQueue) GetUpToDateStorages(_ context.Context, virtualStorage, repoPath string) ([]string, error) {
+ s.RLock()
+ dirtyStorages := make(map[string]struct{})
+ for dst, event := range s.lastEventByDest {
+ if dst.virtual == virtualStorage && dst.relativePath == repoPath && event.State != JobStateCompleted {
+ dirtyStorages[event.Job.TargetNodeStorage] = struct{}{}
+ }
+ }
+ s.RUnlock()
+
+ storageNames, found := s.storageNamesByVirtualStorage[virtualStorage]
+ if !found {
+ return nil, nil
+ }
+
+ var result []string
+ for _, storage := range storageNames {
+ if _, found := dirtyStorages[storage]; !found {
+ result = append(result, storage)
+ }
+ }
+ return result, nil
+}
+
// remove deletes i-th element from the queue and from the in-flight tracking map.
// It doesn't check 'i' for the out of range and must be called with lock protection.
// If state is JobStateDead, the event will be added to the dead job tracker.
@@ -175,6 +226,10 @@ func (s *memoryReplicationEventQueue) remove(i int, state JobState) {
s.queued = append(s.queued[:i], s.queued[i+1:]...)
}
+func (s *memoryReplicationEventQueue) defineDest(event ReplicationEvent) eventDestination {
+ return eventDestination{virtual: event.Job.VirtualStorage, storage: event.Job.TargetNodeStorage, relativePath: event.Job.RelativePath}
+}
+
// ReplicationEventQueueInterceptor allows to register interceptors for `ReplicationEventQueue` interface.
type ReplicationEventQueueInterceptor interface {
// ReplicationEventQueue actual implementation.
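
The in-memory queue now also knows the cluster topology (storageNamesByVirtualStorage) and remembers the last event per (virtual storage, target storage, relative path). A storage is reported as up to date only if its most recent event for the repository is 'completed', or if it never had an event at all. A rough usage sketch (storage names are illustrative and a context ctx is assumed):

    // Sketch: behaviour of the in-memory GetUpToDateStorages.
    conf := config.Config{
        VirtualStorages: []*config.VirtualStorage{{
            Name:  "praefect",
            Nodes: []*models.Node{{Storage: "gitaly-0"}, {Storage: "gitaly-1"}},
        }},
    }
    queue := datastore.NewMemoryReplicationEventQueue(conf)

    // no events yet: every configured storage counts as up to date
    upToDate, _ := queue.GetUpToDateStorages(ctx, "praefect", "repo/path") // ["gitaly-0" "gitaly-1"]

    // a pending replication job makes its target storage "dirty" for that
    // repository until the job is acknowledged as 'completed'
    _, _ = queue.Enqueue(ctx, datastore.ReplicationEvent{Job: datastore.ReplicationJob{
        VirtualStorage:    "praefect",
        TargetNodeStorage: "gitaly-1",
        SourceNodeStorage: "gitaly-0",
        RelativePath:      "repo/path",
    }})
    upToDate, _ = queue.GetUpToDateStorages(ctx, "praefect", "repo/path") // ["gitaly-0"]
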
diff --git a/internal/praefect/datastore/memory_test.go b/internal/praefect/datastore/memory_test.go
index 96a6790c4..bd9991969 100644
--- a/internal/praefect/datastore/memory_test.go
+++ b/internal/praefect/datastore/memory_test.go
@@ -7,6 +7,7 @@ import (
"time"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
)
@@ -105,11 +106,11 @@ func ContractTestCountDeadReplicationJobs(t *testing.T, q ReplicationEventQueue)
}
func TestMemoryCountDeadReplicationJobs(t *testing.T) {
- ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue())
+ ContractTestCountDeadReplicationJobs(t, NewMemoryReplicationEventQueue(config.Config{}))
}
func TestMemoryCountDeadReplicationJobsLimit(t *testing.T) {
- q := NewMemoryReplicationEventQueue().(*memoryReplicationEventQueue)
+ q := NewMemoryReplicationEventQueue(config.Config{}).(*memoryReplicationEventQueue)
q.maxDeadJobs = 2
ctx, cancel := testhelper.Context()
@@ -146,7 +147,7 @@ func TestMemoryReplicationEventQueue(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queue := NewMemoryReplicationEventQueue()
+ queue := NewMemoryReplicationEventQueue(config.Config{})
noEvents, err := queue.Dequeue(ctx, "praefect", "storage-1", 100500)
require.NoError(t, err)
@@ -300,7 +301,7 @@ func TestMemoryReplicationEventQueue_ConcurrentAccess(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- queue := NewMemoryReplicationEventQueue()
+ queue := NewMemoryReplicationEventQueue(config.Config{})
job1 := ReplicationJob{
Change: UpdateRepo,
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index a3f0156b6..1785cc107 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -25,6 +25,9 @@ type ReplicationEventQueue interface {
// CountDeadReplicationJobs returns the dead replication job counts by relative path within the
// given timerange. The timerange beginning is inclusive and ending is exclusive.
CountDeadReplicationJobs(ctx context.Context, from, to time.Time) (map[string]int64, error)
+ // GetUpToDateStorages returns the list of target storages whose latest replication job is in 'completed' state.
+ // It returns no results if there are no up-to-date storages or there were no replication events yet.
+ GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error)
}
func allowToAck(state JobState) error {
@@ -341,3 +344,28 @@ func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state J
return acknowledged.Values(), nil
}
+
+func (rq PostgresReplicationEventQueue) GetUpToDateStorages(ctx context.Context, virtualStorage, repoPath string) ([]string, error) {
+ query := `
+ SELECT storage
+ FROM (
+ SELECT DISTINCT ON (job ->> 'target_node_storage')
+ job ->> 'target_node_storage' AS storage,
+ state
+ FROM replication_queue
+ WHERE job ->> 'virtual_storage' = $1 AND job ->> 'relative_path' = $2
+ ORDER BY job ->> 'target_node_storage', updated_at DESC NULLS FIRST
+ ) t
+ WHERE state = 'completed'`
+ rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, repoPath)
+ if err != nil {
+ return nil, fmt.Errorf("query: %w", err)
+ }
+
+ var storages glsql.StringProvider
+ if err := glsql.ScanAll(rows, &storages); err != nil {
+ return nil, fmt.Errorf("scan: %w", err)
+ }
+
+ return storages.Values(), nil
+}
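
The Postgres queue answers the same question in one query: DISTINCT ON keeps the most recent event per target storage for the given virtual storage and repository, and only storages whose latest event is 'completed' survive the outer filter. The NULLS FIRST ordering is presumably what keeps a freshly enqueued event (which has no updated_at yet) ranked as the latest, so its target storage drops out of the result until the job completes. Usage is identical to the in-memory variant; a sketch, assuming db is an open *sql.DB and ctx is in scope:

    // Sketch: querying up-to-date storages from the Postgres-backed queue.
    queue := datastore.NewPostgresReplicationEventQueue(db)

    upToDate, err := queue.GetUpToDateStorages(ctx, "praefect", "repo/path")
    if err != nil {
        return err
    }
    // an empty result means no storage has a 'completed' latest job (or there
    // were never any replication events), and reads stay on the primary
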
diff --git a/internal/praefect/datastore/queue_test.go b/internal/praefect/datastore/queue_test.go
index 62e969a21..75bbe608a 100644
--- a/internal/praefect/datastore/queue_test.go
+++ b/internal/praefect/datastore/queue_test.go
@@ -572,6 +572,209 @@ func TestPostgresReplicationEventQueue_AcknowledgeMultiple(t *testing.T) {
})
}
+func TestPostgresReplicationEventQueue_GetUpToDateStorages(t *testing.T) {
+ db := getDB(t)
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ source := PostgresReplicationEventQueue{qc: db}
+
+ t.Run("single 'ready' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("single 'dead' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("single 'failed' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("single 'completed' job for single storage", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("multiple 'completed' jobs for single storage but different repos", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("last jobs are 'completed' for multiple storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1", "s2"}, ss)
+ })
+
+ t.Run("last jobs are 'completed' for multiple storages but different virtuals", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs2", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("lasts are in 'completed' and 'in_progress' for different storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s1"}, ss)
+ })
+
+ t.Run("lasts are in 'dead', 'ready', 'failed' and 'in_progress' for different storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'ready'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'in_progress')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("last is not 'completed'", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'ready'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{}, ss)
+ })
+
+ t.Run("multiple virtuals with multiple storages", func(t *testing.T) {
+ db.TruncateAll(t)
+
+ db.MustExec(t, `
+ INSERT INTO replication_queue
+ (job, updated_at, state)
+ VALUES
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'dead'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s1", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s2", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'dead'),
+
+ ('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:01', 'completed'),
+ ('{"virtual_storage": "vs2", "target_node_storage": "s3", "relative_path": "path-1"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-2"}', '2020-01-01 00:00:01', 'failed'),
+ ('{"virtual_storage": "vs1", "target_node_storage": "s4", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed'),
+
+ ('{"virtual_storage": "vs1", "target_node_storage": "s5", "relative_path": "path-2"}', '2020-01-01 00:00:00', 'completed')`,
+ )
+
+ ss, err := source.GetUpToDateStorages(ctx, "vs1", "path-1")
+ require.NoError(t, err)
+ require.ElementsMatch(t, []string{"s2"}, ss)
+ })
+}
+
func requireEvents(t *testing.T, ctx context.Context, db glsql.DB, expected []ReplicationEvent) {
t.Helper()
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index f82a67dee..3d6f18f65 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -111,6 +111,9 @@ type nullNodeMgr struct{}
func (nullNodeMgr) GetShard(virtualStorageName string) (nodes.Shard, error) { return nodes.Shard{}, nil }
func (nullNodeMgr) EnableWrites(ctx context.Context, virtualStorageName string) error { return nil }
+func (nullNodeMgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (nodes.Node, error) {
+ return nil, nil
+}
type buildOptions struct {
withDatastore datastore.Datastore
@@ -180,7 +183,7 @@ func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhel
func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) {
ds := datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
}
return runPraefectServerWithGitalyWithDatastore(t, conf, ds)
@@ -199,7 +202,7 @@ func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config,
func defaultDatastore(conf config.Config) datastore.Datastore {
return datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
}
}
@@ -207,8 +210,8 @@ func defaultTxMgr() *transactions.Manager {
return transactions.NewManager()
}
-func defaultNodeMgr(t testing.TB, conf config.Config) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec())
+func defaultNodeMgr(t testing.TB, conf config.Config, ds datastore.Datastore) nodes.Manager {
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
return nodeMgr
@@ -241,7 +244,7 @@ func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grp
opt.withLogger = log.Default()
}
if opt.withNodeMgr == nil {
- opt.withNodeMgr = defaultNodeMgr(t, conf)
+ opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withDatastore)
}
coordinator := NewCoordinator(
diff --git a/internal/praefect/metrics/prometheus.go b/internal/praefect/metrics/prometheus.go
index f49dfb9ba..87dee00ca 100644
--- a/internal/praefect/metrics/prometheus.go
+++ b/internal/praefect/metrics/prometheus.go
@@ -127,11 +127,23 @@ var ChecksumMismatchCounter = prometheus.NewCounterVec(
}, []string{"target", "source"},
)
+// ReadDistribution counts how many read operations were routed to each storage.
+var ReadDistribution = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "gitaly",
+ Subsystem: "praefect",
+ Name: "read_distribution",
+ Help: "Counts read operations directed to the storages",
+ },
+ []string{"virtual_storage", "storage"},
+)
+
func init() {
prometheus.MustRegister(
MethodTypeCounter,
PrimaryGauge,
ChecksumMismatchCounter,
NodeLastHealthcheckGauge,
+ ReadDistribution,
)
}
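
With namespace "gitaly" and subsystem "praefect", the counter is exported as gitaly_praefect_read_distribution, labelled by virtual_storage and storage, which makes it straightforward to compare how accessor traffic spreads across the nodes of each virtual storage.
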
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index f2c041158..f7e94f28f 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -5,14 +5,17 @@ import (
"database/sql"
"errors"
"fmt"
+ "math/rand"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sirupsen/logrus"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
@@ -38,6 +41,9 @@ type Manager interface {
// ErrVirtualStorageNotExist if a virtual storage with the given name
// does not exist.
EnableWrites(ctx context.Context, virtualStorageName string) error
+ // GetSyncedNode returns a random storage node based on the state of the replication.
+ // It returns the primary in case there are no up-to-date secondaries or an error occurs.
+ GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error)
}
// Node represents some metadata of a node as well as a connection
@@ -52,10 +58,14 @@ type Node interface {
// Mgr is a concrete type that adheres to the Manager interface
type Mgr struct {
failoverEnabled bool
- log *logrus.Entry
+ distributeReads bool
+ // log must be used only for non-request specific needs like bootstrapping, etc.
+ // for request related logging `ctxlogrus.Extract(ctx)` must be used.
+ log *logrus.Entry
// strategies is a map of strategies keyed on virtual storage name
strategies map[string]leaderElectionStrategy
db *sql.DB
+ ds datastore.Datastore
}
// leaderElectionStrategy defines the interface by which primary and
@@ -74,7 +84,7 @@ var ErrPrimaryNotHealthy = errors.New("primary is not healthy")
const dialTimeout = 10 * time.Second
// NewManager creates a new NodeMgr based on virtual storage configs
-func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
+func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, ds datastore.Datastore, latencyHistogram prommetrics.HistogramVec, dialOpts ...grpc.DialOption) (*Mgr, error) {
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
@@ -123,7 +133,9 @@ func NewManager(log *logrus.Entry, c config.Config, db *sql.DB, latencyHistogram
log: log,
db: db,
failoverEnabled: c.Failover.Enabled,
+ distributeReads: c.DistributedReadsEnabled,
strategies: strategies,
+ ds: ds,
}, nil
}
@@ -173,6 +185,37 @@ func (n *Mgr) EnableWrites(ctx context.Context, virtualStorageName string) error
return strategy.enableWrites(ctx)
}
+func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) {
+ shard, err := n.GetShard(virtualStorageName)
+ if err != nil {
+ return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err)
+ }
+
+ var storages []string
+ if n.distributeReads {
+ if storages, err = n.ds.GetUpToDateStorages(ctx, virtualStorageName, repoPath); err != nil {
+ // this is recoverable error - proceed with primary node
+ ctxlogrus.Extract(ctx).
+ WithError(err).
+ WithFields(logrus.Fields{"virtual_storage_name": virtualStorageName, "repo_path": repoPath}).
+ Error("get up to date secondaries")
+ }
+ }
+
+ if len(storages) == 0 {
+ return shard.Primary, nil
+ }
+
+ secondary := storages[rand.Intn(len(storages))] // randomly pick up one of the synced storages
+ for _, node := range shard.Secondaries {
+ if node.GetStorage() == secondary {
+ return node, nil
+ }
+ }
+
+ return shard.Primary, nil // there are no matching secondaries, maybe because of re-configuration
+}
+
func newConnectionStatus(node models.Node, cc *grpc.ClientConn, l *logrus.Entry, latencyHist prommetrics.HistogramVec) *nodeStatus {
return &nodeStatus{
Node: node,
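
GetSyncedNode is deliberately conservative: if distributed reads are disabled, the datastore lookup fails, no storage is reported as up to date, or the randomly chosen storage does not match any secondary in the shard (it may be the primary's own storage, or the node set may have been reconfigured), the call falls back to the shard primary. Read routing therefore never does worse than the previous primary-only behaviour.
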
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 669b163e9..ca2986122 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -2,16 +2,19 @@ package nodes
import (
"context"
+ "errors"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest"
"google.golang.org/grpc"
+ "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
@@ -82,7 +85,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: "sql"},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec())
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -130,7 +133,7 @@ func TestPrimaryIsSecond(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, mockHistogram)
require.NoError(t, err)
shard, err := nm.GetShard("virtual-storage-0")
@@ -180,7 +183,7 @@ func TestBlockingDial(t *testing.T) {
healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
}()
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec())
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, datastore.Datastore{}, promtest.NewMockHistogramVec())
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -225,11 +228,13 @@ func TestNodeManager(t *testing.T) {
Failover: config.Failover{Enabled: false},
}
+ ds := datastore.Datastore{}
+
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, mockHistogram)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, ds, mockHistogram)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, mockHistogram)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, ds, mockHistogram)
require.NoError(t, err)
nm.Start(1*time.Millisecond, 5*time.Second)
@@ -312,3 +317,218 @@ func TestNodeManager(t *testing.T) {
require.Equal(t, ErrPrimaryNotHealthy, nm.EnableWrites(context.Background(), "virtual-storage-0"),
"should not be able to enable writes with unhealthy master")
}
+
+func TestMgr_GetSyncedNode(t *testing.T) {
+ var sockets [4]string
+ var srvs [4]*grpc.Server
+ var healthSrvs [4]*health.Server
+ for i := 0; i < 4; i++ {
+ sockets[i] = testhelper.GetTemporaryGitalySocketFileName()
+ srvs[i], healthSrvs[i] = testhelper.NewServerWithHealth(t, sockets[i])
+ defer srvs[i].Stop()
+ }
+
+ vs0Primary := "unix://" + sockets[0]
+ vs1Secondary := "unix://" + sockets[3]
+
+ virtualStorages := []*config.VirtualStorage{
+ {
+ Name: "virtual-storage-0",
+ Nodes: []*models.Node{
+ {
+ Storage: "gitaly-0",
+ Address: vs0Primary,
+ DefaultPrimary: true,
+ },
+ {
+ Storage: "gitaly-1",
+ Address: "unix://" + sockets[1],
+ },
+ },
+ },
+ {
+ // second virtual storage is needed to check there are no intersections between the two, even with the same storage names
+ Name: "virtual-storage-1",
+ Nodes: []*models.Node{
+ {
+ // same storage name as in other virtual storage is used intentionally
+ Storage: "gitaly-1",
+ Address: "unix://" + sockets[2],
+ DefaultPrimary: true,
+ },
+ {
+ Storage: "gitaly-2",
+ Address: vs1Secondary,
+ },
+ },
+ },
+ }
+
+ conf := config.Config{
+ VirtualStorages: virtualStorages,
+ Failover: config.Failover{Enabled: true},
+ DistributedReadsEnabled: true,
+ }
+
+ mockHistogram := promtest.NewMockHistogramVec()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ ackEvent := func(ds datastore.Datastore, job datastore.ReplicationJob, state datastore.JobState) datastore.ReplicationEvent {
+ event := datastore.ReplicationEvent{Job: job}
+
+ eevent, err := ds.Enqueue(ctx, event)
+ require.NoError(t, err)
+
+ devents, err := ds.Dequeue(ctx, eevent.Job.VirtualStorage, eevent.Job.TargetNodeStorage, 2)
+ require.NoError(t, err)
+ require.Len(t, devents, 1)
+
+ acks, err := ds.Acknowledge(ctx, state, []uint64{devents[0].ID})
+ require.NoError(t, err)
+ require.Equal(t, []uint64{devents[0].ID}, acks)
+ return devents[0]
+ }
+
+ verify := func(scenario func(t *testing.T, nm Manager, ds datastore.Datastore)) func(*testing.T) {
+ ds := datastore.Datastore{ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf)}
+
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, ds, mockHistogram)
+ require.NoError(t, err)
+
+ nm.Start(time.Duration(0), time.Hour)
+
+ return func(t *testing.T) { scenario(t, nm, ds) }
+ }
+
+ t.Run("unknown virtual storage", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ _, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "")
+ require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
+ }))
+
+ t.Run("no replication events", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "no/matter")
+ require.NoError(t, err)
+ require.Contains(t, []string{vs0Primary, "unix://" + sockets[1]}, node.GetAddress())
+ }))
+
+ t.Run("last replication event is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ _, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-0",
+ },
+ })
+ require.NoError(t, err)
+
+ node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "path/1")
+ require.NoError(t, err)
+ require.Equal(t, vs0Primary, node.GetAddress())
+ }))
+
+ t.Run("last replication event is in 'in_progress'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ vs0Event, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-0",
+ },
+ })
+ require.NoError(t, err)
+
+ vs0Events, err := ds.Dequeue(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.TargetNodeStorage, 100500)
+ require.NoError(t, err)
+ require.Len(t, vs0Events, 1)
+
+ node, err := nm.GetSyncedNode(ctx, "virtual-storage-0", "path/1")
+ require.NoError(t, err)
+ require.Equal(t, vs0Primary, node.GetAddress())
+ }))
+
+ t.Run("last replication event is in 'failed'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ vs0Event := ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-0",
+ }, datastore.JobStateFailed)
+
+ node, err := nm.GetSyncedNode(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, vs0Primary, node.GetAddress())
+ }))
+
+ t.Run("multiple replication events for same virtual, last is in 'ready'", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ vsEvent0 := ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-0",
+ }, datastore.JobStateCompleted)
+
+ vsEvent1, err := ds.Enqueue(ctx, datastore.ReplicationEvent{
+ Job: datastore.ReplicationJob{
+ RelativePath: vsEvent0.Job.RelativePath,
+ TargetNodeStorage: vsEvent0.Job.TargetNodeStorage,
+ SourceNodeStorage: vsEvent0.Job.SourceNodeStorage,
+ VirtualStorage: vsEvent0.Job.VirtualStorage,
+ },
+ })
+ require.NoError(t, err)
+
+ node, err := nm.GetSyncedNode(ctx, vsEvent1.Job.VirtualStorage, vsEvent1.Job.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, vs0Primary, node.GetAddress())
+ }))
+
+ t.Run("same repo path for different virtual storages", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ vs0Event := ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-0",
+ }, datastore.JobStateDead)
+
+ ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "virtual-storage-1",
+ }, datastore.JobStateCompleted)
+
+ node, err := nm.GetSyncedNode(ctx, vs0Event.Job.VirtualStorage, vs0Event.Job.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, vs0Primary, node.GetAddress())
+ }))
+
+ t.Run("secondary is up to date in multi-virtual setup with processed replication jobs", verify(func(t *testing.T, nm Manager, ds datastore.Datastore) {
+ ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-1",
+ SourceNodeStorage: "gitaly-0",
+ VirtualStorage: "virtual-storage-0",
+ }, datastore.JobStateCompleted)
+
+ ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/1",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "virtual-storage-1",
+ }, datastore.JobStateCompleted)
+
+ vs1Event := ackEvent(ds, datastore.ReplicationJob{
+ RelativePath: "path/2",
+ TargetNodeStorage: "gitaly-2",
+ SourceNodeStorage: "gitaly-1",
+ VirtualStorage: "virtual-storage-1",
+ }, datastore.JobStateCompleted)
+
+ node, err := nm.GetSyncedNode(ctx, vs1Event.Job.VirtualStorage, vs1Event.Job.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, vs1Secondary, node.GetAddress())
+ }))
+}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index d496c4cd8..6aefbf14c 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -90,7 +90,7 @@ func TestProcessReplicationJob(t *testing.T) {
ds := datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
}
// create object pool on the source
@@ -149,7 +149,7 @@ func TestProcessReplicationJob(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
replicator.log = entry
- nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -217,11 +217,11 @@ func TestPropagateReplicationJob(t *testing.T) {
ds := datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(conf),
}
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -457,7 +457,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one")
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
processed := make(chan struct{})
dequeues := 0
@@ -520,7 +520,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(logEntry, ds, nodeMgr)
@@ -584,7 +584,7 @@ func TestProcessBacklog_Success(t *testing.T) {
})
require.Len(t, gitaly_config.Config.Storages, 2, "expected 'default' storage and a new one")
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
processed := make(chan struct{})
queueInterceptor.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
@@ -658,7 +658,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, promtest.NewMockHistogramVec())
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, ds, promtest.NewMockHistogramVec())
require.NoError(t, err)
replMgr := NewReplMgr(logEntry, ds, nodeMgr)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index ab4610a1d..c9cd4fdce 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strings"
+ "sync"
"testing"
"time"
@@ -433,7 +434,7 @@ func TestRepoRemoval(t *testing.T) {
// TODO: once https://gitlab.com/gitlab-org/gitaly/-/issues/2703 is done and the replication manager supports
// graceful shutdown, we can remove this code that waits for jobs to be complete
- queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue())
+ queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
ds := datastore.Datastore{
ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: queueInterceptor,
@@ -524,6 +525,7 @@ func TestRepoRename(t *testing.T) {
},
},
},
+ DistributedReadsEnabled: true,
}
virtualStorage := conf.VirtualStorages[0]
@@ -557,7 +559,21 @@ func TestRepoRename(t *testing.T) {
_, path2, cleanup2 := cloneRepoAtStorage(t, repo0, virtualStorage.Nodes[2].Storage)
defer cleanup2()
- cc, _, cleanup := runPraefectServerWithGitaly(t, conf)
+ var canCheckRepo sync.WaitGroup
+ canCheckRepo.Add(2)
+
+ evq := datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue(conf))
+ evq.OnAcknowledge(func(ctx context.Context, state datastore.JobState, ids []uint64, queue datastore.ReplicationEventQueue) ([]uint64, error) {
+ defer canCheckRepo.Done()
+ return queue.Acknowledge(ctx, state, ids)
+ })
+
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: evq,
+ }
+
+ cc, _, cleanup := runPraefectServerWithGitalyWithDatastore(t, conf, ds)
defer cleanup()
ctx, cancel := testhelper.Context()
@@ -597,6 +613,11 @@ func TestRepoRename(t *testing.T) {
cpVirtualRepo := *virtualRepo
renamedVirtualRepo := &cpVirtualRepo
renamedVirtualRepo.RelativePath = newName
+
+ // wait until replication jobs propagate changes to other storages
+ // as we don't know which one will be used to check because of read distribution
+ canCheckRepo.Wait()
+
resp, err = repoServiceClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
Repository: renamedVirtualRepo,
})