Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2020-03-28 00:28:25 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2020-04-02 17:55:54 +0300
commitcc314adf47d3eed96d824f35ae601e891e2e70e2 (patch)
tree29aaf04e869e4299a0ccff12fe985ab7ece33c5d
parent61fa556719b444d8dacb4622555df97f6c69e2bc (diff)
Praefect: Postgres queue implementation in use
Implementation of the replication events queue now can be switched using `postgres_queue_enabled` between in-memory and Postgres. `Datastore` changed from interface to struct as there is no single struct implementation for it anymore. Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
-rw-r--r--changelogs/unreleased/ps-postgres-queue-switch.yml5
-rw-r--r--cmd/praefect/main.go39
-rw-r--r--internal/praefect/auth_test.go4
-rw-r--r--internal/praefect/config/config.go1
-rw-r--r--internal/praefect/config/config_test.go1
-rw-r--r--internal/praefect/config/testdata/config.toml1
-rw-r--r--internal/praefect/coordinator_test.go8
-rw-r--r--internal/praefect/datastore/datastore.go8
-rw-r--r--internal/praefect/datastore/queue.go5
-rw-r--r--internal/praefect/helper_test.go10
-rw-r--r--internal/praefect/replicator_test.go18
-rw-r--r--internal/praefect/server_test.go3
12 files changed, 60 insertions, 43 deletions
diff --git a/changelogs/unreleased/ps-postgres-queue-switch.yml b/changelogs/unreleased/ps-postgres-queue-switch.yml
new file mode 100644
index 000000000..9a20d45f5
--- /dev/null
+++ b/changelogs/unreleased/ps-postgres-queue-switch.yml
@@ -0,0 +1,5 @@
+---
+title: 'Praefect: Postgres queue implementation in use'
+merge_request: 1989
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 2d98b05cd..47ca053b7 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -47,6 +47,7 @@ package main
import (
"context"
+ "database/sql"
"errors"
"flag"
"fmt"
@@ -181,12 +182,23 @@ func run(cfgs []starter.Config, conf config.Config) error {
return err
}
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
+ }
+
+ if conf.PostgresQueueEnabled {
+ db, closedb, err := initDatabase(logger, conf)
+ if err != nil {
+ return err
+ }
+ defer closedb()
+ ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db)
+ } else {
+ ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue()
+ }
+
var (
// top level server dependencies
- ds = datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
- }
coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, registry)
repl = praefect.NewReplMgr(
conf.VirtualStorages[0].Name,
@@ -200,8 +212,6 @@ func run(cfgs []starter.Config, conf config.Config) error {
serverErrors = make(chan error, 1)
)
- testSQLConnection(logger, conf)
-
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -254,24 +264,23 @@ func getStarterConfigs(socketPath, listenAddr string) ([]starter.Config, error)
return cfgs, nil
}
-// Test Postgres connection, for diagnostic purposes only while we roll
-// out Postgres support. https://gitlab.com/gitlab-org/gitaly/issues/1755
-func testSQLConnection(logger *logrus.Entry, conf config.Config) {
+func initDatabase(logger *logrus.Entry, conf config.Config) (*sql.DB, func(), error) {
db, err := glsql.OpenDB(conf.DB)
if err != nil {
logger.WithError(err).Error("SQL connection open failed")
- return
+ return nil, nil, err
}
- defer func() {
+ closedb := func() {
if err := db.Close(); err != nil {
logger.WithError(err).Error("SQL connection close failed")
}
- }()
+ }
if err := datastore.CheckPostgresVersion(db); err != nil {
- logger.WithError(err).Error("SQL connection check failed")
- } else {
- logger.Info("SQL connection check successful")
+ closedb()
+ return nil, nil, err
}
+
+ return db, closedb, nil
}
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 3831b60f8..7fce795b2 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -189,8 +189,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func
}
logEntry := testhelper.DiscardTestEntry(t)
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index e6d0efa3e..da7f76015 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -30,6 +30,7 @@ type Config struct {
Auth auth.Config `toml:"auth"`
DB `toml:"database"`
FailoverEnabled bool `toml:"failover_enabled"`
+ PostgresQueueEnabled bool `toml:"postgres_queue_enabled"`
}
// VirtualStorage represents a set of nodes for a storage
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 24f2702d3..07e4fb449 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -179,6 +179,7 @@ func TestConfigParsing(t *testing.T) {
SSLKey: "/path/to/key",
SSLRootCert: "/path/to/root-cert",
},
+ PostgresQueueEnabled: true,
},
},
//TODO: Remove this test, as well as the fixture in testdata/single-virtual-storage.config.toml
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 0fc25ea9d..330470c90 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -1,6 +1,7 @@
listen_addr = ""
socket_path = ""
prometheus_listen_addr = ""
+postgres_queue_enabled = true
[logging]
format = "json"
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 9251d8010..6abfa023f 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -58,8 +58,8 @@ func TestStreamDirector(t *testing.T) {
return queue.Enqueue(ctx, event)
})
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: queueInterceptor,
}
@@ -183,8 +183,8 @@ func TestAbsentCorrelationID(t *testing.T) {
return queue.Enqueue(ctx, event)
})
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: queueInterceptor,
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 14b1f509a..bbeac069b 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -105,7 +105,7 @@ type ReplJob struct {
// Datastore is a data persistence abstraction for all of Praefect's
// persistence needs
-type Datastore interface {
+type Datastore struct {
ReplicasDatastore
ReplicationEventQueue
}
@@ -124,12 +124,6 @@ type ReplicasDatastore interface {
GetStorageNodes() ([]models.Node, error)
}
-// MemoryQueue is an intermediate struct used for introduction of ReplicationEventQueue into usage.
-type MemoryQueue struct {
- *MemoryDatastore
- ReplicationEventQueue
-}
-
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go
index eed08fdfb..f9fbd41c5 100644
--- a/internal/praefect/datastore/queue.go
+++ b/internal/praefect/datastore/queue.go
@@ -138,6 +138,11 @@ func ScanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error
// interface implementation protection
var _ ReplicationEventQueue = PostgresReplicationEventQueue{}
+// NewPostgresReplicationEventQueue returns new instance with provided Querier as a reference to storage.
+func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue {
+ return PostgresReplicationEventQueue{qc: qc}
+}
+
// PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
type PostgresReplicationEventQueue struct {
qc glsql.Querier
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 48a2ba320..ce1b1958d 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -75,8 +75,8 @@ func testConfig(backends int) config.Config {
// setupServer wires all praefect dependencies together via dependency
// injection
func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) *Server {
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
coordinator := NewCoordinator(l, ds, nodeMgr, conf, r)
@@ -131,7 +131,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st
errQ := make(chan error)
- prf.RegisterServices(nodeMgr, conf, nil)
+ prf.RegisterServices(nodeMgr, conf, datastore.Datastore{})
go func() {
errQ <- prf.Serve(listener, false)
}()
@@ -175,8 +175,8 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client
conf.VirtualStorages[0].Nodes[i] = node
}
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
logEntry := log.Default()
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index b7f15f998..676a22401 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -87,8 +87,8 @@ func TestProcessReplicationJob(t *testing.T) {
},
}
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
@@ -208,9 +208,9 @@ func TestPropagateReplicationJob(t *testing.T) {
},
}
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
- ReplicationEventQueue: datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
+ ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(),
}
logEntry := testhelper.DiscardTestEntry(t)
@@ -490,9 +490,9 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
return ackIDs, err
})
- ds := datastore.MemoryQueue{
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: queueInterceptor,
- MemoryDatastore: datastore.NewInMemory(conf),
}
// this job exists to verify that replication works
@@ -595,8 +595,8 @@ func TestProcessBacklog_Success(t *testing.T) {
return ackIDs, err
})
- ds := datastore.MemoryQueue{
- MemoryDatastore: datastore.NewInMemory(conf),
+ ds := datastore.Datastore{
+ ReplicasDatastore: datastore.NewInMemory(conf),
ReplicationEventQueue: queueInterceptor,
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index d3e9bf28e..ade82de87 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/helper/text"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/nodes"
@@ -143,7 +144,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
listener, port := listenAvailPort(t)
go func() {
- srv.RegisterServices(nodeMgr, conf, nil)
+ srv.RegisterServices(nodeMgr, conf, datastore.Datastore{})
srv.Serve(listener, false)
}()