Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2020-07-07 17:42:22 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-07-22 16:19:12 +0300
commit3beb442788cf5f8bc1aa74de745d4f5de76e175b (patch)
tree1d26334a4834f46d6d768bd74690decac598332d /internal/praefect/server_factory_test.go
parent521bb978da8780aca690136e78a3ad388726c8ad (diff)
repository state tracking
Tracking the expected and the actual repository states within a virtual storage is currently done by searching through the replication queue. This requires many variables to be taken in to account such as timings between different jobs and the job history of source nodes. To make the tracking easier, this commit adds two tables to record the latest state of repositories across the cluster: 1. `repositories` table contains the expected state of a repository within a virtual storage. 2. `storage_repositories` table contains the state of the repository on a given storage that is part of a virtual storage. Cross-referencing `storage_repositories` with `repositories` makes it straightforward to figure out repositories which are in the expected state. If a repository on a storage is not in the expected state, appropriate corrective actions can be scheduled by diffing the expected record with the record of the stale storage. Each repository has a generation number which increases monotonically for each write. The generation number can be used to deduce whether the repository has the latest changes or not. The generation number guarantees the repository is at least on the generation stored but it may also be on a later generation if an update was partially applied. To prevent the generation number from referring to outdated data, repository downgrades are rejected. Generation numbers get propagated via replication jobs which again guarantee the repository will be at least on the generation included in the job. After the upgrade, there won't be any repositories in the tables and there might be replication jobs which do not have a generation number. To account for this, the downgrade protection is only applied to repositories which have a stored generation number, ensuring existing replication jobs during cluster upgrade are still accepted. As an upgraded primary receives new writes, the repository entries will be added to the tables and replication jobs with correct generation numbers scheduled.
Diffstat (limited to 'internal/praefect/server_factory_test.go')
-rw-r--r--internal/praefect/server_factory_test.go13
1 files changed, 7 insertions, 6 deletions
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 29f1cf753..7bb15c4b1 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -82,7 +82,8 @@ func TestServerFactory(t *testing.T) {
require.NoError(t, err)
txMgr := transactions.NewManager()
registry := protoregistry.GitalyProtoPreregistered
- coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry)
+ rs := datastore.NewMemoryRepositoryStore(conf.StorageNames())
+ coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, registry)
checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient {
t.Helper()
@@ -104,7 +105,7 @@ func TestServerFactory(t *testing.T) {
}
t.Run("insecure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, ":0")
@@ -133,7 +134,7 @@ func TestServerFactory(t *testing.T) {
})
t.Run("secure", func(t *testing.T) {
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
defer praefectServerFactory.Stop()
listener, err := net.Listen(starter.TCP, ":0")
@@ -172,7 +173,7 @@ func TestServerFactory(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
defer praefectServerFactory.Stop()
// start with tcp address
@@ -240,7 +241,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls key path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.KeyPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")
@@ -249,7 +250,7 @@ func TestServerFactory(t *testing.T) {
t.Run("tls cert path invalid", func(t *testing.T) {
badTLSKeyPath := conf
badTLSKeyPath.TLS.CertPath = "invalid"
- praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry)
+ praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry)
err := praefectServerFactory.Serve(nil, true)
require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory")