Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Sami Hiltunen <shiltunen@gitlab.com> 2023-08-23 19:00:21 +0300
committer Sami Hiltunen <shiltunen@gitlab.com> 2023-09-15 13:11:11 +0300
commit efdb4e3fea82fce31526ab50656e74d869490076 (patch)
tree c65586d6909b7e06a7d794418e0f5f5e9edbd24c
parent 6695a2a78dfdf3a4b24a8cee63241901a6f55f59 (diff)
Assign pooled repositories to the same partition as the pool
Pooled repositories need to be in the same partition as the pool itself so that operations between them can be ordered consistently. This commit extends the partitionAssigner to handle this. When assigning a repository to a partition, the repository's objects/info/alternates file is read to see whether the repository is linked to a pool. If so, the repository is assigned to the same partition as the pool. If the pool hasn't yet been assigned to a partition, it is assigned to one first.

There's extended validation to ensure the alternate points to a valid repository. The validation is mostly a sanity check: the logic would generally work fine even if the path in the alternates file pointed to an invalid location, as we'd merely end up creating a partition assignment for a repository that doesn't exist. That's not critical, but the sanity checks are unlikely to hurt and will help us find weird state if we encounter it.
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_assigner.go 130
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_assigner_test.go 319
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_manager.go 2
3 files changed, 392 insertions, 59 deletions
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner.go b/internal/gitaly/storage/storagemgr/partition_assigner.go
index ec8ef9cdd..418e70b4b 100644
--- a/internal/gitaly/storage/storagemgr/partition_assigner.go
+++ b/internal/gitaly/storage/storagemgr/partition_assigner.go
@@ -5,15 +5,35 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "io/fs"
+ "path/filepath"
"strconv"
"sync"
"github.com/dgraph-io/badger/v4"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
)
-// errPartitionAssignmentNotFound is returned when attempting to access a
-// partition assignment in the database that doesn't yet exist.
-var errPartitionAssignmentNotFound = errors.New("partition assignment not found")
+var (
+ // errPartitionAssignmentNotFound is returned when attempting to access a
+ // partition assignment in the database that doesn't yet exist.
+ errPartitionAssignmentNotFound = errors.New("partition assignment not found")
+ // errNoAlternate is used internally by partitionAssigner to signal a repository
+ // has no alternates.
+ errNoAlternate = errors.New("repository has no alternate")
+ // errMultipleAlternates is returned when a repository has multiple alternates
+ // configured.
+ errMultipleAlternates = errors.New("repository has multiple alternates")
+ // errAlternatePointsToSelf is returned when a repository's alternate points to the
+ // repository itself.
+ errAlternatePointsToSelf = errors.New("repository's alternate points to self")
+ // errAlternateHasAlternate is returned when a repository's alternate itself has an
+ // alternate listed.
+ errAlternateHasAlternate = errors.New("repository's alternate has an alternate itself")
+)
+
+const prefixPartitionAssignment = "partition_assignment/"
// partitionID uniquely identifies a partition.
type partitionID uint64
@@ -40,7 +60,7 @@ func newPartitionAssignmentTable(db *badger.DB) *partitionAssignmentTable {
}
func (pt *partitionAssignmentTable) key(relativePath string) []byte {
- return []byte(fmt.Sprintf("partition_assignment/%s", relativePath))
+ return []byte(fmt.Sprintf("%s%s", prefixPartitionAssignment, relativePath))
}
func (pt *partitionAssignmentTable) getPartitionID(relativePath string) (partitionID, error) {
@@ -90,11 +110,14 @@ type partitionAssigner struct {
idSequence *badger.Sequence
// partitionAssignmentTable contains the partition assignment records.
partitionAssignmentTable *partitionAssignmentTable
+ // storagePath is the path to the root directory of the storage the relative
+ // paths are computed against.
+ storagePath string
}
// newPartitionAssigner returns a new partitionAssigner. Close must be called on the
// returned instance to release acquired resources.
-func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) {
+func newPartitionAssigner(db *badger.DB, storagePath string) (*partitionAssigner, error) {
seq, err := db.GetSequence([]byte("partition_id_seq"), 100)
if err != nil {
return nil, fmt.Errorf("get sequence: %w", err)
@@ -104,6 +127,7 @@ func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) {
repositoryLocks: make(map[string]chan struct{}),
idSequence: seq,
partitionAssignmentTable: newPartitionAssignmentTable(db),
+ storagePath: storagePath,
}, nil
}
@@ -126,10 +150,16 @@ func (pa *partitionAssigner) allocatePartitionID() (partitionID, error) {
return partitionID(id), nil
}
-// getPartititionID returns the partition ID of the repository. If the repository wasn't yet assigned into
+// getPartitionID returns the partition ID of the repository. If the repository wasn't yet assigned into
// a partition, it will be assigned into one and the assignment stored. Further accesses return the stored
-// partition ID. Each repository goes into its own partition. The method is safe to call concurrently.
+// partition ID. Repositories without an alternate go into their own partitions. Repositories with an alternate
+// are assigned into the same partition as the alternate repository. The alternate is assigned into a partition
+// if it hasn't yet been. The method is safe to call concurrently.
func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath string) (partitionID, error) {
+ return pa.getPartitionIDRecursive(ctx, relativePath, false)
+}
+
+func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
if err != nil {
if !errors.Is(err, errPartitionAssignmentNotFound) {
@@ -183,16 +213,94 @@ func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath st
return ptnID, nil
}
- // Each repository goes into its own partition. Allocate a new partition ID for this
- // repository.
+ ptnID, err = pa.assignPartitionID(ctx, relativePath, recursiveCall)
+ if err != nil {
+ return 0, fmt.Errorf("assign partition ID: %w", err)
+ }
+ }
+
+ return ptnID, nil
+}
+
+func (pa *partitionAssigner) assignPartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+ // Check if the repository has an alternate. If so, it needs to go into the same
+ // partition with it.
+ ptnID, err := pa.getAlternatePartitionID(ctx, relativePath, recursiveCall)
+ if err != nil {
+ if !errors.Is(err, errNoAlternate) {
+ return 0, fmt.Errorf("get alternate partition ID: %w", err)
+ }
+
+ // The repository has no alternate. Unpooled repositories go into their own partitions.
+ // Allocate a new partition ID for this repository.
ptnID, err = pa.allocatePartitionID()
if err != nil {
return 0, fmt.Errorf("acquire partition id: %w", err)
}
+ }
+
+ if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil {
+ return 0, fmt.Errorf("set partition: %w", err)
+ }
- if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil {
- return 0, fmt.Errorf("set partition: %w", err)
+ return ptnID, nil
+}
+
+func (pa *partitionAssigner) getAlternatePartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+ alternates, err := stats.ReadAlternatesFile(filepath.Join(pa.storagePath, relativePath))
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return 0, errNoAlternate
}
+
+ return 0, fmt.Errorf("read alternates file: %w", err)
+ }
+
+ if len(alternates) == 0 {
+ return 0, errNoAlternate
+ } else if recursiveCall {
+ // recursiveCall being true indicates we've arrived here through another repository's alternate.
+ // Repositories in Gitaly should only have a single alternate that points to the repository's
+ // pool. Chains of alternates are unexpected and could go arbitrarily long, so fail the operation.
+ return 0, errAlternateHasAlternate
+ } else if len(alternates) > 1 {
+ // Repositories shouldn't have more than one alternate given they should only be
+ // linked to a single pool at most.
+ return 0, errMultipleAlternates
+ }
+
+ // The relative path should point somewhere within the same storage.
+ alternateRelativePath, err := storage.ValidateRelativePath(
+ pa.storagePath,
+ // Take the relative path to the repository, not 'repository/objects'.
+ filepath.Dir(
+ // The path in alternates file points to the object directory of the alternate
+ // repository. The path is relative to the repository's own object directory.
+ filepath.Join(relativePath, "objects", alternates[0]),
+ ),
+ )
+ if err != nil {
+ return 0, fmt.Errorf("validate relative path: %w", err)
+ }
+
+ if alternateRelativePath == relativePath {
+ // The alternate must not point to the repository itself. Not only is it nonsensical
+ // but it would also cause a deadlock as the repository is locked during this call
+ // already.
+ return 0, errAlternatePointsToSelf
+ }
+
+ // The relative path should point to a Git directory.
+ if err := storage.ValidateGitDirectory(filepath.Join(pa.storagePath, alternateRelativePath)); err != nil {
+ return 0, fmt.Errorf("validate git directory: %w", err)
+ }
+
+ // Recursively get the alternate's partition ID or assign it one. This time
+ // we set recursiveCall to true to fail the operation if the alternate itself has an
+ // alternate configured.
+ ptnID, err := pa.getPartitionIDRecursive(ctx, alternateRelativePath, true)
+ if err != nil {
+ return 0, fmt.Errorf("get partition ID: %w", err)
}
return ptnID, nil
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner_test.go b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
index fb8a05b1e..9ccf67d98 100644
--- a/internal/gitaly/storage/storagemgr/partition_assigner_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
@@ -1,21 +1,59 @@
package storagemgr
import (
- "fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "strings"
"sync"
"testing"
+ "github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
)
+type partitionAssignments map[string]partitionID
+
+func getPartitionAssignments(tb testing.TB, db *badger.DB) partitionAssignments {
+ tb.Helper()
+
+ state := partitionAssignments{}
+ require.NoError(tb, db.View(func(txn *badger.Txn) error {
+ it := txn.NewIterator(badger.IteratorOptions{
+ Prefix: []byte(prefixPartitionAssignment),
+ })
+ defer it.Close()
+
+ for it.Rewind(); it.Valid(); it.Next() {
+ value, err := it.Item().ValueCopy(nil)
+ require.NoError(tb, err)
+
+ var ptnID partitionID
+ ptnID.UnmarshalBinary(value)
+
+ relativePath := strings.TrimPrefix(string(it.Item().Key()), prefixPartitionAssignment)
+ state[relativePath] = ptnID
+ }
+
+ return nil
+ }))
+
+ return state
+}
+
func TestPartitionAssigner(t *testing.T) {
db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
require.NoError(t, err)
defer testhelper.MustClose(t, db)
- pa, err := newPartitionAssigner(db)
+ cfg := testcfg.Build(t)
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
require.NoError(t, err)
defer testhelper.MustClose(t, pa)
@@ -39,13 +77,142 @@ func TestPartitionAssigner(t *testing.T) {
require.EqualValues(t, 1, ptnID1)
}
+func TestPartitionAssigner_alternates(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ memberAlternatesContent []byte
+ poolAlternatesContent []byte
+ expectedError error
+ expectedPartitionAssignments partitionAssignments
+ }{
+ {
+ desc: "no alternates file",
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 2,
+ },
+ },
+ {
+ desc: "empty alternates file",
+ memberAlternatesContent: []byte(""),
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 2,
+ },
+ },
+ {
+ desc: "not a git directory",
+ memberAlternatesContent: []byte("../.."),
+ expectedError: storage.InvalidGitDirectoryError{MissingEntry: "objects"},
+ },
+ {
+ desc: "points to pool",
+ memberAlternatesContent: []byte("../../pool/objects"),
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 1,
+ },
+ },
+ {
+ desc: "points to pool with newline",
+ memberAlternatesContent: []byte("../../pool/objects\n"),
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 1,
+ },
+ },
+ {
+ desc: "multiple alternates fail",
+ memberAlternatesContent: []byte("../../pool/objects\nother-alternate"),
+ expectedError: errMultipleAlternates,
+ },
+ {
+ desc: "alternate pointing to self fails",
+ memberAlternatesContent: []byte("../objects"),
+ expectedError: errAlternatePointsToSelf,
+ },
+ {
+ desc: "alternate having an alternate fails",
+ poolAlternatesContent: []byte("unexpected"),
+ memberAlternatesContent: []byte("../../pool/objects"),
+ expectedError: errAlternateHasAlternate,
+ },
+ {
+ desc: "alternate points outside the storage",
+ memberAlternatesContent: []byte("../../../../.."),
+ expectedError: storage.ErrRelativePathEscapesRoot,
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ cfg := testcfg.Build(t)
+
+ ctx := testhelper.Context(t)
+ poolRepo, poolPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: "pool",
+ })
+
+ memberRepo, memberPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: "member",
+ })
+
+ writeAlternatesFile := func(t *testing.T, repoPath string, content []byte) {
+ t.Helper()
+ require.NoError(t, os.WriteFile(stats.AlternatesFilePath(repoPath), content, os.ModePerm))
+ }
+
+ if tc.poolAlternatesContent != nil {
+ writeAlternatesFile(t, poolPath, tc.poolAlternatesContent)
+ }
+
+ if tc.memberAlternatesContent != nil {
+ writeAlternatesFile(t, memberPath, tc.memberAlternatesContent)
+ }
+
+ db, err := OpenDatabase(testhelper.NewLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
+
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
+
+ expectedPartitionAssignments := tc.expectedPartitionAssignments
+ if expectedPartitionAssignments == nil {
+ expectedPartitionAssignments = partitionAssignments{}
+ }
+
+ if memberPartitionID, err := pa.getPartitionID(ctx, memberRepo.RelativePath); tc.expectedError != nil {
+ require.ErrorIs(t, err, tc.expectedError)
+ } else {
+ require.NoError(t, err)
+ require.Equal(t, expectedPartitionAssignments["member"], memberPartitionID)
+
+ poolPartitionID, err := pa.getPartitionID(ctx, poolRepo.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, expectedPartitionAssignments["pool"], poolPartitionID)
+ }
+
+ require.Equal(t, expectedPartitionAssignments, getPartitionAssignments(t, db))
+ })
+ }
+}
+
func TestPartitionAssigner_close(t *testing.T) {
dbDir := t.TempDir()
db, err := OpenDatabase(testhelper.SharedLogger(t), dbDir)
require.NoError(t, err)
- pa, err := newPartitionAssigner(db)
+ cfg := testcfg.Build(t)
+
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
require.NoError(t, err)
testhelper.MustClose(t, pa)
testhelper.MustClose(t, db)
@@ -54,7 +221,7 @@ func TestPartitionAssigner_close(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, db)
- pa, err = newPartitionAssigner(db)
+ pa, err = newPartitionAssigner(db, cfg.Storages[0].Path)
require.NoError(t, err)
defer testhelper.MustClose(t, pa)
@@ -67,53 +234,111 @@ func TestPartitionAssigner_close(t *testing.T) {
}
func TestPartitionAssigner_concurrentAccess(t *testing.T) {
- db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
- require.NoError(t, err)
- defer testhelper.MustClose(t, db)
+ t.Parallel()
- pa, err := newPartitionAssigner(db)
- require.NoError(t, err)
- defer testhelper.MustClose(t, pa)
+ for _, tc := range []struct {
+ desc string
+ withAlternate bool
+ }{
+ {
+ desc: "without alternate",
+ },
+ {
+ desc: "with alternate",
+ withAlternate: true,
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
- // Access 10 repositories concurrently.
- repositoryCount := 10
- // Access each repository from 10 goroutines concurrently.
- goroutineCount := 10
+ db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
- collectedIDs := make([][]partitionID, repositoryCount)
- ctx := testhelper.Context(t)
- wg := sync.WaitGroup{}
- start := make(chan struct{})
- for i := 0; i < repositoryCount; i++ {
- i := i
- collectedIDs[i] = make([]partitionID, goroutineCount)
- relativePath := fmt.Sprintf("relative-path-%d", i)
- for j := 0; j < goroutineCount; j++ {
- j := j
- wg.Add(1)
- go func() {
- defer wg.Done()
- <-start
- ptnID, err := pa.getPartitionID(ctx, relativePath)
- assert.NoError(t, err)
- collectedIDs[i][j] = ptnID
- }()
- }
- }
+ cfg := testcfg.Build(t)
- close(start)
- wg.Wait()
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
- var partitionIDs []partitionID
- for _, ids := range collectedIDs {
- partitionIDs = append(partitionIDs, ids[0])
- for i := range ids {
- // We expect all goroutines accessing a given repository to get the
- // same partition ID for it.
- require.Equal(t, ids[0], ids[i], ids)
- }
- }
+ // Access 10 repositories concurrently.
+ repositoryCount := 10
+ // Access each repository from 10 goroutines concurrently.
+ goroutineCount := 10
+
+ collectedIDs := make([][]partitionID, repositoryCount)
+ ctx := testhelper.Context(t)
+ wg := sync.WaitGroup{}
+ start := make(chan struct{})
+
+ pool, poolPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ for i := 0; i < repositoryCount; i++ {
+ i := i
+ collectedIDs[i] = make([]partitionID, goroutineCount)
+
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ if tc.withAlternate {
+ // Link the repositories to the pool.
+ alternateRelativePath, err := filepath.Rel(
+ filepath.Join(repoPath, "objects"),
+ filepath.Join(poolPath, "objects"),
+ )
+ require.NoError(t, err)
+ require.NoError(t, os.WriteFile(filepath.Join(repoPath, "objects", "info", "alternates"), []byte(alternateRelativePath), fs.ModePerm))
- // We expect to have 10 unique partition IDs as there are 10 repositories being accessed.
- require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-start
+ _, err := pa.getPartitionID(ctx, repo.RelativePath)
+ assert.NoError(t, err)
+ }()
+ }
+
+ for j := 0; j < goroutineCount; j++ {
+ j := j
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-start
+ ptnID, err := pa.getPartitionID(ctx, repo.RelativePath)
+ assert.NoError(t, err)
+ collectedIDs[i][j] = ptnID
+ }()
+ }
+ }
+
+ close(start)
+ wg.Wait()
+
+ var partitionIDs []partitionID
+ for _, ids := range collectedIDs {
+ partitionIDs = append(partitionIDs, ids[0])
+ for i := range ids {
+ // We expect all goroutines accessing a given repository to get the
+ // same partition ID for it.
+ require.Equal(t, ids[0], ids[i], ids)
+ }
+ }
+
+ if tc.withAlternate {
+ // We expect all repositories to have been assigned to the same partition as they are all linked to the same pool.
+ require.Equal(t, []partitionID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, partitionIDs)
+ ptnID, err := pa.getPartitionID(ctx, pool.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, partitionID(1), ptnID, "pool should have been assigned into the same partition as the linked repositories")
+ return
+ }
+
+ // We expect to have 10 unique partition IDs as there are 10 repositories being accessed.
+ require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs)
+ })
+ }
}
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index e30e5cc23..9ff3e2f72 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -215,7 +215,7 @@ func NewPartitionManager(
return nil, fmt.Errorf("create storage's database directory: %w", err)
}
- pa, err := newPartitionAssigner(db)
+ pa, err := newPartitionAssigner(db, storage.Path)
if err != nil {
return nil, fmt.Errorf("new partition assigner: %w", err)
}