Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-10-31 19:47:13 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-12-04 19:26:53 +0300
commit84c9bcdc3636933681ae636ee04c97a4f5759cb9 (patch)
tree50084a30ca1d83fc3a69f6d12981ef4da166dfd9
parentc9a34ebe3c71793b984b9e3af2c7eb3b6a317cc3 (diff)
Support partitioning hints
ObjectPool related RPCs often carry in the request the target repository and the alterante repository. Depending on the RPC the roles switch but these refer to the object pool and the member repository. Gitaly is currently assigning relative paths into partitions on first access. As it is, each repository goes into its own partition unless it has an alternate. If there is an alternate, the repository is assigned to the same partition. This ensures the pool and the member are assigned into the same partition so their operations are serialized. We currently don't support assigning a repository explicitly into the same partition as another repository. This will be needed in order to ensure that a repository that is being linked to a pool will be assigned to the same partition as the pool even before the link is made. Similarly, this can be used to ensure the repository created with CreateFork will be assigned to the same partition as the pool.
-rw-r--r--internal/gitaly/storage/storagemgr/partition_assigner.go74
-rw-r--r--internal/gitaly/storage/storagemgr/partition_assigner_test.go50
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go2
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager_test.go2
4 files changed, 99 insertions, 29 deletions
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner.go b/internal/gitaly/storage/storagemgr/partition_assigner.go
index fa386c9a0..492694e4d 100644
--- a/internal/gitaly/storage/storagemgr/partition_assigner.go
+++ b/internal/gitaly/storage/storagemgr/partition_assigner.go
@@ -30,6 +30,9 @@ var (
// errAlternateHasAlternate is returned when a repository's alternate itself has an
// alternate listed.
errAlternateHasAlternate = errors.New("repository's alternate has an alternate itself")
+ // errRepositoriesAreInDifferentPartitions is returned when attempting to begin a transaction spanning
+ // repositories that are in different partitions.
+ errRepositoriesAreInDifferentPartitions = errors.New("repositories are in different partitions")
)
const prefixPartitionAssignment = "partition_assignment/"
@@ -154,11 +157,42 @@ func (pa *partitionAssigner) allocatePartitionID() (partitionID, error) {
// partition ID. Repositories without an alternate go into their own partitions. Repositories with an alternate
// are assigned into the same partition as the alternate repository. The alternate is assigned into a partition
// if it hasn't yet been. The method is safe to call concurrently.
-func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath string) (partitionID, error) {
- return pa.getPartitionIDRecursive(ctx, relativePath, false)
+func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath, partitionWithRelativePath string) (partitionID, error) {
+ var partitionHint partitionID
+ if partitionWithRelativePath != "" {
+ var err error
+ // See if the target repository itself is already in a partition. If so, we should assign the other repository
+ // in the same partition if it is not yet partitioned.
+ if partitionHint, err = pa.partitionAssignmentTable.getPartitionID(relativePath); err != nil {
+ if !errors.Is(err, errPartitionAssignmentNotFound) {
+ return 0, fmt.Errorf("get possible partition id: %w", err)
+ }
+
+ // There was no assignment.
+ partitionHint = 0
+ }
+
+ // Get or assign the alternate into a partition. If the target repository was already assigned into a partition,
+ // assign the alternate in the same partition.
+ if partitionHint, err = pa.getPartitionIDRecursive(ctx, partitionWithRelativePath, false, partitionHint); err != nil {
+ return 0, fmt.Errorf("get additional relative path's partition ID: %w", err)
+ }
+ }
+
+ // Get the repository's partition, or assign if it yet wasn't assigned, assign it with the alternate.
+ ptnID, err := pa.getPartitionIDRecursive(ctx, relativePath, false, partitionHint)
+ if err != nil {
+ return 0, fmt.Errorf("get partition ID: %w", err)
+ }
+
+ if partitionHint != 0 && ptnID != partitionHint {
+ return 0, errRepositoriesAreInDifferentPartitions
+ }
+
+ return ptnID, nil
}
-func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relativePath string, recursiveCall bool, partitionHint partitionID) (partitionID, error) {
ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
if err != nil {
if !errors.Is(err, errPartitionAssignmentNotFound) {
@@ -212,7 +246,13 @@ func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relati
return ptnID, nil
}
- ptnID, err = pa.assignPartitionID(ctx, relativePath, recursiveCall)
+ // With the repository under lock, verify it is a Git directory before we assign it into a partition.
+ // It's okay if the repository doesn't yet exist as this transaction may be about to create it.
+ if err := storage.ValidateGitDirectory(filepath.Join(pa.storagePath, relativePath)); err != nil && !errors.Is(err, fs.ErrNotExist) {
+ return 0, fmt.Errorf("validate git directory: %w", err)
+ }
+
+ ptnID, err = pa.assignPartitionID(ctx, relativePath, recursiveCall, partitionHint)
if err != nil {
return 0, fmt.Errorf("assign partition ID: %w", err)
}
@@ -221,20 +261,23 @@ func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relati
return ptnID, nil
}
-func (pa *partitionAssigner) assignPartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+func (pa *partitionAssigner) assignPartitionID(ctx context.Context, relativePath string, recursiveCall bool, partitionHint partitionID) (partitionID, error) {
// Check if the repository has an alternate. If so, it needs to go into the same
// partition with it.
- ptnID, err := pa.getAlternatePartitionID(ctx, relativePath, recursiveCall)
+ ptnID, err := pa.getAlternatePartitionID(ctx, relativePath, recursiveCall, partitionHint)
if err != nil {
if !errors.Is(err, errNoAlternate) {
return 0, fmt.Errorf("get alternate partition ID: %w", err)
}
- // The repository has no alternate. Unpooled repositories go into their own partitions.
- // Allocate a new partition ID for this repository.
- ptnID, err = pa.allocatePartitionID()
- if err != nil {
- return 0, fmt.Errorf("acquire partition id: %w", err)
+ ptnID = partitionHint
+ if ptnID == 0 {
+ // The repository has no alternate. Unpooled repositories go into their own partitions.
+ // Allocate a new partition ID for this repository.
+ ptnID, err = pa.allocatePartitionID()
+ if err != nil {
+ return 0, fmt.Errorf("acquire partition id: %w", err)
+ }
}
}
@@ -245,7 +288,7 @@ func (pa *partitionAssigner) assignPartitionID(ctx context.Context, relativePath
return ptnID, nil
}
-func (pa *partitionAssigner) getAlternatePartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+func (pa *partitionAssigner) getAlternatePartitionID(ctx context.Context, relativePath string, recursiveCall bool, partitionHint partitionID) (partitionID, error) {
alternate, err := readAlternatesFile(filepath.Join(pa.storagePath, relativePath))
if err != nil {
return 0, fmt.Errorf("read alternates file: %w", err)
@@ -279,15 +322,10 @@ func (pa *partitionAssigner) getAlternatePartitionID(ctx context.Context, relati
return 0, errAlternatePointsToSelf
}
- // The relative path should point to a Git directory.
- if err := storage.ValidateGitDirectory(filepath.Join(pa.storagePath, alternateRelativePath)); err != nil {
- return 0, fmt.Errorf("validate git directory: %w", err)
- }
-
// Recursively get the alternate's partition ID or assign it one. This time
// we set recursive to true to fail the operation if the alternate itself has an
// alternate configured.
- ptnID, err := pa.getPartitionIDRecursive(ctx, alternateRelativePath, true)
+ ptnID, err := pa.getPartitionIDRecursive(ctx, alternateRelativePath, true, partitionHint)
if err != nil {
return 0, fmt.Errorf("get partition ID: %w", err)
}
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner_test.go b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
index b9239b4f0..0bd055a99 100644
--- a/internal/gitaly/storage/storagemgr/partition_assigner_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
@@ -61,20 +61,52 @@ func TestPartitionAssigner(t *testing.T) {
relativePath1 := "relative-path-1"
// The relative path should get assigned into partition.
- ptnID1, err := pa.getPartitionID(ctx, relativePath1)
+ ptnID1, err := pa.getPartitionID(ctx, relativePath1, "")
require.NoError(t, err)
require.EqualValues(t, 1, ptnID1, "first allocated partition id should be 1")
// The second repository should land into its own partition.
- ptnID2, err := pa.getPartitionID(ctx, "relative-path-2")
+ ptnID2, err := pa.getPartitionID(ctx, "relative-path-2", "")
require.NoError(t, err)
require.EqualValues(t, 2, ptnID2)
// Retrieving the first repository's partition should return the partition that
// was assigned earlier.
- ptnID1, err = pa.getPartitionID(ctx, relativePath1)
+ ptnID1, err = pa.getPartitionID(ctx, relativePath1, "")
require.NoError(t, err)
require.EqualValues(t, 1, ptnID1)
+
+ // Repository should get assigned into the same partition as the alternate.
+ ptnID3, err := pa.getPartitionID(ctx, "relative-path-3", relativePath1)
+ require.NoError(t, err)
+ require.Equal(t, ptnID3, ptnID1)
+
+ // The alternate should get assigned into the same partition as the target repository
+ // if it already has a partition.
+ ptnID4, err := pa.getPartitionID(ctx, relativePath1, "relative-path-4")
+ require.NoError(t, err)
+ require.Equal(t, ptnID4, ptnID1)
+
+ // If neither repository are yet assigned, they should both get assigned into the same
+ // partition.
+ const relativePath6 = "relative-path-6"
+ ptnID5, err := pa.getPartitionID(ctx, "relative-path-5", relativePath6)
+ require.NoError(t, err)
+ require.EqualValues(t, 3, ptnID5)
+
+ // Getting a partition fails if the repositories are in different partitions.
+ ptnID6, err := pa.getPartitionID(ctx, relativePath1, relativePath6)
+ require.Equal(t, errRepositoriesAreInDifferentPartitions, err)
+ require.EqualValues(t, 0, ptnID6)
+
+ require.Equal(t, partitionAssignments{
+ relativePath1: 1,
+ "relative-path-2": 2,
+ "relative-path-3": 1,
+ "relative-path-4": 1,
+ "relative-path-5": 3,
+ relativePath6: 3,
+ }, getPartitionAssignments(t, db))
}
func TestPartitionAssigner_alternates(t *testing.T) {
@@ -188,13 +220,13 @@ func TestPartitionAssigner_alternates(t *testing.T) {
expectedPartitionAssignments = partitionAssignments{}
}
- if memberPartitionID, err := pa.getPartitionID(ctx, memberRepo.RelativePath); tc.expectedError != nil {
+ if memberPartitionID, err := pa.getPartitionID(ctx, memberRepo.RelativePath, ""); tc.expectedError != nil {
require.ErrorIs(t, err, tc.expectedError)
} else {
require.NoError(t, err)
require.Equal(t, expectedPartitionAssignments["member"], memberPartitionID)
- poolPartitionID, err := pa.getPartitionID(ctx, poolRepo.RelativePath)
+ poolPartitionID, err := pa.getPartitionID(ctx, poolRepo.RelativePath, "")
require.NoError(t, err)
require.Equal(t, expectedPartitionAssignments["pool"], poolPartitionID)
}
@@ -228,7 +260,7 @@ func TestPartitionAssigner_close(t *testing.T) {
// A block of ID is loaded into memory when the partitionAssigner is initialized.
// Closing the partitionAssigner is expected to return the unused IDs in the block
// back to the database.
- ptnID, err := pa.getPartitionID(testhelper.Context(t), "relative-path")
+ ptnID, err := pa.getPartitionID(testhelper.Context(t), "relative-path", "")
require.NoError(t, err)
require.EqualValues(t, 1, ptnID)
}
@@ -297,7 +329,7 @@ func TestPartitionAssigner_concurrentAccess(t *testing.T) {
go func() {
defer wg.Done()
<-start
- _, err := pa.getPartitionID(ctx, repo.RelativePath)
+ _, err := pa.getPartitionID(ctx, repo.RelativePath, "")
assert.NoError(t, err)
}()
}
@@ -308,7 +340,7 @@ func TestPartitionAssigner_concurrentAccess(t *testing.T) {
go func() {
defer wg.Done()
<-start
- ptnID, err := pa.getPartitionID(ctx, repo.RelativePath)
+ ptnID, err := pa.getPartitionID(ctx, repo.RelativePath, "")
assert.NoError(t, err)
collectedIDs[i][j] = ptnID
}()
@@ -331,7 +363,7 @@ func TestPartitionAssigner_concurrentAccess(t *testing.T) {
if tc.withAlternate {
// We expect all repositories to have been assigned to the same partition as they are all linked to the same pool.
require.Equal(t, []partitionID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, partitionIDs)
- ptnID, err := pa.getPartitionID(ctx, pool.RelativePath)
+ ptnID, err := pa.getPartitionID(ctx, pool.RelativePath, "")
require.NoError(t, err)
require.Equal(t, partitionID(1), ptnID, "pool should have been assigned into the same partition as the linked repositories")
return
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index 899ad6a97..7a46d44ea 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -375,7 +375,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, storageName, relativePath
return nil, structerr.NewInvalidArgument("validate relative path: %w", err)
}
- partitionID, err := storageMgr.partitionAssigner.getPartitionID(ctx, relativePath)
+ partitionID, err := storageMgr.partitionAssigner.getPartitionID(ctx, relativePath, "")
if err != nil {
if errors.Is(err, badger.ErrDBClosed) {
// The database is closed when PartitionManager is closing. Return a more
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index 00ba970a9..4b951d6b8 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -734,7 +734,7 @@ func TestPartitionManager(t *testing.T) {
storageMgr := partitionManager.storages[step.repo.GetStorageName()]
storageMgr.mu.Lock()
- ptnID, err := storageMgr.partitionAssigner.getPartitionID(ctx, step.repo.GetRelativePath())
+ ptnID, err := storageMgr.partitionAssigner.getPartitionID(ctx, step.repo.GetRelativePath(), "")
require.NoError(t, err)
ptn := storageMgr.partitions[ptnID]