diff options
author | Toon Claes <toon@gitlab.com> | 2023-05-09 20:34:51 +0300 |
---|---|---|
committer | Toon Claes <toon@gitlab.com> | 2023-05-09 20:34:51 +0300 |
commit | e12daecd74f27092529134ebf40c32a468b6d00a (patch) | |
tree | 77647e03d01f38d0253377dc3c68ab8689e935a7 | |
parent | 319f059397094c4e2c0542309218e64cfd2a1a98 (diff) | |
parent | 7aac7784034000876cf9031211b51ed8872dd5ad (diff) |
Merge branch 'pks-datastore-refactor-consistent-storages-cache' into 'master'
datastore: Refactor consistent storages cache
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5729
Merged-by: Toon Claes <toon@gitlab.com>
Approved-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Toon Claes <toon@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r-- | internal/datastructure/set.go | 98 | ||||
-rw-r--r-- | internal/datastructure/set_test.go | 157 | ||||
-rw-r--r-- | internal/datastructure/testhelper_test.go | 11 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 27 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store.go | 17 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_mock.go | 18 | ||||
-rw-r--r-- | internal/praefect/datastore/repository_store_test.go | 24 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider.go | 120 | ||||
-rw-r--r-- | internal/praefect/datastore/storage_provider_test.go | 76 | ||||
-rw-r--r-- | internal/praefect/nodes/manager.go | 9 | ||||
-rw-r--r-- | internal/praefect/nodes/manager_test.go | 15 | ||||
-rw-r--r-- | internal/praefect/router_node_manager.go | 11 | ||||
-rw-r--r-- | internal/praefect/router_per_repository.go | 6 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 9 |
14 files changed, 451 insertions, 147 deletions
diff --git a/internal/datastructure/set.go b/internal/datastructure/set.go new file mode 100644 index 000000000..966d8b8ef --- /dev/null +++ b/internal/datastructure/set.go @@ -0,0 +1,98 @@ +package datastructure + +// Set is a data structure that contains unique values. This data structure is not safe for +// concurrent accesses. +type Set[Value comparable] struct { + values map[Value]struct{} +} + +// NewSet returns a new ready-to-use set. +func NewSet[Value comparable]() *Set[Value] { + return SetFromSlice[Value](nil) +} + +// SetFromValues creates a new set that is populated with the provided values. The values will be +// deduplicated. This can be more efficient than manually creating and populating the set with +// values as the underlying data structure will already be preallocated. +func SetFromValues[Value comparable](values ...Value) *Set[Value] { + return SetFromSlice(values) +} + +// SetFromSlice creates a new set that is prepopulated with the values from the given slice. The +// slice values will be deduplicated. This can be more efficient than manually creating and +// populating the set with values as the underlying data structure will already be preallocated. +func SetFromSlice[Value comparable](slice []Value) *Set[Value] { + values := make(map[Value]struct{}, len(slice)) + for _, value := range slice { + values[value] = struct{}{} + } + + return &Set[Value]{ + values: values, + } +} + +// Add adds the given value to the set. Returns `true` if the value was added, and `false` in case +// no change was made to the set. +func (s *Set[Value]) Add(v Value) bool { + if _, exists := s.values[v]; !exists { + s.values[v] = struct{}{} + return true + } + + return false +} + +// Remove removes the given value from the set. Returns `true` if the value was removed, and `false` +// in case no change was made to the set. +func (s *Set[Value]) Remove(v Value) bool { + if _, exists := s.values[v]; exists { + delete(s.values, v) + return true + } + + return false +} + +// HasValue determines whether the given value is contained in the set. +func (s *Set[Value]) HasValue(v Value) bool { + _, exists := s.values[v] + return exists +} + +// Len returns the number of entries contained in the set. +func (s *Set[Value]) Len() int { + return len(s.values) +} + +// IsEmpty determines whether the set is empty. +func (s *Set[Value]) IsEmpty() bool { + return s.Len() == 0 +} + +// Equal determines whether the set is equal to another set. Two sets are equal when they contain +// the exact same values. +func (s *Set[Value]) Equal(o *Set[Value]) bool { + if s.Len() != o.Len() { + return false + } + + for _, value := range s.Values() { + if !o.HasValue(value) { + return false + } + } + + return true +} + +// Values returns an array of all values that have been added to the set. The array is newly +// allocated and can be modified without affecting the set itself. The order of values is not +// guaranteed. +func (s *Set[Value]) Values() []Value { + values := make([]Value, 0, len(s.values)) + for value := range s.values { + values = append(values, value) + } + return values +} diff --git a/internal/datastructure/set_test.go b/internal/datastructure/set_test.go new file mode 100644 index 000000000..dc60613a3 --- /dev/null +++ b/internal/datastructure/set_test.go @@ -0,0 +1,157 @@ +package datastructure + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + t.Parallel() + + t.Run("new set is empty", func(t *testing.T) { + s := NewSet[bool]() + requireSet(t, s, []bool{}) + }) + + t.Run("set from slice contains values", func(t *testing.T) { + s := SetFromSlice([]int{1, 3, 5}) + requireSet(t, s, []int{1, 3, 5}) + }) + + t.Run("set from slice deduplicates values", func(t *testing.T) { + s := SetFromSlice([]int{1, 1, 3, 5, 5}) + requireSet(t, s, []int{1, 3, 5}) + }) + + t.Run("set from values contains values", func(t *testing.T) { + s := SetFromValues(1, 3, 5) + requireSet(t, s, []int{1, 3, 5}) + }) + + t.Run("set from values deduplicates values", func(t *testing.T) { + s := SetFromValues(1, 1, 3, 5, 5) + requireSet(t, s, []int{1, 3, 5}) + }) + + t.Run("add value", func(t *testing.T) { + s := NewSet[bool]() + require.True(t, s.Add(true)) + + requireSet(t, s, []bool{true}) + require.False(t, s.HasValue(false)) + }) + + t.Run("remove value", func(t *testing.T) { + s := NewSet[bool]() + require.True(t, s.Add(true)) + require.True(t, s.Remove(true)) + + requireSet(t, s, []bool{}) + }) + + t.Run("adding multiple values", func(t *testing.T) { + s := NewSet[int]() + for i := 0; i < 5; i++ { + require.True(t, s.Add(i)) + } + + requireSet(t, s, []int{0, 1, 2, 3, 4}) + }) + + t.Run("adding and removing multiple values", func(t *testing.T) { + s := NewSet[int]() + for i := 0; i < 10; i++ { + require.True(t, s.Add(i)) + } + for i := 0; i < 5; i++ { + require.True(t, s.Remove(i*2)) + } + + requireSet(t, s, []int{1, 3, 5, 7, 9}) + }) + + t.Run("re-adding values", func(t *testing.T) { + s := NewSet[int]() + + require.True(t, s.Add(1)) + require.False(t, s.Add(1)) + + requireSet(t, s, []int{1}) + }) + + t.Run("removing nonexistent value", func(t *testing.T) { + s := NewSet[int]() + + require.True(t, s.Add(1)) + require.False(t, s.Remove(0)) + + requireSet(t, s, []int{1}) + }) +} + +func TestSet_empty(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + a, b *Set[int] + expectEqual bool + }{ + { + desc: "empty sets", + a: NewSet[int](), + b: NewSet[int](), + expectEqual: true, + }, + { + desc: "empty and non-empty set", + a: NewSet[int](), + b: SetFromValues(1), + expectEqual: false, + }, + { + desc: "disjunct sets", + a: SetFromValues(2), + b: SetFromValues(1), + expectEqual: false, + }, + { + desc: "partially disjunct sets", + a: SetFromValues(1, 2), + b: SetFromValues(1, 3), + expectEqual: false, + }, + { + desc: "subset", + a: SetFromValues(1), + b: SetFromValues(1, 3), + expectEqual: false, + }, + { + desc: "superset", + a: SetFromValues(1, 3), + b: SetFromValues(1), + expectEqual: false, + }, + { + desc: "same values", + a: SetFromValues(1, 2, 3), + b: SetFromValues(1, 2, 3), + expectEqual: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.expectEqual, tc.a.Equal(tc.b)) + }) + } +} + +func requireSet[Value comparable](t *testing.T, s *Set[Value], expectedValues []Value) { + t.Helper() + + require.Equal(t, len(expectedValues), s.Len()) + require.ElementsMatch(t, expectedValues, s.Values()) + require.Equal(t, len(expectedValues) == 0, s.IsEmpty()) + require.True(t, s.Equal(SetFromSlice(expectedValues))) +} diff --git a/internal/datastructure/testhelper_test.go b/internal/datastructure/testhelper_test.go new file mode 100644 index 000000000..4aa3a317f --- /dev/null +++ b/internal/datastructure/testhelper_test.go @@ -0,0 +1,11 @@ +package datastructure + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 71a96ba53..4d6ef6867 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/client" "gitlab.com/gitlab-org/gitaly/v15/internal/cache" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" gconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service" @@ -94,11 +95,11 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { ctx := testhelper.Context(t) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, error) { + GetConsistentStoragesFunc: func(context.Context, string, string) (string, *datastructure.Set[string], error) { if tc.readOnly { - return "", map[string]struct{}{storage + "-other": {}}, nil + return "", datastructure.SetFromValues(storage + "-other"), nil } - return "", map[string]struct{}{storage: {}}, nil + return "", datastructure.NewSet[string](), nil }, } @@ -483,8 +484,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { } rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.SetFromValues("primary", "secondary"), nil }, } @@ -595,8 +596,8 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) { } rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"praefect-internal-1": {}, "praefect-internal-2": {}, "praefect-internal-3": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.SetFromValues("praefect-internal-1", "praefect-internal-2", "praefect-internal-3"), nil }, } @@ -681,8 +682,8 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) { incrementGenerationInvoked := false rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"praefect-internal-2": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.SetFromValues("praefect-internal-2"), nil }, CreateRepositoryFunc: func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { require.Fail(t, "CreateRepository should not be called") @@ -1172,8 +1173,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { entry := testhelper.NewDiscardingLogEntry(t) repoStore := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.SetFromValues(primaryNodeConf.Storage, secondaryNodeConf.Storage), nil }, } @@ -2186,8 +2187,8 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) { GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) { return repoProto.GetRelativePath(), nil }, - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.SetFromValues("primary", "secondary-1", "secondary-2"), nil }, }, }) diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 5a317c689..8abcf945e 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" ) @@ -125,7 +126,7 @@ type RepositoryStore interface { // RenameRepository which can be removed in a later release. RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. - GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) + GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) ConsistentStoragesGetter // RepositoryExists returns whether the repository exists on a virtual storage. RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -641,7 +642,7 @@ AND storage = $3 } // GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID. -func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) { return rs.getConsistentStorages(ctx, ` SELECT replica_path, ARRAY_AGG(storage) FROM repositories @@ -652,7 +653,7 @@ GROUP BY replica_path } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. -func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { replicaPath, storages, err := rs.getConsistentStorages(ctx, ` SELECT replica_path, ARRAY_AGG(storage) FROM repositories @@ -669,7 +670,7 @@ GROUP BY replica_path } // getConsistentStorages is a helper for querying the consistent storages by different keys. -func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, error) { +func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, *datastructure.Set[string], error) { var replicaPath string var storages glsql.StringArray @@ -681,13 +682,7 @@ func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, qu return "", nil, fmt.Errorf("query: %w", err) } - result := storages.Slice() - consistentStorages := make(map[string]struct{}, len(result)) - for _, storage := range result { - consistentStorages[storage] = struct{}{} - } - - return replicaPath, consistentStorages, nil + return replicaPath, datastructure.SetFromSlice(storages.Slice()), nil } //nolint:revive // This is unintentionally missing documentation. diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go index 5705cd9a2..c566d432c 100644 --- a/internal/praefect/datastore/repository_store_mock.go +++ b/internal/praefect/datastore/repository_store_mock.go @@ -1,6 +1,10 @@ package datastore -import "context" +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" +) // MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods // default to what could be considered success if not set. @@ -16,8 +20,8 @@ type MockRepositoryStore struct { DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error - GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) - GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) + GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) + GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error) DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error) @@ -115,18 +119,18 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag } // GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map. -func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) { if m.GetConsistentStoragesFunc == nil { - return "", map[string]struct{}{}, nil + return "", datastructure.NewSet[string](), nil } return m.GetConsistentStoragesByRepositoryIDFunc(ctx, repositoryID) } // GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map. -func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { +func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { if m.GetConsistentStoragesFunc == nil { - return "", map[string]struct{}{}, nil + return "", datastructure.NewSet[string](), nil } return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath) diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go index d1d3f0204..c73f0c2ea 100644 --- a/internal/praefect/datastore/repository_store_test.go +++ b/internal/praefect/datastore/repository_store_test.go @@ -1042,12 +1042,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { t.Run("consistent secondary", func(t *testing.T) { replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) + require.ElementsMatch(t, []string{"primary", "consistent-secondary"}, secondaries.Values()) require.Equal(t, "replica-path", replicaPath) replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) - require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries) + require.ElementsMatch(t, []string{"primary", "consistent-secondary"}, secondaries.Values()) require.Equal(t, "replica-path", replicaPath) }) @@ -1056,12 +1056,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { t.Run("outdated primary", func(t *testing.T) { replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) + require.Equal(t, []string{"consistent-secondary"}, secondaries.Values()) require.Equal(t, "replica-path", replicaPath) replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) - require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries) + require.Equal(t, []string{"consistent-secondary"}, secondaries.Values()) require.Equal(t, "replica-path", replicaPath) }) @@ -1088,12 +1088,12 @@ func TestRepositoryStore_Postgres(t *testing.T) { replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo) require.NoError(t, err) - require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) + require.Equal(t, []string{"unknown"}, secondaries.Values()) require.Equal(t, "replica-path", replicaPath) replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) - require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries) + require.Equal(t, []string{"unknown"}, secondaries.Values()) require.Equal(t, "replica-path", replicaPath) }) @@ -1120,31 +1120,31 @@ func TestRepositoryStore_Postgres(t *testing.T) { replicaPath, storages, err := rs.GetConsistentStorages(ctx, vs, "original-path") require.NoError(t, err) require.Equal(t, "replica-path", replicaPath) - require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) + require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values()) replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "replica-path", replicaPath) - require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) + require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values()) require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-1", "new-path")) replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path") require.NoError(t, err) require.Equal(t, "new-path", replicaPath) - require.Equal(t, map[string]struct{}{"storage-1": {}}, storages) + require.Equal(t, []string{"storage-1"}, storages.Values()) replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "new-path", replicaPath) - require.Equal(t, map[string]struct{}{"storage-1": {}}, storages) + require.Equal(t, []string{"storage-1"}, storages.Values()) require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-2", "new-path")) replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path") require.NoError(t, err) require.Equal(t, "new-path", replicaPath) - require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) + require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values()) replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1) require.NoError(t, err) require.Equal(t, "new-path", replicaPath) - require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages) + require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values()) }) }) diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go index a7609c7a7..949fb246d 100644 --- a/internal/praefect/datastore/storage_provider.go +++ b/internal/praefect/datastore/storage_provider.go @@ -11,28 +11,38 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" ) // ConsistentStoragesGetter returns storages which contain the latest generation of a repository. type ConsistentStoragesGetter interface { // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. - GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) + GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) } // errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured. var errNotExistingVirtualStorage = errors.New("virtual storage does not exist") +type cachedReplicaInfo struct { + replicaPath string + storages *datastructure.Set[string] +} + +type virtualStorageCache struct { + syncer syncer + replicaInfoCache *lru.Cache[string, cachedReplicaInfo] +} + // CachingConsistentStoragesGetter is a ConsistentStoragesGetter that caches up to date storages by repository. // Each virtual storage has it's own cache that invalidates entries based on notifications. type CachingConsistentStoragesGetter struct { csg ConsistentStoragesGetter - // caches is per virtual storage cache. It is initialized once on construction. - caches map[string]*lru.Cache[string, interface{}] + // caches is per virtual storage cache. The caches themselves contain information about + // replicas keyed by their respective relative paths. + caches map[string]*virtualStorageCache // access is access method to use: 0 - without caching; 1 - with caching. access int32 - // syncer allows to sync retrieval operations to omit unnecessary runs. - syncer syncer // callbackLogger should be used only inside of the methods used as callbacks. callbackLogger logrus.FieldLogger cacheAccessTotal *prometheus.CounterVec @@ -42,8 +52,7 @@ type CachingConsistentStoragesGetter struct { func NewCachingConsistentStoragesGetter(logger logrus.FieldLogger, csg ConsistentStoragesGetter, virtualStorages []string) (*CachingConsistentStoragesGetter, error) { cached := &CachingConsistentStoragesGetter{ csg: csg, - caches: make(map[string]*lru.Cache[string, interface{}], len(virtualStorages)), - syncer: syncer{inflight: map[string]chan struct{}{}}, + caches: make(map[string]*virtualStorageCache, len(virtualStorages)), callbackLogger: logger.WithField("component", "caching_storage_provider"), cacheAccessTotal: prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -56,13 +65,16 @@ func NewCachingConsistentStoragesGetter(logger logrus.FieldLogger, csg Consisten for _, virtualStorage := range virtualStorages { virtualStorage := virtualStorage - cache, err := lru.NewWithEvict(2<<20, func(key string, value interface{}) { + replicaInfoCache, err := lru.NewWithEvict(2<<20, func(key string, value cachedReplicaInfo) { cached.cacheAccessTotal.WithLabelValues(virtualStorage, "evict").Inc() }) if err != nil { return nil, err } - cached.caches[virtualStorage] = cache + cached.caches[virtualStorage] = &virtualStorageCache{ + syncer: syncer{inflight: map[string]chan struct{}{}}, + replicaInfoCache: replicaInfoCache, + } } return cached, nil @@ -83,14 +95,14 @@ func (c *CachingConsistentStoragesGetter) Notification(n glsql.Notification) { } for _, entry := range changes { - cache, found := c.getCache(entry.VirtualStorage) + cache, found := c.caches[entry.VirtualStorage] if !found { c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", entry.VirtualStorage).Error("cache not found") continue } for _, relativePath := range entry.RelativePaths { - cache.Remove(relativePath) + cache.replicaInfoCache.Remove(relativePath) } } } @@ -124,40 +136,8 @@ func (c *CachingConsistentStoragesGetter) disableCaching() { atomic.StoreInt32(&c.access, 0) for _, cache := range c.caches { - cache.Purge() - } -} - -func (c *CachingConsistentStoragesGetter) getCache(virtualStorage string) (*lru.Cache[string, interface{}], bool) { - val, found := c.caches[virtualStorage] - return val, found -} - -func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc() - return c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) -} - -func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache[string, interface{}], cacheValue, bool) { - populateDone := func() {} // should be called AFTER any cache population is done - - cache, found := c.getCache(virtualStorage) - if !found { - return populateDone, nil, cacheValue{}, false - } - - if storages, found := getKey(cache, relativePath); found { - return populateDone, cache, storages, true - } - - // synchronises concurrent attempts to update cache for the same key. - populateDone = c.syncer.await(relativePath) - - if storages, found := getKey(cache, relativePath); found { - return populateDone, cache, storages, true + cache.replicaInfoCache.Purge() } - - return populateDone, cache, cacheValue{}, false } func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool { @@ -165,39 +145,43 @@ func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool { } // GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path. -func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - var cache *lru.Cache[string, interface{}] +func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + cache, hasCache := c.caches[virtualStorage] + if hasCache && c.isCacheEnabled() { + if replicaInfo, found := cache.replicaInfoCache.Get(relativePath); found { + c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc() + return replicaInfo.replicaPath, replicaInfo.storages, nil + } - if c.isCacheEnabled() { - var value cacheValue - var ok bool - var populationDone func() + // Synchronise concurrent attempts to update the cache for the same relative path. + // This will cause us to wait for any ongoing calls, but also locks out other new + // callers so that we can racelessly populate the cache. The deferred call will then + // unlock other callers again once we're done with the lookup. + defer cache.syncer.await(relativePath)() - populationDone, cache, value, ok = c.tryCache(virtualStorage, relativePath) - defer populationDone() - if ok { + // We re-try whether the cache has been populated now via any concurrent Goroutine. + // If so, we return the newly populated entry. + if replicaInfo, found := cache.replicaInfoCache.Get(relativePath); found { c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc() - return value.replicaPath, value.storages, nil + return replicaInfo.replicaPath, replicaInfo.storages, nil } + } else { + // Unset the cache so that we don't try to populate it when it is disabled. + cache = nil } - replicaPath, storages, err := c.cacheMiss(ctx, virtualStorage, relativePath) - if err == nil && cache != nil { - cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages}) + c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc() + + replicaPath, storages, err := c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath) + if err != nil { + return "", nil, err + } + if cache != nil { c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc() + cache.replicaInfoCache.Add(relativePath, cachedReplicaInfo{replicaPath: replicaPath, storages: storages}) } - return replicaPath, storages, err -} -type cacheValue struct { - replicaPath string - storages map[string]struct{} -} - -func getKey(cache *lru.Cache[string, interface{}], key string) (cacheValue, bool) { - val, found := cache.Get(key) - vals, _ := val.(cacheValue) - return vals, found + return replicaPath, storages, err } // syncer allows to sync access to a particular key. diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go index 2b86cb213..98e54d2a6 100644 --- a/internal/praefect/datastore/storage_provider_test.go +++ b/internal/praefect/datastore/storage_provider_test.go @@ -1,7 +1,9 @@ package datastore import ( + "context" "encoding/json" + "errors" "runtime" "strings" "sync" @@ -13,6 +15,7 @@ import ( "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql" "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" @@ -37,7 +40,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // empty cache should be populated replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages.Values()) require.Equal(t, "replica-path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` @@ -61,7 +64,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // empty cache should be populated replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages.Values()) require.Equal(t, "replica-path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` @@ -75,7 +78,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // populated cache should return cached value replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages.Values()) require.Equal(t, "replica-path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` @@ -126,7 +129,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // first access populates the cache replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1.Values()) require.Equal(t, "replica-path", replicaPath) // invalid payload disables caching @@ -137,19 +140,19 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // second access omits cached data as caching should be disabled replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2.Values()) require.Equal(t, "replica-path", replicaPath) // third access retrieves data and caches it replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3.Values()) require.Equal(t, "replica-path", replicaPath) // fourth access retrieves data from cache replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages4.Values()) require.Equal(t, "replica-path", replicaPath) require.Len(t, logHook.AllEntries(), 1) @@ -185,11 +188,11 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // first access populates the cache replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, path1Storages1.Values()) require.Equal(t, "replica-path-1", replicaPath) replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1) + require.ElementsMatch(t, []string{"g1", "g2"}, path2Storages1.Values()) require.Equal(t, "replica-path-2", replicaPath) // notification evicts entries for '/repo/path/2' from the cache @@ -203,12 +206,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // second access re-uses cached data for '/repo/path/1' replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, path1Storages2.Values()) require.Equal(t, "replica-path-1", replicaPath1) // second access populates the cache again for '/repo/path/2' replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2) + require.ElementsMatch(t, []string{"g1", "g2"}, path2Storages2.Values()) require.Equal(t, "replica-path-2", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` @@ -235,7 +238,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // first access populates the cache replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1.Values()) require.Equal(t, "replica-path", replicaPath) // disconnection disables cache @@ -244,7 +247,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { // second access retrieve data and doesn't populate the cache replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path") require.NoError(t, err) - require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2) + require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2.Values()) require.Equal(t, "replica-path", replicaPath) err = testutil.CollectAndCompare(cache, strings.NewReader(` @@ -308,6 +311,53 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) { close(start) wg.Wait() }) + + t.Run("concurrent access to different virtual storages", func(t *testing.T) { + db.TruncateAll(t) + ctx := testhelper.Context(t) + + storageCh := make(chan struct{}) + mockRepositoryStore := MockRepositoryStore{ + GetConsistentStoragesFunc: func(_ context.Context, virtualStorage string, _ string) (string, *datastructure.Set[string], error) { + switch virtualStorage { + case "storage-1": + storageCh <- struct{}{} + <-storageCh + return "", nil, nil + case "storage-2": + return "", nil, nil + default: + return "", nil, errors.New("unexpected storage") + } + }, + } + + cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), mockRepositoryStore, []string{"storage-1", "storage-2"}) + require.NoError(t, err) + cache.Connected() + + // Kick off a Goroutine that asks for a specific relative path on storage-1. This + // Goroutine will block until we signal it to leave via the storage channel. + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + _, _, err := cache.GetConsistentStorages(ctx, "storage-1", "path") + require.NoError(t, err) + }() + + // Synchronize with the Goroutine so that we know it's running. + <-storageCh + + // Retrieve consistent storages for the same path, but on a different virtual + // storage. The first query should not impact this. + _, _, err = cache.GetConsistentStorages(ctx, "storage-2", "path") + require.NoError(t, err) + + // Unblock the Goroutine and wait for it to exit. + storageCh <- struct{}{} + wg.Wait() + }) } func TestSyncer_await(t *testing.T) { diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go index a64bf8571..3fbc4c00f 100644 --- a/internal/praefect/nodes/manager.go +++ b/internal/praefect/nodes/manager.go @@ -12,6 +12,7 @@ import ( grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/sirupsen/logrus" gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" @@ -277,23 +278,23 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st return nil, err } - if len(upToDateStorages) == 0 { + if upToDateStorages == nil || upToDateStorages.IsEmpty() { // this possible when there is no data yet in the database for the repository shard, err := n.GetShard(ctx, virtualStorageName) if err != nil { return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err) } - upToDateStorages = map[string]struct{}{shard.Primary.GetStorage(): {}} + upToDateStorages = datastructure.SetFromValues(shard.Primary.GetStorage()) } - healthyStorages := make([]Node, 0, len(upToDateStorages)) + healthyStorages := make([]Node, 0, upToDateStorages.Len()) for _, node := range n.Nodes()[virtualStorageName] { if !node.IsHealthy() { continue } - if _, ok := upToDateStorages[node.GetStorage()]; !ok { + if !upToDateStorages.HasValue(node.GetStorage()) { continue } diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go index 2cc02cb27..26ef54850 100644 --- a/internal/praefect/nodes/manager_test.go +++ b/internal/praefect/nodes/manager_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/client" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" @@ -321,12 +322,12 @@ func TestMgr_GetSyncedNode(t *testing.T) { ctx := testhelper.Context(t) var consistentSecondariesErr error - consistentStorages := map[string]struct{}{} + var consistentStorages *datastructure.Set[string] verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) { conf.Failover.Enabled = failover rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { return relativePath, consistentStorages, consistentSecondariesErr }, } @@ -357,7 +358,7 @@ func TestMgr_GetSyncedNode(t *testing.T) { })) t.Run("no up to date storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { - consistentStorages = nil + consistentStorages = datastructure.NewSet[string]() node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath) require.NoError(t, err) @@ -365,7 +366,7 @@ func TestMgr_GetSyncedNode(t *testing.T) { })) t.Run("multiple storages up to date", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { - consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}, "gitaly-2": {}} + consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1", "gitaly-2") chosen := map[Node]struct{}{} for i := 0; i < 1000 && len(chosen) < 2; i++ { @@ -379,7 +380,7 @@ func TestMgr_GetSyncedNode(t *testing.T) { })) t.Run("single secondary storage up to date but unhealthy", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { - consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}} + consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1") healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) @@ -399,7 +400,7 @@ func TestMgr_GetSyncedNode(t *testing.T) { })) t.Run("no healthy storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { - consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}} + consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1") healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) @@ -426,7 +427,7 @@ func TestMgr_GetSyncedNode(t *testing.T) { })) t.Run("disabled failover doesn't disable health state", verify(false, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { - consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}} + consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1") shard, err := nm.GetShard(ctx, virtualStorage) require.NoError(t, err) diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go index 2f2a7a6e0..9880e9820 100644 --- a/internal/praefect/router_node_manager.go +++ b/internal/praefect/router_node_manager.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes" @@ -89,13 +90,13 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } - if len(consistentStorages) == 0 { + if consistentStorages == nil || consistentStorages.IsEmpty() { // if there is no up to date storages we'll have to consider the storage // up to date as this will be the case on repository creation - consistentStorages = map[string]struct{}{shard.Primary.GetStorage(): {}} + consistentStorages = datastructure.SetFromValues(shard.Primary.GetStorage()) } - if _, ok := consistentStorages[shard.Primary.GetStorage()]; !ok { + if !consistentStorages.HasValue(shard.Primary.GetStorage()) { return RepositoryMutatorRoute{}, ErrRepositoryReadOnly } @@ -104,9 +105,9 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS var replicationTargets []string // Only healthy secondaries which are consistent with the primary are allowed to take // part in the transaction. Unhealthy nodes would block the transaction until they come back. - participatingSecondaries := make([]nodes.Node, 0, len(consistentStorages)) + participatingSecondaries := make([]nodes.Node, 0, consistentStorages.Len()) for _, secondary := range shard.Secondaries { - if _, ok := consistentStorages[secondary.GetStorage()]; ok && secondary.IsHealthy() { + if consistentStorages.HasValue(secondary.GetStorage()) && secondary.IsHealthy() { participatingSecondaries = append(participatingSecondaries, secondary) continue } diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go index c64da8f7a..ca64366cf 100644 --- a/internal/praefect/router_per_repository.go +++ b/internal/praefect/router_per_repository.go @@ -176,7 +176,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu healthyConsistentNodes := make([]RouterNode, 0, len(healthyNodes)) for _, node := range healthyNodes { - if _, ok := consistentStorages[node.Storage]; !ok { + if !consistentStorages.HasValue(node.Storage) { continue } @@ -243,7 +243,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err) } - if _, ok := consistentStorages[primary]; !ok { + if !consistentStorages.HasValue(primary) { return RepositoryMutatorRoute{}, ErrRepositoryReadOnly } @@ -264,7 +264,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua continue } - if _, consistent := consistentStorages[node.Storage]; !consistent || !healthy { + if !consistentStorages.HasValue(node.Storage) || !healthy { route.ReplicationTargets = append(route.ReplicationTargets, assigned) continue } diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 93e3f553d..af34fa48f 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v15/internal/backchannel" + "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure" "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" gconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service" @@ -548,8 +549,8 @@ func TestRemoveRepository(t *testing.T) { cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{ WithQueue: queueInterceptor, WithRepoStore: datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, nil, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.NewSet[string](), nil }, }, WithNodeMgr: nodeMgr, @@ -825,8 +826,8 @@ func TestProxyWrites(t *testing.T) { }) rs := datastore.MockRepositoryStore{ - GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) { - return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil + GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) { + return relativePath, datastructure.SetFromValues(cfg0.Storages[0].Name, cfg1.Storages[0].Name, cfg2.Storages[0].Name), nil }, } |