diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-07 17:42:22 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-07-22 16:19:12 +0300 |
commit | 3beb442788cf5f8bc1aa74de745d4f5de76e175b (patch) | |
tree | 1d26334a4834f46d6d768bd74690decac598332d /internal/praefect/server_factory_test.go | |
parent | 521bb978da8780aca690136e78a3ad388726c8ad (diff) |
repository state tracking
Tracking the expected and the actual repository states within a virtual
storage is currently done by searching through the replication queue. This
requires many variables to be taken in to account such as timings between
different jobs and the job history of source nodes. To make the tracking
easier, this commit adds two tables to record the latest state of
repositories across the cluster:
1. `repositories` table contains the expected state of a repository
within a virtual storage.
2. `storage_repositories` table contains the state of the repository
on a given storage that is part of a virtual storage.
Cross-referencing `storage_repositories` with `repositories` makes it
straightforward to figure out repositories which are in the expected
state. If a repository on a storage is not in the expected state,
appropriate corrective actions can be scheduled by diffing the expected
record with the record of the stale storage.
Each repository has a generation number which increases monotonically
for each write. The generation number can be used to deduce whether
the repository has the latest changes or not. The generation number
guarantees the repository is at least on the generation stored but it
may also be on a later generation if an update was partially applied.
To prevent the generation number from referring to outdated data,
repository downgrades are rejected. Generation numbers get propagated
via replication jobs which again guarantee the repository will be at
least on the generation included in the job.
After the upgrade, there won't be any repositories in the tables and
there might be replication jobs which do not have a generation number.
To account for this, the downgrade protection is only applied to
repositories which have a stored generation number, ensuring existing
replication jobs during cluster upgrade are still accepted. As an
upgraded primary receives new writes, the repository entries will be
added to the tables and replication jobs with correct generation numbers
scheduled.
Diffstat (limited to 'internal/praefect/server_factory_test.go')
-rw-r--r-- | internal/praefect/server_factory_test.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go index 29f1cf753..7bb15c4b1 100644 --- a/internal/praefect/server_factory_test.go +++ b/internal/praefect/server_factory_test.go @@ -82,7 +82,8 @@ func TestServerFactory(t *testing.T) { require.NoError(t, err) txMgr := transactions.NewManager() registry := protoregistry.GitalyProtoPreregistered - coordinator := NewCoordinator(queue, nodeMgr, txMgr, conf, registry) + rs := datastore.NewMemoryRepositoryStore(conf.StorageNames()) + coordinator := NewCoordinator(queue, rs, nodeMgr, txMgr, conf, registry) checkOwnRegisteredServices := func(ctx context.Context, t *testing.T, cc *grpc.ClientConn) healthpb.HealthClient { t.Helper() @@ -104,7 +105,7 @@ func TestServerFactory(t *testing.T) { } t.Run("insecure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, ":0") @@ -133,7 +134,7 @@ func TestServerFactory(t *testing.T) { }) t.Run("secure", func(t *testing.T) { - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) defer praefectServerFactory.Stop() listener, err := net.Listen(starter.TCP, ":0") @@ -172,7 +173,7 @@ func TestServerFactory(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) defer praefectServerFactory.Stop() // start with tcp address @@ -240,7 +241,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls key path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.KeyPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") @@ -249,7 +250,7 @@ func TestServerFactory(t *testing.T) { t.Run("tls cert path invalid", func(t *testing.T) { badTLSKeyPath := conf badTLSKeyPath.TLS.CertPath = "invalid" - praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, registry) + praefectServerFactory := NewServerFactory(badTLSKeyPath, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, registry) err := praefectServerFactory.Serve(nil, true) require.EqualError(t, err, "load certificate key pair: open invalid: no such file or directory") |