Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorToon Claes <toon@gitlab.com>2023-05-09 20:34:51 +0300
committerToon Claes <toon@gitlab.com>2023-05-09 20:34:51 +0300
commite12daecd74f27092529134ebf40c32a468b6d00a (patch)
tree77647e03d01f38d0253377dc3c68ab8689e935a7
parent319f059397094c4e2c0542309218e64cfd2a1a98 (diff)
parent7aac7784034000876cf9031211b51ed8872dd5ad (diff)
Merge branch 'pks-datastore-refactor-consistent-storages-cache' into 'master'
datastore: Refactor consistent storages cache See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5729 Merged-by: Toon Claes <toon@gitlab.com> Approved-by: Toon Claes <toon@gitlab.com> Reviewed-by: Toon Claes <toon@gitlab.com> Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Co-authored-by: Patrick Steinhardt <psteinhardt@gitlab.com>
-rw-r--r--internal/datastructure/set.go98
-rw-r--r--internal/datastructure/set_test.go157
-rw-r--r--internal/datastructure/testhelper_test.go11
-rw-r--r--internal/praefect/coordinator_test.go27
-rw-r--r--internal/praefect/datastore/repository_store.go17
-rw-r--r--internal/praefect/datastore/repository_store_mock.go18
-rw-r--r--internal/praefect/datastore/repository_store_test.go24
-rw-r--r--internal/praefect/datastore/storage_provider.go120
-rw-r--r--internal/praefect/datastore/storage_provider_test.go76
-rw-r--r--internal/praefect/nodes/manager.go9
-rw-r--r--internal/praefect/nodes/manager_test.go15
-rw-r--r--internal/praefect/router_node_manager.go11
-rw-r--r--internal/praefect/router_per_repository.go6
-rw-r--r--internal/praefect/server_test.go9
14 files changed, 451 insertions, 147 deletions
diff --git a/internal/datastructure/set.go b/internal/datastructure/set.go
new file mode 100644
index 000000000..966d8b8ef
--- /dev/null
+++ b/internal/datastructure/set.go
@@ -0,0 +1,98 @@
+package datastructure
+
+// Set is a data structure that contains unique values. This data structure is not safe for
+// concurrent accesses.
+type Set[Value comparable] struct {
+ values map[Value]struct{}
+}
+
+// NewSet returns a new ready-to-use set.
+func NewSet[Value comparable]() *Set[Value] {
+ return SetFromSlice[Value](nil)
+}
+
+// SetFromValues creates a new set that is populated with the provided values. The values will be
+// deduplicated. This can be more efficient than manually creating and populating the set with
+// values as the underlying data structure will already be preallocated.
+func SetFromValues[Value comparable](values ...Value) *Set[Value] {
+ return SetFromSlice(values)
+}
+
+// SetFromSlice creates a new set that is prepopulated with the values from the given slice. The
+// slice values will be deduplicated. This can be more efficient than manually creating and
+// populating the set with values as the underlying data structure will already be preallocated.
+func SetFromSlice[Value comparable](slice []Value) *Set[Value] {
+ values := make(map[Value]struct{}, len(slice))
+ for _, value := range slice {
+ values[value] = struct{}{}
+ }
+
+ return &Set[Value]{
+ values: values,
+ }
+}
+
+// Add adds the given value to the set. Returns `true` if the value was added, and `false` in case
+// no change was made to the set.
+func (s *Set[Value]) Add(v Value) bool {
+ if _, exists := s.values[v]; !exists {
+ s.values[v] = struct{}{}
+ return true
+ }
+
+ return false
+}
+
+// Remove removes the given value from the set. Returns `true` if the value was removed, and `false`
+// in case no change was made to the set.
+func (s *Set[Value]) Remove(v Value) bool {
+ if _, exists := s.values[v]; exists {
+ delete(s.values, v)
+ return true
+ }
+
+ return false
+}
+
+// HasValue determines whether the given value is contained in the set.
+func (s *Set[Value]) HasValue(v Value) bool {
+ _, exists := s.values[v]
+ return exists
+}
+
+// Len returns the number of entries contained in the set.
+func (s *Set[Value]) Len() int {
+ return len(s.values)
+}
+
+// IsEmpty determines whether the set is empty.
+func (s *Set[Value]) IsEmpty() bool {
+ return s.Len() == 0
+}
+
+// Equal determines whether the set is equal to another set. Two sets are equal when they contain
+// the exact same values.
+func (s *Set[Value]) Equal(o *Set[Value]) bool {
+ if s.Len() != o.Len() {
+ return false
+ }
+
+ for _, value := range s.Values() {
+ if !o.HasValue(value) {
+ return false
+ }
+ }
+
+ return true
+}
+
+// Values returns an array of all values that have been added to the set. The array is newly
+// allocated and can be modified without affecting the set itself. The order of values is not
+// guaranteed.
+func (s *Set[Value]) Values() []Value {
+ values := make([]Value, 0, len(s.values))
+ for value := range s.values {
+ values = append(values, value)
+ }
+ return values
+}
diff --git a/internal/datastructure/set_test.go b/internal/datastructure/set_test.go
new file mode 100644
index 000000000..dc60613a3
--- /dev/null
+++ b/internal/datastructure/set_test.go
@@ -0,0 +1,157 @@
+package datastructure
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestSet(t *testing.T) {
+ t.Parallel()
+
+ t.Run("new set is empty", func(t *testing.T) {
+ s := NewSet[bool]()
+ requireSet(t, s, []bool{})
+ })
+
+ t.Run("set from slice contains values", func(t *testing.T) {
+ s := SetFromSlice([]int{1, 3, 5})
+ requireSet(t, s, []int{1, 3, 5})
+ })
+
+ t.Run("set from slice deduplicates values", func(t *testing.T) {
+ s := SetFromSlice([]int{1, 1, 3, 5, 5})
+ requireSet(t, s, []int{1, 3, 5})
+ })
+
+ t.Run("set from values contains values", func(t *testing.T) {
+ s := SetFromValues(1, 3, 5)
+ requireSet(t, s, []int{1, 3, 5})
+ })
+
+ t.Run("set from values deduplicates values", func(t *testing.T) {
+ s := SetFromValues(1, 1, 3, 5, 5)
+ requireSet(t, s, []int{1, 3, 5})
+ })
+
+ t.Run("add value", func(t *testing.T) {
+ s := NewSet[bool]()
+ require.True(t, s.Add(true))
+
+ requireSet(t, s, []bool{true})
+ require.False(t, s.HasValue(false))
+ })
+
+ t.Run("remove value", func(t *testing.T) {
+ s := NewSet[bool]()
+ require.True(t, s.Add(true))
+ require.True(t, s.Remove(true))
+
+ requireSet(t, s, []bool{})
+ })
+
+ t.Run("adding multiple values", func(t *testing.T) {
+ s := NewSet[int]()
+ for i := 0; i < 5; i++ {
+ require.True(t, s.Add(i))
+ }
+
+ requireSet(t, s, []int{0, 1, 2, 3, 4})
+ })
+
+ t.Run("adding and removing multiple values", func(t *testing.T) {
+ s := NewSet[int]()
+ for i := 0; i < 10; i++ {
+ require.True(t, s.Add(i))
+ }
+ for i := 0; i < 5; i++ {
+ require.True(t, s.Remove(i*2))
+ }
+
+ requireSet(t, s, []int{1, 3, 5, 7, 9})
+ })
+
+ t.Run("re-adding values", func(t *testing.T) {
+ s := NewSet[int]()
+
+ require.True(t, s.Add(1))
+ require.False(t, s.Add(1))
+
+ requireSet(t, s, []int{1})
+ })
+
+ t.Run("removing nonexistent value", func(t *testing.T) {
+ s := NewSet[int]()
+
+ require.True(t, s.Add(1))
+ require.False(t, s.Remove(0))
+
+ requireSet(t, s, []int{1})
+ })
+}
+
+func TestSet_empty(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ a, b *Set[int]
+ expectEqual bool
+ }{
+ {
+ desc: "empty sets",
+ a: NewSet[int](),
+ b: NewSet[int](),
+ expectEqual: true,
+ },
+ {
+ desc: "empty and non-empty set",
+ a: NewSet[int](),
+ b: SetFromValues(1),
+ expectEqual: false,
+ },
+ {
+ desc: "disjunct sets",
+ a: SetFromValues(2),
+ b: SetFromValues(1),
+ expectEqual: false,
+ },
+ {
+ desc: "partially disjunct sets",
+ a: SetFromValues(1, 2),
+ b: SetFromValues(1, 3),
+ expectEqual: false,
+ },
+ {
+ desc: "subset",
+ a: SetFromValues(1),
+ b: SetFromValues(1, 3),
+ expectEqual: false,
+ },
+ {
+ desc: "superset",
+ a: SetFromValues(1, 3),
+ b: SetFromValues(1),
+ expectEqual: false,
+ },
+ {
+ desc: "same values",
+ a: SetFromValues(1, 2, 3),
+ b: SetFromValues(1, 2, 3),
+ expectEqual: true,
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ require.Equal(t, tc.expectEqual, tc.a.Equal(tc.b))
+ })
+ }
+}
+
+func requireSet[Value comparable](t *testing.T, s *Set[Value], expectedValues []Value) {
+ t.Helper()
+
+ require.Equal(t, len(expectedValues), s.Len())
+ require.ElementsMatch(t, expectedValues, s.Values())
+ require.Equal(t, len(expectedValues) == 0, s.IsEmpty())
+ require.True(t, s.Equal(SetFromSlice(expectedValues)))
+}
diff --git a/internal/datastructure/testhelper_test.go b/internal/datastructure/testhelper_test.go
new file mode 100644
index 000000000..4aa3a317f
--- /dev/null
+++ b/internal/datastructure/testhelper_test.go
@@ -0,0 +1,11 @@
+package datastructure
+
+import (
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ testhelper.Run(m)
+}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 71a96ba53..4d6ef6867 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/cache"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
gconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service"
@@ -94,11 +95,11 @@ func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
ctx := testhelper.Context(t)
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(context.Context, string, string) (string, map[string]struct{}, error) {
+ GetConsistentStoragesFunc: func(context.Context, string, string) (string, *datastructure.Set[string], error) {
if tc.readOnly {
- return "", map[string]struct{}{storage + "-other": {}}, nil
+ return "", datastructure.SetFromValues(storage + "-other"), nil
}
- return "", map[string]struct{}{storage: {}}, nil
+ return "", datastructure.NewSet[string](), nil
},
}
@@ -483,8 +484,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
}
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, map[string]struct{}{"primary": {}, "secondary": {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.SetFromValues("primary", "secondary"), nil
},
}
@@ -595,8 +596,8 @@ func TestStreamDirectorMutator_SecondaryErrorHandling(t *testing.T) {
}
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, map[string]struct{}{"praefect-internal-1": {}, "praefect-internal-2": {}, "praefect-internal-3": {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.SetFromValues("praefect-internal-1", "praefect-internal-2", "praefect-internal-3"), nil
},
}
@@ -681,8 +682,8 @@ func TestStreamDirectorMutator_ReplicateRepository(t *testing.T) {
incrementGenerationInvoked := false
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, map[string]struct{}{"praefect-internal-2": {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.SetFromValues("praefect-internal-2"), nil
},
CreateRepositoryFunc: func(ctx context.Context, repositoryID int64, virtualStorage, relativePath, replicaPath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
require.Fail(t, "CreateRepository should not be called")
@@ -1172,8 +1173,8 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
entry := testhelper.NewDiscardingLogEntry(t)
repoStore := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.SetFromValues(primaryNodeConf.Storage, secondaryNodeConf.Storage), nil
},
}
@@ -2186,8 +2187,8 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
return repoProto.GetRelativePath(), nil
},
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.SetFromValues("primary", "secondary-1", "secondary-2"), nil
},
},
})
diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go
index 5a317c689..8abcf945e 100644
--- a/internal/praefect/datastore/repository_store.go
+++ b/internal/praefect/datastore/repository_store.go
@@ -9,6 +9,7 @@ import (
"strings"
"time"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
)
@@ -125,7 +126,7 @@ type RepositoryStore interface {
// RenameRepository which can be removed in a later release.
RenameRepositoryInPlace(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error
// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
- GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
+ GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
ConsistentStoragesGetter
// RepositoryExists returns whether the repository exists on a virtual storage.
RepositoryExists(ctx context.Context, virtualStorage, relativePath string) (bool, error)
@@ -641,7 +642,7 @@ AND storage = $3
}
// GetConsistentStoragesByRepositoryID returns the replica path and the set of up to date storages for the given repository keyed by repository ID.
-func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) {
+func (rs *PostgresRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) {
return rs.getConsistentStorages(ctx, `
SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
@@ -652,7 +653,7 @@ GROUP BY replica_path
}
// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
-func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+func (rs *PostgresRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
replicaPath, storages, err := rs.getConsistentStorages(ctx, `
SELECT replica_path, ARRAY_AGG(storage)
FROM repositories
@@ -669,7 +670,7 @@ GROUP BY replica_path
}
// getConsistentStorages is a helper for querying the consistent storages by different keys.
-func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, map[string]struct{}, error) {
+func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, query string, params ...interface{}) (string, *datastructure.Set[string], error) {
var replicaPath string
var storages glsql.StringArray
@@ -681,13 +682,7 @@ func (rs *PostgresRepositoryStore) getConsistentStorages(ctx context.Context, qu
return "", nil, fmt.Errorf("query: %w", err)
}
- result := storages.Slice()
- consistentStorages := make(map[string]struct{}, len(result))
- for _, storage := range result {
- consistentStorages[storage] = struct{}{}
- }
-
- return replicaPath, consistentStorages, nil
+ return replicaPath, datastructure.SetFromSlice(storages.Slice()), nil
}
//nolint:revive // This is unintentionally missing documentation.
diff --git a/internal/praefect/datastore/repository_store_mock.go b/internal/praefect/datastore/repository_store_mock.go
index 5705cd9a2..c566d432c 100644
--- a/internal/praefect/datastore/repository_store_mock.go
+++ b/internal/praefect/datastore/repository_store_mock.go
@@ -1,6 +1,10 @@
package datastore
-import "context"
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
+)
// MockRepositoryStore allows for mocking a RepositoryStore by parametrizing its behavior. All methods
// default to what could be considered success if not set.
@@ -16,8 +20,8 @@ type MockRepositoryStore struct {
DeleteReplicaFunc func(ctx context.Context, repositoryID int64, storage string) error
RenameRepositoryInPlaceFunc func(ctx context.Context, virtualStorage, relativePath, newRelativePath string) error
RenameRepositoryFunc func(ctx context.Context, virtualStorage, relativePath, storage, newRelativePath string) error
- GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error)
- GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
+ GetConsistentStoragesByRepositoryIDFunc func(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error)
+ GetConsistentStoragesFunc func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
GetPartiallyAvailableRepositoriesFunc func(ctx context.Context, virtualStorage string) ([]RepositoryMetadata, error)
DeleteInvalidRepositoryFunc func(ctx context.Context, repositoryID int64, storage string) error
RepositoryExistsFunc func(ctx context.Context, virtualStorage, relativePath string) (bool, error)
@@ -115,18 +119,18 @@ func (m MockRepositoryStore) RenameRepository(ctx context.Context, virtualStorag
}
// GetConsistentStoragesByRepositoryID returns result of execution of the GetConsistentStoragesByRepositoryIDFunc field if it is set or an empty map.
-func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, map[string]struct{}, error) {
+func (m MockRepositoryStore) GetConsistentStoragesByRepositoryID(ctx context.Context, repositoryID int64) (string, *datastructure.Set[string], error) {
if m.GetConsistentStoragesFunc == nil {
- return "", map[string]struct{}{}, nil
+ return "", datastructure.NewSet[string](), nil
}
return m.GetConsistentStoragesByRepositoryIDFunc(ctx, repositoryID)
}
// GetConsistentStorages returns result of execution of the GetConsistentStoragesFunc field if it is set or an empty map.
-func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+func (m MockRepositoryStore) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
if m.GetConsistentStoragesFunc == nil {
- return "", map[string]struct{}{}, nil
+ return "", datastructure.NewSet[string](), nil
}
return m.GetConsistentStoragesFunc(ctx, virtualStorage, relativePath)
diff --git a/internal/praefect/datastore/repository_store_test.go b/internal/praefect/datastore/repository_store_test.go
index d1d3f0204..c73f0c2ea 100644
--- a/internal/praefect/datastore/repository_store_test.go
+++ b/internal/praefect/datastore/repository_store_test.go
@@ -1042,12 +1042,12 @@ func TestRepositoryStore_Postgres(t *testing.T) {
t.Run("consistent secondary", func(t *testing.T) {
replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
+ require.ElementsMatch(t, []string{"primary", "consistent-secondary"}, secondaries.Values())
require.Equal(t, "replica-path", replicaPath)
replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"primary": {}, "consistent-secondary": {}}, secondaries)
+ require.ElementsMatch(t, []string{"primary", "consistent-secondary"}, secondaries.Values())
require.Equal(t, "replica-path", replicaPath)
})
@@ -1056,12 +1056,12 @@ func TestRepositoryStore_Postgres(t *testing.T) {
t.Run("outdated primary", func(t *testing.T) {
replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries)
+ require.Equal(t, []string{"consistent-secondary"}, secondaries.Values())
require.Equal(t, "replica-path", replicaPath)
replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"consistent-secondary": {}}, secondaries)
+ require.Equal(t, []string{"consistent-secondary"}, secondaries.Values())
require.Equal(t, "replica-path", replicaPath)
})
@@ -1088,12 +1088,12 @@ func TestRepositoryStore_Postgres(t *testing.T) {
replicaPath, secondaries, err := rs.GetConsistentStorages(ctx, vs, repo)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries)
+ require.Equal(t, []string{"unknown"}, secondaries.Values())
require.Equal(t, "replica-path", replicaPath)
replicaPath, secondaries, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"unknown": {}}, secondaries)
+ require.Equal(t, []string{"unknown"}, secondaries.Values())
require.Equal(t, "replica-path", replicaPath)
})
@@ -1120,31 +1120,31 @@ func TestRepositoryStore_Postgres(t *testing.T) {
replicaPath, storages, err := rs.GetConsistentStorages(ctx, vs, "original-path")
require.NoError(t, err)
require.Equal(t, "replica-path", replicaPath)
- require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages)
+ require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values())
replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, "replica-path", replicaPath)
- require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages)
+ require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values())
require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-1", "new-path"))
replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path")
require.NoError(t, err)
require.Equal(t, "new-path", replicaPath)
- require.Equal(t, map[string]struct{}{"storage-1": {}}, storages)
+ require.Equal(t, []string{"storage-1"}, storages.Values())
replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, "new-path", replicaPath)
- require.Equal(t, map[string]struct{}{"storage-1": {}}, storages)
+ require.Equal(t, []string{"storage-1"}, storages.Values())
require.NoError(t, rs.RenameRepository(ctx, vs, "original-path", "storage-2", "new-path"))
replicaPath, storages, err = rs.GetConsistentStorages(ctx, vs, "new-path")
require.NoError(t, err)
require.Equal(t, "new-path", replicaPath)
- require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages)
+ require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values())
replicaPath, storages, err = rs.GetConsistentStoragesByRepositoryID(ctx, 1)
require.NoError(t, err)
require.Equal(t, "new-path", replicaPath)
- require.Equal(t, map[string]struct{}{"storage-1": {}, "storage-2": {}}, storages)
+ require.ElementsMatch(t, []string{"storage-1", "storage-2"}, storages.Values())
})
})
diff --git a/internal/praefect/datastore/storage_provider.go b/internal/praefect/datastore/storage_provider.go
index a7609c7a7..949fb246d 100644
--- a/internal/praefect/datastore/storage_provider.go
+++ b/internal/praefect/datastore/storage_provider.go
@@ -11,28 +11,38 @@ import (
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
)
// ConsistentStoragesGetter returns storages which contain the latest generation of a repository.
type ConsistentStoragesGetter interface {
// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
- GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error)
+ GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error)
}
// errNotExistingVirtualStorage indicates that the requested virtual storage can't be found or not configured.
var errNotExistingVirtualStorage = errors.New("virtual storage does not exist")
+type cachedReplicaInfo struct {
+ replicaPath string
+ storages *datastructure.Set[string]
+}
+
+type virtualStorageCache struct {
+ syncer syncer
+ replicaInfoCache *lru.Cache[string, cachedReplicaInfo]
+}
+
// CachingConsistentStoragesGetter is a ConsistentStoragesGetter that caches up to date storages by repository.
// Each virtual storage has it's own cache that invalidates entries based on notifications.
type CachingConsistentStoragesGetter struct {
csg ConsistentStoragesGetter
- // caches is per virtual storage cache. It is initialized once on construction.
- caches map[string]*lru.Cache[string, interface{}]
+ // caches is per virtual storage cache. The caches themselves contain information about
+ // replicas keyed by their respective relative paths.
+ caches map[string]*virtualStorageCache
// access is access method to use: 0 - without caching; 1 - with caching.
access int32
- // syncer allows to sync retrieval operations to omit unnecessary runs.
- syncer syncer
// callbackLogger should be used only inside of the methods used as callbacks.
callbackLogger logrus.FieldLogger
cacheAccessTotal *prometheus.CounterVec
@@ -42,8 +52,7 @@ type CachingConsistentStoragesGetter struct {
func NewCachingConsistentStoragesGetter(logger logrus.FieldLogger, csg ConsistentStoragesGetter, virtualStorages []string) (*CachingConsistentStoragesGetter, error) {
cached := &CachingConsistentStoragesGetter{
csg: csg,
- caches: make(map[string]*lru.Cache[string, interface{}], len(virtualStorages)),
- syncer: syncer{inflight: map[string]chan struct{}{}},
+ caches: make(map[string]*virtualStorageCache, len(virtualStorages)),
callbackLogger: logger.WithField("component", "caching_storage_provider"),
cacheAccessTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -56,13 +65,16 @@ func NewCachingConsistentStoragesGetter(logger logrus.FieldLogger, csg Consisten
for _, virtualStorage := range virtualStorages {
virtualStorage := virtualStorage
- cache, err := lru.NewWithEvict(2<<20, func(key string, value interface{}) {
+ replicaInfoCache, err := lru.NewWithEvict(2<<20, func(key string, value cachedReplicaInfo) {
cached.cacheAccessTotal.WithLabelValues(virtualStorage, "evict").Inc()
})
if err != nil {
return nil, err
}
- cached.caches[virtualStorage] = cache
+ cached.caches[virtualStorage] = &virtualStorageCache{
+ syncer: syncer{inflight: map[string]chan struct{}{}},
+ replicaInfoCache: replicaInfoCache,
+ }
}
return cached, nil
@@ -83,14 +95,14 @@ func (c *CachingConsistentStoragesGetter) Notification(n glsql.Notification) {
}
for _, entry := range changes {
- cache, found := c.getCache(entry.VirtualStorage)
+ cache, found := c.caches[entry.VirtualStorage]
if !found {
c.callbackLogger.WithError(errNotExistingVirtualStorage).WithField("virtual_storage", entry.VirtualStorage).Error("cache not found")
continue
}
for _, relativePath := range entry.RelativePaths {
- cache.Remove(relativePath)
+ cache.replicaInfoCache.Remove(relativePath)
}
}
}
@@ -124,40 +136,8 @@ func (c *CachingConsistentStoragesGetter) disableCaching() {
atomic.StoreInt32(&c.access, 0)
for _, cache := range c.caches {
- cache.Purge()
- }
-}
-
-func (c *CachingConsistentStoragesGetter) getCache(virtualStorage string) (*lru.Cache[string, interface{}], bool) {
- val, found := c.caches[virtualStorage]
- return val, found
-}
-
-func (c *CachingConsistentStoragesGetter) cacheMiss(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc()
- return c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath)
-}
-
-func (c *CachingConsistentStoragesGetter) tryCache(virtualStorage, relativePath string) (func(), *lru.Cache[string, interface{}], cacheValue, bool) {
- populateDone := func() {} // should be called AFTER any cache population is done
-
- cache, found := c.getCache(virtualStorage)
- if !found {
- return populateDone, nil, cacheValue{}, false
- }
-
- if storages, found := getKey(cache, relativePath); found {
- return populateDone, cache, storages, true
- }
-
- // synchronises concurrent attempts to update cache for the same key.
- populateDone = c.syncer.await(relativePath)
-
- if storages, found := getKey(cache, relativePath); found {
- return populateDone, cache, storages, true
+ cache.replicaInfoCache.Purge()
}
-
- return populateDone, cache, cacheValue{}, false
}
func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool {
@@ -165,39 +145,43 @@ func (c *CachingConsistentStoragesGetter) isCacheEnabled() bool {
}
// GetConsistentStorages returns the replica path and the set of up to date storages for the given repository keyed by virtual storage and relative path.
-func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- var cache *lru.Cache[string, interface{}]
+func (c *CachingConsistentStoragesGetter) GetConsistentStorages(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ cache, hasCache := c.caches[virtualStorage]
+ if hasCache && c.isCacheEnabled() {
+ if replicaInfo, found := cache.replicaInfoCache.Get(relativePath); found {
+ c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
+ return replicaInfo.replicaPath, replicaInfo.storages, nil
+ }
- if c.isCacheEnabled() {
- var value cacheValue
- var ok bool
- var populationDone func()
+ // Synchronise concurrent attempts to update the cache for the same relative path.
+ // This will cause us to wait for any ongoing calls, but also locks out other new
+ // callers so that we can racelessly populate the cache. The deferred call will then
+ // unlock other callers again once we're done with the lookup.
+ defer cache.syncer.await(relativePath)()
- populationDone, cache, value, ok = c.tryCache(virtualStorage, relativePath)
- defer populationDone()
- if ok {
+ // We re-try whether the cache has been populated now via any concurrent Goroutine.
+ // If so, we return the newly populated entry.
+ if replicaInfo, found := cache.replicaInfoCache.Get(relativePath); found {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "hit").Inc()
- return value.replicaPath, value.storages, nil
+ return replicaInfo.replicaPath, replicaInfo.storages, nil
}
+ } else {
+ // Unset the cache so that we don't try to populate it when it is disabled.
+ cache = nil
}
- replicaPath, storages, err := c.cacheMiss(ctx, virtualStorage, relativePath)
- if err == nil && cache != nil {
- cache.Add(relativePath, cacheValue{replicaPath: replicaPath, storages: storages})
+ c.cacheAccessTotal.WithLabelValues(virtualStorage, "miss").Inc()
+
+ replicaPath, storages, err := c.csg.GetConsistentStorages(ctx, virtualStorage, relativePath)
+ if err != nil {
+ return "", nil, err
+ }
+ if cache != nil {
c.cacheAccessTotal.WithLabelValues(virtualStorage, "populate").Inc()
+ cache.replicaInfoCache.Add(relativePath, cachedReplicaInfo{replicaPath: replicaPath, storages: storages})
}
- return replicaPath, storages, err
-}
-type cacheValue struct {
- replicaPath string
- storages map[string]struct{}
-}
-
-func getKey(cache *lru.Cache[string, interface{}], key string) (cacheValue, bool) {
- val, found := cache.Get(key)
- vals, _ := val.(cacheValue)
- return vals, found
+ return replicaPath, storages, err
}
// syncer allows to sync access to a particular key.
diff --git a/internal/praefect/datastore/storage_provider_test.go b/internal/praefect/datastore/storage_provider_test.go
index 2b86cb213..98e54d2a6 100644
--- a/internal/praefect/datastore/storage_provider_test.go
+++ b/internal/praefect/datastore/storage_provider_test.go
@@ -1,7 +1,9 @@
package datastore
import (
+ "context"
"encoding/json"
+ "errors"
"runtime"
"strings"
"sync"
@@ -13,6 +15,7 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
@@ -37,7 +40,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// empty cache should be populated
replicaPath, storages, err := cache.GetConsistentStorages(ctx, "unknown", "/repo/path")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages.Values())
require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -61,7 +64,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// empty cache should be populated
replicaPath, storages, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages.Values())
require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -75,7 +78,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// populated cache should return cached value
replicaPath, storages, err = cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages.Values())
require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -126,7 +129,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// first access populates the cache
replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1.Values())
require.Equal(t, "replica-path", replicaPath)
// invalid payload disables caching
@@ -137,19 +140,19 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// second access omits cached data as caching should be disabled
replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2.Values())
require.Equal(t, "replica-path", replicaPath)
// third access retrieves data and caches it
replicaPath, storages3, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages3)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages3.Values())
require.Equal(t, "replica-path", replicaPath)
// fourth access retrieves data from cache
replicaPath, storages4, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages4)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages4.Values())
require.Equal(t, "replica-path", replicaPath)
require.Len(t, logHook.AllEntries(), 1)
@@ -185,11 +188,11 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// first access populates the cache
replicaPath, path1Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages1)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, path1Storages1.Values())
require.Equal(t, "replica-path-1", replicaPath)
replicaPath, path2Storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages1)
+ require.ElementsMatch(t, []string{"g1", "g2"}, path2Storages1.Values())
require.Equal(t, "replica-path-2", replicaPath)
// notification evicts entries for '/repo/path/2' from the cache
@@ -203,12 +206,12 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// second access re-uses cached data for '/repo/path/1'
replicaPath1, path1Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/1")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, path1Storages2)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, path1Storages2.Values())
require.Equal(t, "replica-path-1", replicaPath1)
// second access populates the cache again for '/repo/path/2'
replicaPath, path2Storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path/2")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}}, path2Storages2)
+ require.ElementsMatch(t, []string{"g1", "g2"}, path2Storages2.Values())
require.Equal(t, "replica-path-2", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -235,7 +238,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// first access populates the cache
replicaPath, storages1, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages1)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages1.Values())
require.Equal(t, "replica-path", replicaPath)
// disconnection disables cache
@@ -244,7 +247,7 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
// second access retrieve data and doesn't populate the cache
replicaPath, storages2, err := cache.GetConsistentStorages(ctx, "vs", "/repo/path")
require.NoError(t, err)
- require.Equal(t, map[string]struct{}{"g1": {}, "g2": {}, "g3": {}}, storages2)
+ require.ElementsMatch(t, []string{"g1", "g2", "g3"}, storages2.Values())
require.Equal(t, "replica-path", replicaPath)
err = testutil.CollectAndCompare(cache, strings.NewReader(`
@@ -308,6 +311,53 @@ func TestCachingStorageProvider_GetSyncedNodes(t *testing.T) {
close(start)
wg.Wait()
})
+
+ t.Run("concurrent access to different virtual storages", func(t *testing.T) {
+ db.TruncateAll(t)
+ ctx := testhelper.Context(t)
+
+ storageCh := make(chan struct{})
+ mockRepositoryStore := MockRepositoryStore{
+ GetConsistentStoragesFunc: func(_ context.Context, virtualStorage string, _ string) (string, *datastructure.Set[string], error) {
+ switch virtualStorage {
+ case "storage-1":
+ storageCh <- struct{}{}
+ <-storageCh
+ return "", nil, nil
+ case "storage-2":
+ return "", nil, nil
+ default:
+ return "", nil, errors.New("unexpected storage")
+ }
+ },
+ }
+
+ cache, err := NewCachingConsistentStoragesGetter(ctxlogrus.Extract(ctx), mockRepositoryStore, []string{"storage-1", "storage-2"})
+ require.NoError(t, err)
+ cache.Connected()
+
+ // Kick off a Goroutine that asks for a specific relative path on storage-1. This
+ // Goroutine will block until we signal it to leave via the storage channel.
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ _, _, err := cache.GetConsistentStorages(ctx, "storage-1", "path")
+ require.NoError(t, err)
+ }()
+
+ // Synchronize with the Goroutine so that we know it's running.
+ <-storageCh
+
+ // Retrieve consistent storages for the same path, but on a different virtual
+ // storage. The first query should not impact this.
+ _, _, err = cache.GetConsistentStorages(ctx, "storage-2", "path")
+ require.NoError(t, err)
+
+ // Unblock the Goroutine and wait for it to exit.
+ storageCh <- struct{}{}
+ wg.Wait()
+ })
}
func TestSyncer_await(t *testing.T) {
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index a64bf8571..3fbc4c00f 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -12,6 +12,7 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/sirupsen/logrus"
gitalyauth "gitlab.com/gitlab-org/gitaly/v15/auth"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/client"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
@@ -277,23 +278,23 @@ func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath st
return nil, err
}
- if len(upToDateStorages) == 0 {
+ if upToDateStorages == nil || upToDateStorages.IsEmpty() {
// this possible when there is no data yet in the database for the repository
shard, err := n.GetShard(ctx, virtualStorageName)
if err != nil {
return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err)
}
- upToDateStorages = map[string]struct{}{shard.Primary.GetStorage(): {}}
+ upToDateStorages = datastructure.SetFromValues(shard.Primary.GetStorage())
}
- healthyStorages := make([]Node, 0, len(upToDateStorages))
+ healthyStorages := make([]Node, 0, upToDateStorages.Len())
for _, node := range n.Nodes()[virtualStorageName] {
if !node.IsHealthy() {
continue
}
- if _, ok := upToDateStorages[node.GetStorage()]; !ok {
+ if !upToDateStorages.HasValue(node.GetStorage()) {
continue
}
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 2cc02cb27..26ef54850 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/client"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
@@ -321,12 +322,12 @@ func TestMgr_GetSyncedNode(t *testing.T) {
ctx := testhelper.Context(t)
var consistentSecondariesErr error
- consistentStorages := map[string]struct{}{}
+ var consistentStorages *datastructure.Set[string]
verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
conf.Failover.Enabled = failover
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
return relativePath, consistentStorages, consistentSecondariesErr
},
}
@@ -357,7 +358,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
}))
t.Run("no up to date storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
- consistentStorages = nil
+ consistentStorages = datastructure.NewSet[string]()
node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
require.NoError(t, err)
@@ -365,7 +366,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
}))
t.Run("multiple storages up to date", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
- consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}, "gitaly-2": {}}
+ consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1", "gitaly-2")
chosen := map[Node]struct{}{}
for i := 0; i < 1000 && len(chosen) < 2; i++ {
@@ -379,7 +380,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
}))
t.Run("single secondary storage up to date but unhealthy", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
- consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}}
+ consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1")
healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
@@ -399,7 +400,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
}))
t.Run("no healthy storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
- consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}}
+ consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1")
healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
@@ -426,7 +427,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
}))
t.Run("disabled failover doesn't disable health state", verify(false, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
- consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}}
+ consistentStorages = datastructure.SetFromValues("gitaly-0", "gitaly-1")
shard, err := nm.GetShard(ctx, virtualStorage)
require.NoError(t, err)
diff --git a/internal/praefect/router_node_manager.go b/internal/praefect/router_node_manager.go
index 2f2a7a6e0..9880e9820 100644
--- a/internal/praefect/router_node_manager.go
+++ b/internal/praefect/router_node_manager.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
@@ -89,13 +90,13 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err)
}
- if len(consistentStorages) == 0 {
+ if consistentStorages == nil || consistentStorages.IsEmpty() {
// if there is no up to date storages we'll have to consider the storage
// up to date as this will be the case on repository creation
- consistentStorages = map[string]struct{}{shard.Primary.GetStorage(): {}}
+ consistentStorages = datastructure.SetFromValues(shard.Primary.GetStorage())
}
- if _, ok := consistentStorages[shard.Primary.GetStorage()]; !ok {
+ if !consistentStorages.HasValue(shard.Primary.GetStorage()) {
return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
}
@@ -104,9 +105,9 @@ func (r *nodeManagerRouter) RouteRepositoryMutator(ctx context.Context, virtualS
var replicationTargets []string
// Only healthy secondaries which are consistent with the primary are allowed to take
// part in the transaction. Unhealthy nodes would block the transaction until they come back.
- participatingSecondaries := make([]nodes.Node, 0, len(consistentStorages))
+ participatingSecondaries := make([]nodes.Node, 0, consistentStorages.Len())
for _, secondary := range shard.Secondaries {
- if _, ok := consistentStorages[secondary.GetStorage()]; ok && secondary.IsHealthy() {
+ if consistentStorages.HasValue(secondary.GetStorage()) && secondary.IsHealthy() {
participatingSecondaries = append(participatingSecondaries, secondary)
continue
}
diff --git a/internal/praefect/router_per_repository.go b/internal/praefect/router_per_repository.go
index c64da8f7a..ca64366cf 100644
--- a/internal/praefect/router_per_repository.go
+++ b/internal/praefect/router_per_repository.go
@@ -176,7 +176,7 @@ func (r *PerRepositoryRouter) RouteRepositoryAccessor(ctx context.Context, virtu
healthyConsistentNodes := make([]RouterNode, 0, len(healthyNodes))
for _, node := range healthyNodes {
- if _, ok := consistentStorages[node.Storage]; !ok {
+ if !consistentStorages.HasValue(node.Storage) {
continue
}
@@ -243,7 +243,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
return RepositoryMutatorRoute{}, fmt.Errorf("consistent storages: %w", err)
}
- if _, ok := consistentStorages[primary]; !ok {
+ if !consistentStorages.HasValue(primary) {
return RepositoryMutatorRoute{}, ErrRepositoryReadOnly
}
@@ -264,7 +264,7 @@ func (r *PerRepositoryRouter) RouteRepositoryMutator(ctx context.Context, virtua
continue
}
- if _, consistent := consistentStorages[node.Storage]; !consistent || !healthy {
+ if !consistentStorages.HasValue(node.Storage) || !healthy {
route.ReplicationTargets = append(route.ReplicationTargets, assigned)
continue
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 93e3f553d..af34fa48f 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v15/internal/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
gconfig "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service"
@@ -548,8 +549,8 @@ func TestRemoveRepository(t *testing.T) {
cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{
WithQueue: queueInterceptor,
WithRepoStore: datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, nil, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.NewSet[string](), nil
},
},
WithNodeMgr: nodeMgr,
@@ -825,8 +826,8 @@ func TestProxyWrites(t *testing.T) {
})
rs := datastore.MockRepositoryStore{
- GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
- return relativePath, map[string]struct{}{cfg0.Storages[0].Name: {}, cfg1.Storages[0].Name: {}, cfg2.Storages[0].Name: {}}, nil
+ GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, *datastructure.Set[string], error) {
+ return relativePath, datastructure.SetFromValues(cfg0.Storages[0].Name, cfg1.Storages[0].Name, cfg2.Storages[0].Name), nil
},
}