diff options
author | Justin Tobler <jtobler@gitlab.com> | 2023-09-15 21:28:43 +0300 |
---|---|---|
committer | Justin Tobler <jtobler@gitlab.com> | 2023-09-15 21:28:43 +0300 |
commit | 971fb9142d495e10b571d4a16d75c6baba0c615e (patch) | |
tree | 82b4b38e659f355e5d2dc16b627184afd1c993e1 | |
parent | 3958129f4b15a62ed6a11b8dbb7f8ba5134963db (diff) | |
parent | efdb4e3fea82fce31526ab50656e74d869490076 (diff) |
Merge branch 'smh-object-pool-partitioning' into 'master'
Assign pooled repositories to the same partition as the pool
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6350
Merged-by: Justin Tobler <jtobler@gitlab.com>
Approved-by: Justin Tobler <jtobler@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Justin Tobler <jtobler@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r-- | internal/git/stats/repository_info.go | 53 | ||||
-rw-r--r-- | internal/git/stats/repository_info_test.go | 69 | ||||
-rw-r--r-- | internal/gitaly/config/locator.go | 19 | ||||
-rw-r--r-- | internal/gitaly/storage/locator.go | 36 | ||||
-rw-r--r-- | internal/gitaly/storage/locator_test.go | 51 | ||||
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_assigner.go | 130 | ||||
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_assigner_test.go | 319 | ||||
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_manager.go | 2 |
8 files changed, 591 insertions, 88 deletions
diff --git a/internal/git/stats/repository_info.go b/internal/git/stats/repository_info.go index eede939e9..159b31b00 100644 --- a/internal/git/stats/repository_info.go +++ b/internal/git/stats/repository_info.go @@ -522,26 +522,21 @@ func (a AlternatesInfo) AbsoluteObjectDirectories() []string { return alternatePaths } -// AlternatesInfoForRepository reads the alternates file and returns information on it. This -// function does not return an error in case the alternates file doesn't exist. Existence can be -// checked via the `Exists` field of the returned `AlternatesInfo` structure. -func AlternatesInfoForRepository(repoPath string) (AlternatesInfo, error) { - file, err := os.Open(filepath.Join(repoPath, "objects", "info", "alternates")) - if err != nil { - if errors.Is(err, os.ErrNotExist) { - return AlternatesInfo{ - Exists: false, - }, nil - } - - return AlternatesInfo{}, err - } - defer file.Close() +// AlternatesFilePath returns the 'objects/info/alternates' +// file's path in the repository. +func AlternatesFilePath(repoPath string) string { + return filepath.Join(repoPath, "objects", "info", "alternates") +} - stat, err := file.Stat() +// ReadAlternatesFile reads the repository's alternate object directory paths +// from '<repo>/objects/info/alternates' and returns them. Returns a wrapped +// fs.ErrNotExist if the file doesn't exist. 
+func ReadAlternatesFile(repoPath string) ([]string, error) { + file, err := os.Open(AlternatesFilePath(repoPath)) if err != nil { - return AlternatesInfo{}, err + return nil, fmt.Errorf("open: %w", err) } + defer file.Close() var alternatePaths []string scanner := bufio.NewScanner(file) @@ -559,8 +554,30 @@ func AlternatesInfoForRepository(repoPath string) (AlternatesInfo, error) { alternatePaths = append(alternatePaths, scanner.Text()) } } + if err := scanner.Err(); err != nil { - return AlternatesInfo{}, fmt.Errorf("scanning alternate paths: %w", err) + return nil, fmt.Errorf("scanning alternate paths: %w", err) + } + + return alternatePaths, nil +} + +// AlternatesInfoForRepository reads the alternates file and returns information on it. This +// function does not return an error in case the alternates file doesn't exist. Existence can be +// checked via the `Exists` field of the returned `AlternatesInfo` structure. +func AlternatesInfoForRepository(repoPath string) (AlternatesInfo, error) { + alternatePaths, err := ReadAlternatesFile(repoPath) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return AlternatesInfo{Exists: false}, nil + } + + return AlternatesInfo{}, fmt.Errorf("read alternates file: %w", err) + } + + stat, err := os.Stat(AlternatesFilePath(repoPath)) + if err != nil { + return AlternatesInfo{}, fmt.Errorf("stat: %w", err) } return AlternatesInfo{ diff --git a/internal/git/stats/repository_info_test.go b/internal/git/stats/repository_info_test.go index a904ae4de..81285cdc1 100644 --- a/internal/git/stats/repository_info_test.go +++ b/internal/git/stats/repository_info_test.go @@ -752,6 +752,75 @@ func TestAlternatesInfoForRepository(t *testing.T) { } } +func TestReadAlternatesFile(t *testing.T) { + for _, tc := range []struct { + desc string + alternatesContent []byte + expectedAlternates []string + expectedError error + }{ + { + desc: "no alternates file", + expectedError: fs.ErrNotExist, + }, + { + desc: "empty alternates", + 
alternatesContent: []byte(""), + }, + { + desc: "empty lines", + alternatesContent: []byte("\n\n"), + }, + { + desc: "path between empty lines", + alternatesContent: []byte("\n/path/1\n"), + expectedAlternates: []string{"/path/1"}, + }, + { + desc: "path without newline", + alternatesContent: []byte("../some/path"), + expectedAlternates: []string{"../some/path"}, + }, + { + desc: "path with newline", + alternatesContent: []byte("../some/path\n"), + expectedAlternates: []string{"../some/path"}, + }, + { + desc: "multiple different paths", + alternatesContent: []byte("path/1\n/path/2\npath/3"), + expectedAlternates: []string{"path/1", "/path/2", "path/3"}, + }, + { + desc: "same path multiple times", + alternatesContent: []byte("path/1\npath/1\n"), + expectedAlternates: []string{"path/1", "path/1"}, + }, + { + desc: "commented line ignored", + alternatesContent: []byte("path/1\n#THIS LINE IS IGNORED\npath/2\n"), + expectedAlternates: []string{"path/1", "path/2"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + _, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + if tc.alternatesContent != nil { + require.NoError(t, os.WriteFile(AlternatesFilePath(repoPath), tc.alternatesContent, fs.ModePerm)) + } + + alternates, err := ReadAlternatesFile(repoPath) + require.ErrorIs(t, err, tc.expectedError) + require.Equal(t, tc.expectedAlternates, alternates) + }) + } +} + func TestReferencesInfoForRepository(t *testing.T) { t.Parallel() diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go index f077d039f..ec989f29f 100644 --- a/internal/gitaly/config/locator.go +++ b/internal/gitaly/config/locator.go @@ -84,22 +84,19 @@ func (l *configLocator) ValidateRepository(repo storage.Repository, opts ...stor } if !cfg.SkipRepositoryExistenceCheck { - if _, err := os.Stat(path); err != nil { + if err := 
storage.ValidateGitDirectory(path); err != nil { if errors.Is(err, os.ErrNotExist) { return storage.NewRepositoryNotFoundError(repo.GetStorageName(), repo.GetRelativePath()) } - return structerr.New("statting repository: %w", err).WithMetadata("repository_path", path) - } - - for _, element := range []string{"objects", "refs", "HEAD"} { - if _, err := os.Stat(filepath.Join(path, element)); err != nil { - if errors.Is(err, os.ErrNotExist) { - return structerr.NewFailedPrecondition("%w: %q does not exist", storage.ErrRepositoryNotValid, element).WithMetadata("repository_path", path) - } - - return structerr.New("statting %q: %w", element, err).WithMetadata("repository_path", path) + var errInvalidGitDir storage.InvalidGitDirectoryError + if errors.As(err, &errInvalidGitDir) { + return structerr.NewFailedPrecondition( + "%w: %q does not exist", storage.ErrRepositoryNotValid, errInvalidGitDir.MissingEntry, + ).WithMetadata("repository_path", path) } + + return structerr.New("validate git directory: %w", err).WithMetadata("repository_path", path) } // See: https://gitlab.com/gitlab-org/gitaly/issues/1339 diff --git a/internal/gitaly/storage/locator.go b/internal/gitaly/storage/locator.go index 5a116de6a..fe5ea23b1 100644 --- a/internal/gitaly/storage/locator.go +++ b/internal/gitaly/storage/locator.go @@ -4,6 +4,7 @@ import ( "crypto/sha1" "errors" "fmt" + "io/fs" "os" "path/filepath" "strings" @@ -157,6 +158,41 @@ func ValidateRelativePath(rootDir, relativePath string) (string, error) { return filepath.Rel(rootDir, absPath) } +// InvalidGitDirectoryError is returned when a Git directory being validated is invalid. +type InvalidGitDirectoryError struct { + // MissingEntry is the expected directory entry that was missing. + MissingEntry string +} + +// Error returns the error message. 
+func (err InvalidGitDirectoryError) Error() string { + return "invalid git directory" +} + +// ValidateGitDirectory validates the directory at the given path looks like a valid +// Git repository. If the path points to a valid directory which doesn't contain the +// expected entries InvalidGitDirectoryError is returned. A directory is considered to +// be valid Git directory if it contains 'objects', 'refs' and 'HEAD'. +func ValidateGitDirectory(path string) error { + if info, err := os.Stat(path); err != nil { + return fmt.Errorf("stat: %w", err) + } else if !info.IsDir() { + return errors.New("not a directory") + } + + for _, file := range []string{"objects", "refs", "HEAD"} { + if _, err := os.Stat(filepath.Join(path, file)); err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("stat %q: %w", file, err) + } + + return InvalidGitDirectoryError{MissingEntry: file} + } + } + + return nil +} + // QuarantineDirectoryPrefix returns a prefix for use in the temporary directory. The prefix is // based on the relative repository path and will stay stable for any given repository. This allows // us to verify that a given quarantine object directory indeed belongs to the repository at hand. 
diff --git a/internal/gitaly/storage/locator_test.go b/internal/gitaly/storage/locator_test.go index 506a8ec52..ccc0bce48 100644 --- a/internal/gitaly/storage/locator_test.go +++ b/internal/gitaly/storage/locator_test.go @@ -1,6 +1,10 @@ package storage import ( + "errors" + "io/fs" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" @@ -118,3 +122,50 @@ func TestQuarantineDirectoryPrefix(t *testing.T) { GlProjectPath: "gl/repo", })) } + +func TestValidateGitDirectory(t *testing.T) { + t.Run("path does not exist", func(t *testing.T) { + require.ErrorIs(t, + ValidateGitDirectory(filepath.Join(t.TempDir(), "non-existent")), + fs.ErrNotExist, + ) + }) + + t.Run("path is not a directory", func(t *testing.T) { + path := filepath.Join(t.TempDir(), "file") + require.NoError(t, os.WriteFile(path, nil, fs.ModePerm)) + require.Equal(t, errors.New("not a directory"), ValidateGitDirectory(path)) + }) + + // Mock the repository creation as our repository creating helpers depend on storage package + // and using them would lead to a cyclic import. 
+ createRepository := func(t *testing.T) string { + t.Helper() + path := t.TempDir() + for _, entry := range []string{"objects", "refs", "HEAD"} { + require.NoError(t, os.WriteFile(filepath.Join(path, entry), nil, fs.ModePerm)) + } + + return path + } + + t.Run("missing entry", func(t *testing.T) { + for _, entry := range []string{ + "objects", "refs", "HEAD", + } { + t.Run(entry, func(t *testing.T) { + repoPath := createRepository(t) + require.NoError(t, os.RemoveAll(filepath.Join(repoPath, entry))) + + require.Equal(t, + InvalidGitDirectoryError{MissingEntry: entry}, + ValidateGitDirectory(repoPath), + ) + }) + } + }) + + t.Run("valid repository", func(t *testing.T) { + require.NoError(t, ValidateGitDirectory(createRepository(t))) + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition_assigner.go b/internal/gitaly/storage/storagemgr/partition_assigner.go index ec8ef9cdd..418e70b4b 100644 --- a/internal/gitaly/storage/storagemgr/partition_assigner.go +++ b/internal/gitaly/storage/storagemgr/partition_assigner.go @@ -5,15 +5,35 @@ import ( "encoding/binary" "errors" "fmt" + "io/fs" + "path/filepath" "strconv" "sync" "github.com/dgraph-io/badger/v4" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" ) -// errPartitionAssignmentNotFound is returned when attempting to access a -// partition assignment in the database that doesn't yet exist. -var errPartitionAssignmentNotFound = errors.New("partition assignment not found") +var ( + // errPartitionAssignmentNotFound is returned when attempting to access a + // partition assignment in the database that doesn't yet exist. + errPartitionAssignmentNotFound = errors.New("partition assignment not found") + // errNoAlternate is used internally by partitionAssigner to signal a repository + // has no alternates. 
+ errNoAlternate = errors.New("repository has no alternate") + // errMultipleAlternates is returned when a repository has multiple alternates + // configured. + errMultipleAlternates = errors.New("repository has multiple alternates") + // errAlternatePointsToSelf is returned when a repository's alternate points to the + // repository itself. + errAlternatePointsToSelf = errors.New("repository's alternate points to self") + // errAlternateHasAlternate is returned when a repository's alternate itself has an + // alternate listed. + errAlternateHasAlternate = errors.New("repository's alternate has an alternate itself") +) + +const prefixPartitionAssignment = "partition_assignment/" // partitionID uniquely identifies a partition. type partitionID uint64 @@ -40,7 +60,7 @@ func newPartitionAssignmentTable(db *badger.DB) *partitionAssignmentTable { } func (pt *partitionAssignmentTable) key(relativePath string) []byte { - return []byte(fmt.Sprintf("partition_assignment/%s", relativePath)) + return []byte(fmt.Sprintf("%s%s", prefixPartitionAssignment, relativePath)) } func (pt *partitionAssignmentTable) getPartitionID(relativePath string) (partitionID, error) { @@ -90,11 +110,14 @@ type partitionAssigner struct { idSequence *badger.Sequence // partitionAssignmentTable contains the partition assignment records. partitionAssignmentTable *partitionAssignmentTable + // storagePath is the path to the root directory of the storage the relative + // paths are computed against. + storagePath string } // newPartitionAssigner returns a new partitionAssigner. Close must be called on the // returned instance to release acquired resources. 
-func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) { +func newPartitionAssigner(db *badger.DB, storagePath string) (*partitionAssigner, error) { seq, err := db.GetSequence([]byte("partition_id_seq"), 100) if err != nil { return nil, fmt.Errorf("get sequence: %w", err) @@ -104,6 +127,7 @@ func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) { repositoryLocks: make(map[string]chan struct{}), idSequence: seq, partitionAssignmentTable: newPartitionAssignmentTable(db), + storagePath: storagePath, }, nil } @@ -126,10 +150,16 @@ func (pa *partitionAssigner) allocatePartitionID() (partitionID, error) { return partitionID(id), nil } -// getPartititionID returns the partition ID of the repository. If the repository wasn't yet assigned into +// getPartitionID returns the partition ID of the repository. If the repository wasn't yet assigned into // a partition, it will be assigned into one and the assignment stored. Further accesses return the stored -// partition ID. Each repository goes into its own partition. The method is safe to call concurrently. +// partition ID. Repositories without an alternate go into their own partitions. Repositories with an alternate +// are assigned into the same partition as the alternate repository. The alternate is assigned into a partition +// if it hasn't yet been. The method is safe to call concurrently. 
func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath string) (partitionID, error) { + return pa.getPartitionIDRecursive(ctx, relativePath, false) +} + +func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) { ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath) if err != nil { if !errors.Is(err, errPartitionAssignmentNotFound) { @@ -183,16 +213,94 @@ func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath st return ptnID, nil } - // Each repository goes into its own partition. Allocate a new partition ID for this - // repository. + ptnID, err = pa.assignPartitionID(ctx, relativePath, recursiveCall) + if err != nil { + return 0, fmt.Errorf("assign partition ID: %w", err) + } + } + + return ptnID, nil +} + +func (pa *partitionAssigner) assignPartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) { + // Check if the repository has an alternate. If so, it needs to go into the same + // partition with it. + ptnID, err := pa.getAlternatePartitionID(ctx, relativePath, recursiveCall) + if err != nil { + if !errors.Is(err, errNoAlternate) { + return 0, fmt.Errorf("get alternate partition ID: %w", err) + } + + // The repository has no alternate. Unpooled repositories go into their own partitions. + // Allocate a new partition ID for this repository. 
ptnID, err = pa.allocatePartitionID() if err != nil { return 0, fmt.Errorf("acquire partition id: %w", err) } + } + + if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil { + return 0, fmt.Errorf("set partition: %w", err) + } - if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil { - return 0, fmt.Errorf("set partition: %w", err) + return ptnID, nil +} + +func (pa *partitionAssigner) getAlternatePartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) { + alternates, err := stats.ReadAlternatesFile(filepath.Join(pa.storagePath, relativePath)) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + return 0, errNoAlternate } + + return 0, fmt.Errorf("read alternates file: %w", err) + } + + if len(alternates) == 0 { + return 0, errNoAlternate + } else if recursiveCall { + // recursiveCall being true indicates we've arrived here through another repository's alternate. + // Repositories in Gitaly should only have a single alternate that points to the repository's + // pool. Chains of alternates are unexpected and could go arbitrarily long, so fail the operation. + return 0, errAlternateHasAlternate + } else if len(alternates) > 1 { + // Repositories shouldn't have more than one alternate given they should only be + // linked to a single pool at most. + return 0, errMultipleAlternates + } + + // The relative path should point somewhere within the same storage. + alternateRelativePath, err := storage.ValidateRelativePath( + pa.storagePath, + // Take the relative path to the repository, not 'repository/objects'. + filepath.Dir( + // The path in alternates file points to the object directory of the alternate + // repository. The path is relative to the repository's own object directory. 
+ filepath.Join(relativePath, "objects", alternates[0]), + ), + ) + if err != nil { + return 0, fmt.Errorf("validate relative path: %w", err) + } + + if alternateRelativePath == relativePath { + // The alternate must not point to the repository itself. Not only is it nonsensical + // but it would also cause a deadlock as the repository is locked during this call + // already. + return 0, errAlternatePointsToSelf + } + + // The relative path should point to a Git directory. + if err := storage.ValidateGitDirectory(filepath.Join(pa.storagePath, alternateRelativePath)); err != nil { + return 0, fmt.Errorf("validate git directory: %w", err) + } + + // Recursively get the alternate's partition ID or assign it one. This time + // we set recursiveCall to true to fail the operation if the alternate itself has an + // alternate configured. + ptnID, err := pa.getPartitionIDRecursive(ctx, alternateRelativePath, true) + if err != nil { + return 0, fmt.Errorf("get partition ID: %w", err) } return ptnID, nil diff --git a/internal/gitaly/storage/storagemgr/partition_assigner_test.go b/internal/gitaly/storage/storagemgr/partition_assigner_test.go index fb8a05b1e..9ccf67d98 100644 --- a/internal/gitaly/storage/storagemgr/partition_assigner_test.go +++ b/internal/gitaly/storage/storagemgr/partition_assigner_test.go @@ -1,21 +1,59 @@ package storagemgr import ( - "fmt" + "io/fs" + "os" + "path/filepath" + "strings" "sync" "testing" + "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) +type partitionAssignments map[string]partitionID + +func getPartitionAssignments(tb testing.TB, db *badger.DB) partitionAssignments { + tb.Helper() + + 
state := partitionAssignments{} + require.NoError(tb, db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.IteratorOptions{ + Prefix: []byte(prefixPartitionAssignment), + }) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + value, err := it.Item().ValueCopy(nil) + require.NoError(tb, err) + + var ptnID partitionID + ptnID.UnmarshalBinary(value) + + relativePath := strings.TrimPrefix(string(it.Item().Key()), prefixPartitionAssignment) + state[relativePath] = ptnID + } + + return nil + })) + + return state +} + func TestPartitionAssigner(t *testing.T) { db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir()) require.NoError(t, err) defer testhelper.MustClose(t, db) - pa, err := newPartitionAssigner(db) + cfg := testcfg.Build(t) + pa, err := newPartitionAssigner(db, cfg.Storages[0].Path) require.NoError(t, err) defer testhelper.MustClose(t, pa) @@ -39,13 +77,142 @@ func TestPartitionAssigner(t *testing.T) { require.EqualValues(t, 1, ptnID1) } +func TestPartitionAssigner_alternates(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + memberAlternatesContent []byte + poolAlternatesContent []byte + expectedError error + expectedPartitionAssignments partitionAssignments + }{ + { + desc: "no alternates file", + expectedPartitionAssignments: partitionAssignments{ + "member": 1, + "pool": 2, + }, + }, + { + desc: "empty alternates file", + memberAlternatesContent: []byte(""), + expectedPartitionAssignments: partitionAssignments{ + "member": 1, + "pool": 2, + }, + }, + { + desc: "not a git directory", + memberAlternatesContent: []byte("../.."), + expectedError: storage.InvalidGitDirectoryError{MissingEntry: "objects"}, + }, + { + desc: "points to pool", + memberAlternatesContent: []byte("../../pool/objects"), + expectedPartitionAssignments: partitionAssignments{ + "member": 1, + "pool": 1, + }, + }, + { + desc: "points to pool with newline", + memberAlternatesContent: []byte("../../pool/objects\n"), + 
expectedPartitionAssignments: partitionAssignments{ + "member": 1, + "pool": 1, + }, + }, + { + desc: "multiple alternates fail", + memberAlternatesContent: []byte("../../pool/objects\nother-alternate"), + expectedError: errMultipleAlternates, + }, + { + desc: "alternate pointing to self fails", + memberAlternatesContent: []byte("../objects"), + expectedError: errAlternatePointsToSelf, + }, + { + desc: "alternate having an alternate fails", + poolAlternatesContent: []byte("unexpected"), + memberAlternatesContent: []byte("../../pool/objects"), + expectedError: errAlternateHasAlternate, + }, + { + desc: "alternate points outside the storage", + memberAlternatesContent: []byte("../../../../.."), + expectedError: storage.ErrRelativePathEscapesRoot, + }, + } { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + + ctx := testhelper.Context(t) + poolRepo, poolPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: "pool", + }) + + memberRepo, memberPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: "member", + }) + + writeAlternatesFile := func(t *testing.T, repoPath string, content []byte) { + t.Helper() + require.NoError(t, os.WriteFile(stats.AlternatesFilePath(repoPath), content, os.ModePerm)) + } + + if tc.poolAlternatesContent != nil { + writeAlternatesFile(t, poolPath, tc.poolAlternatesContent) + } + + if tc.memberAlternatesContent != nil { + writeAlternatesFile(t, memberPath, tc.memberAlternatesContent) + } + + db, err := OpenDatabase(testhelper.NewLogger(t), t.TempDir()) + require.NoError(t, err) + defer testhelper.MustClose(t, db) + + pa, err := newPartitionAssigner(db, cfg.Storages[0].Path) + require.NoError(t, err) + defer testhelper.MustClose(t, pa) + + expectedPartitionAssignments := tc.expectedPartitionAssignments + if expectedPartitionAssignments == nil { + 
expectedPartitionAssignments = partitionAssignments{} + } + + if memberPartitionID, err := pa.getPartitionID(ctx, memberRepo.RelativePath); tc.expectedError != nil { + require.ErrorIs(t, err, tc.expectedError) + } else { + require.NoError(t, err) + require.Equal(t, expectedPartitionAssignments["member"], memberPartitionID) + + poolPartitionID, err := pa.getPartitionID(ctx, poolRepo.RelativePath) + require.NoError(t, err) + require.Equal(t, expectedPartitionAssignments["pool"], poolPartitionID) + } + + require.Equal(t, expectedPartitionAssignments, getPartitionAssignments(t, db)) + }) + } +} + func TestPartitionAssigner_close(t *testing.T) { dbDir := t.TempDir() db, err := OpenDatabase(testhelper.SharedLogger(t), dbDir) require.NoError(t, err) - pa, err := newPartitionAssigner(db) + cfg := testcfg.Build(t) + + pa, err := newPartitionAssigner(db, cfg.Storages[0].Path) require.NoError(t, err) testhelper.MustClose(t, pa) testhelper.MustClose(t, db) @@ -54,7 +221,7 @@ func TestPartitionAssigner_close(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, db) - pa, err = newPartitionAssigner(db) + pa, err = newPartitionAssigner(db, cfg.Storages[0].Path) require.NoError(t, err) defer testhelper.MustClose(t, pa) @@ -67,53 +234,111 @@ func TestPartitionAssigner_close(t *testing.T) { } func TestPartitionAssigner_concurrentAccess(t *testing.T) { - db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir()) - require.NoError(t, err) - defer testhelper.MustClose(t, db) + t.Parallel() - pa, err := newPartitionAssigner(db) - require.NoError(t, err) - defer testhelper.MustClose(t, pa) + for _, tc := range []struct { + desc string + withAlternate bool + }{ + { + desc: "without alternate", + }, + { + desc: "with alternate", + withAlternate: true, + }, + } { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() - // Access 10 repositories concurrently. - repositoryCount := 10 - // Access each repository from 10 goroutines concurrently. 
- goroutineCount := 10 + db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir()) + require.NoError(t, err) + defer testhelper.MustClose(t, db) - collectedIDs := make([][]partitionID, repositoryCount) - ctx := testhelper.Context(t) - wg := sync.WaitGroup{} - start := make(chan struct{}) - for i := 0; i < repositoryCount; i++ { - i := i - collectedIDs[i] = make([]partitionID, goroutineCount) - relativePath := fmt.Sprintf("relative-path-%d", i) - for j := 0; j < goroutineCount; j++ { - j := j - wg.Add(1) - go func() { - defer wg.Done() - <-start - ptnID, err := pa.getPartitionID(ctx, relativePath) - assert.NoError(t, err) - collectedIDs[i][j] = ptnID - }() - } - } + cfg := testcfg.Build(t) - close(start) - wg.Wait() + pa, err := newPartitionAssigner(db, cfg.Storages[0].Path) + require.NoError(t, err) + defer testhelper.MustClose(t, pa) - var partitionIDs []partitionID - for _, ids := range collectedIDs { - partitionIDs = append(partitionIDs, ids[0]) - for i := range ids { - // We expect all goroutines accessing a given repository to get the - // same partition ID for it. - require.Equal(t, ids[0], ids[i], ids) - } - } + // Access 10 repositories concurrently. + repositoryCount := 10 + // Access each repository from 10 goroutines concurrently. + goroutineCount := 10 + + collectedIDs := make([][]partitionID, repositoryCount) + ctx := testhelper.Context(t) + wg := sync.WaitGroup{} + start := make(chan struct{}) + + pool, poolPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + for i := 0; i < repositoryCount; i++ { + i := i + collectedIDs[i] = make([]partitionID, goroutineCount) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + if tc.withAlternate { + // Link the repositories to the pool. 
+ alternateRelativePath, err := filepath.Rel( + filepath.Join(repoPath, "objects"), + filepath.Join(poolPath, "objects"), + ) + require.NoError(t, err) + require.NoError(t, os.WriteFile(filepath.Join(repoPath, "objects", "info", "alternates"), []byte(alternateRelativePath), fs.ModePerm)) - // We expect to have 10 unique partition IDs as there are 10 repositories being accessed. - require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs) + wg.Add(1) + go func() { + defer wg.Done() + <-start + _, err := pa.getPartitionID(ctx, repo.RelativePath) + assert.NoError(t, err) + }() + } + + for j := 0; j < goroutineCount; j++ { + j := j + wg.Add(1) + go func() { + defer wg.Done() + <-start + ptnID, err := pa.getPartitionID(ctx, repo.RelativePath) + assert.NoError(t, err) + collectedIDs[i][j] = ptnID + }() + } + } + + close(start) + wg.Wait() + + var partitionIDs []partitionID + for _, ids := range collectedIDs { + partitionIDs = append(partitionIDs, ids[0]) + for i := range ids { + // We expect all goroutines accessing a given repository to get the + // same partition ID for it. + require.Equal(t, ids[0], ids[i], ids) + } + } + + if tc.withAlternate { + // We expect all repositories to have been assigned to the same partition as they are all linked to the same pool. + require.Equal(t, []partitionID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, partitionIDs) + ptnID, err := pa.getPartitionID(ctx, pool.RelativePath) + require.NoError(t, err) + require.Equal(t, partitionID(1), ptnID, "pool should have been assigned into the same partition as the linked repositories") + return + } + + // We expect to have 10 unique partition IDs as there are 10 repositories being accessed. 
+ require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs) + }) + } } diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index e30e5cc23..9ff3e2f72 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -215,7 +215,7 @@ func NewPartitionManager( return nil, fmt.Errorf("create storage's database directory: %w", err) } - pa, err := newPartitionAssigner(db) + pa, err := newPartitionAssigner(db, storage.Path) if err != nil { return nil, fmt.Errorf("new partition assigner: %w", err) } |