diff options
author | Pavlo Strokov <pstrokov@gitlab.com> | 2020-03-28 00:28:25 +0300 |
---|---|---|
committer | Pavlo Strokov <pstrokov@gitlab.com> | 2020-04-02 17:55:54 +0300 |
commit | cc314adf47d3eed96d824f35ae601e891e2e70e2 (patch) | |
tree | 29aaf04e869e4299a0ccff12fe985ab7ece33c5d | |
parent | 61fa556719b444d8dacb4622555df97f6c69e2bc (diff) |
Praefect: Postgres queue implementation in use
Implementation of the replication events queue now can be
switched using `postgres_queue_enabled` between in-memory
and Postgres.
`Datastore` changed from interface to struct as there is no
single struct implementation for it anymore.
Closes: https://gitlab.com/gitlab-org/gitaly/-/issues/2166
-rw-r--r-- | changelogs/unreleased/ps-postgres-queue-switch.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 39 | ||||
-rw-r--r-- | internal/praefect/auth_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 1 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 1 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 8 | ||||
-rw-r--r-- | internal/praefect/datastore/queue.go | 5 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 10 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 18 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 3 |
12 files changed, 60 insertions, 43 deletions
diff --git a/changelogs/unreleased/ps-postgres-queue-switch.yml b/changelogs/unreleased/ps-postgres-queue-switch.yml new file mode 100644 index 000000000..9a20d45f5 --- /dev/null +++ b/changelogs/unreleased/ps-postgres-queue-switch.yml @@ -0,0 +1,5 @@ +--- +title: 'Praefect: Postgres queue implementation in use' +merge_request: 1989 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 2d98b05cd..47ca053b7 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -47,6 +47,7 @@ package main import ( "context" + "database/sql" "errors" "flag" "fmt" @@ -181,12 +182,23 @@ func run(cfgs []starter.Config, conf config.Config) error { return err } + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), + } + + if conf.PostgresQueueEnabled { + db, closedb, err := initDatabase(logger, conf) + if err != nil { + return err + } + defer closedb() + ds.ReplicationEventQueue = datastore.NewPostgresReplicationEventQueue(db) + } else { + ds.ReplicationEventQueue = datastore.NewMemoryReplicationEventQueue() + } + var ( // top level server dependencies - ds = datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), - } coordinator = praefect.NewCoordinator(logger, ds, nodeManager, conf, registry) repl = praefect.NewReplMgr( conf.VirtualStorages[0].Name, @@ -200,8 +212,6 @@ func run(cfgs []starter.Config, conf config.Config) error { serverErrors = make(chan error, 1) ) - testSQLConnection(logger, conf) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -254,24 +264,23 @@ func getStarterConfigs(socketPath, listenAddr string) ([]starter.Config, error) return cfgs, nil } -// Test Postgres connection, for diagnostic purposes only while we roll -// out Postgres support. https://gitlab.com/gitlab-org/gitaly/issues/1755 -func testSQLConnection(logger *logrus.Entry, conf config.Config) { +func initDatabase(logger *logrus.Entry, conf config.Config) (*sql.DB, func(), error) { db, err := glsql.OpenDB(conf.DB) if err != nil { logger.WithError(err).Error("SQL connection open failed") - return + return nil, nil, err } - defer func() { + closedb := func() { if err := db.Close(); err != nil { logger.WithError(err).Error("SQL connection close failed") } - }() + } if err := datastore.CheckPostgresVersion(db); err != nil { - logger.WithError(err).Error("SQL connection check failed") - } else { - logger.Info("SQL connection check successful") + closedb() + return nil, nil, err } + + return db, closedb, nil } diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go index 3831b60f8..7fce795b2 100644 --- a/internal/praefect/auth_test.go +++ b/internal/praefect/auth_test.go @@ -189,8 +189,8 @@ func runServer(t *testing.T, token string, required bool) (*Server, string, func } logEntry := testhelper.DiscardTestEntry(t) - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index e6d0efa3e..da7f76015 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -30,6 +30,7 @@ type Config struct { Auth auth.Config `toml:"auth"` DB `toml:"database"` FailoverEnabled bool `toml:"failover_enabled"` + PostgresQueueEnabled bool `toml:"postgres_queue_enabled"` } // VirtualStorage represents a set of nodes for a storage diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 24f2702d3..07e4fb449 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -179,6 +179,7 @@ func TestConfigParsing(t *testing.T) { SSLKey: "/path/to/key", SSLRootCert: "/path/to/root-cert", }, + PostgresQueueEnabled: true, }, }, //TODO: Remove this test, as well as the fixture in testdata/single-virtual-storage.config.toml diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 0fc25ea9d..330470c90 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -1,6 +1,7 @@ listen_addr = "" socket_path = "" prometheus_listen_addr = "" +postgres_queue_enabled = true [logging] format = "json" diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 9251d8010..6abfa023f 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -58,8 +58,8 @@ func TestStreamDirector(t *testing.T) { return queue.Enqueue(ctx, event) }) - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: queueInterceptor, } @@ -183,8 +183,8 @@ func TestAbsentCorrelationID(t *testing.T) { return queue.Enqueue(ctx, event) }) - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: queueInterceptor, } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 14b1f509a..bbeac069b 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -105,7 +105,7 @@ type ReplJob struct { // Datastore is a data persistence abstraction for all of Praefect's // persistence needs -type Datastore interface { +type Datastore struct { ReplicasDatastore ReplicationEventQueue } @@ -124,12 +124,6 @@ type ReplicasDatastore interface { GetStorageNodes() ([]models.Node, error) } -// MemoryQueue is an intermediate struct used for introduction of ReplicationEventQueue into usage. -type MemoryQueue struct { - *MemoryDatastore - ReplicationEventQueue -} - // MemoryDatastore is a simple datastore that isn't persisted to disk. It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation diff --git a/internal/praefect/datastore/queue.go b/internal/praefect/datastore/queue.go index eed08fdfb..f9fbd41c5 100644 --- a/internal/praefect/datastore/queue.go +++ b/internal/praefect/datastore/queue.go @@ -138,6 +138,11 @@ func ScanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error // interface implementation protection var _ ReplicationEventQueue = PostgresReplicationEventQueue{} +// NewPostgresReplicationEventQueue returns new instance with provided Querier as a reference to storage. +func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue { + return PostgresReplicationEventQueue{qc: qc} +} + // PostgresReplicationEventQueue is a Postgres implementation of persistent queue. type PostgresReplicationEventQueue struct { qc glsql.Querier diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 48a2ba320..ce1b1958d 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -75,8 +75,8 @@ func testConfig(backends int) config.Config { // setupServer wires all praefect dependencies together via dependency // injection func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, l *logrus.Entry, r *protoregistry.Registry) *Server { - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } coordinator := NewCoordinator(l, ds, nodeMgr, conf, r) @@ -131,7 +131,7 @@ func runPraefectServerWithMock(t *testing.T, conf config.Config, backends map[st errQ := make(chan error) - prf.RegisterServices(nodeMgr, conf, nil) + prf.RegisterServices(nodeMgr, conf, datastore.Datastore{}) go func() { errQ <- prf.Serve(listener, false) }() @@ -175,8 +175,8 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client conf.VirtualStorages[0].Nodes[i] = node } - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } logEntry := log.Default() diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index b7f15f998..676a22401 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -87,8 +87,8 @@ func TestProcessReplicationJob(t *testing.T) { }, } - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } @@ -208,9 +208,9 @@ func TestPropagateReplicationJob(t *testing.T) { }, } - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewReplicationEventQueueInterceptor(datastore.NewMemoryReplicationEventQueue()), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } logEntry := testhelper.DiscardTestEntry(t) @@ -490,9 +490,9 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { return ackIDs, err }) - ds := datastore.MemoryQueue{ + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: queueInterceptor, - MemoryDatastore: datastore.NewInMemory(conf), } // this job exists to verify that replication works @@ -595,8 +595,8 @@ func TestProcessBacklog_Success(t *testing.T) { return ackIDs, err }) - ds := datastore.MemoryQueue{ - MemoryDatastore: datastore.NewInMemory(conf), + ds := datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), ReplicationEventQueue: queueInterceptor, } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index d3e9bf28e..ade82de87 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/helper/text" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" @@ -143,7 +144,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) { listener, port := listenAvailPort(t) go func() { - srv.RegisterServices(nodeMgr, conf, nil) + srv.RegisterServices(nodeMgr, conf, datastore.Datastore{}) srv.Serve(listener, false) }() |