Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-09-15 21:28:43 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-09-15 21:28:43 +0300
commit971fb9142d495e10b571d4a16d75c6baba0c615e (patch)
tree82b4b38e659f355e5d2dc16b627184afd1c993e1
parent3958129f4b15a62ed6a11b8dbb7f8ba5134963db (diff)
parentefdb4e3fea82fce31526ab50656e74d869490076 (diff)
Merge branch 'smh-object-pool-partitioning' into 'master'
Assign pooled repositories to the same partition as the pool See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6350 Merged-by: Justin Tobler <jtobler@gitlab.com> Approved-by: Justin Tobler <jtobler@gitlab.com> Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com> Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com> Reviewed-by: Justin Tobler <jtobler@gitlab.com> Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r--internal/git/stats/repository_info.go53
-rw-r--r--internal/git/stats/repository_info_test.go69
-rw-r--r--internal/gitaly/config/locator.go19
-rw-r--r--internal/gitaly/storage/locator.go36
-rw-r--r--internal/gitaly/storage/locator_test.go51
-rw-r--r--internal/gitaly/storage/storagemgr/partition_assigner.go130
-rw-r--r--internal/gitaly/storage/storagemgr/partition_assigner_test.go319
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go2
8 files changed, 591 insertions, 88 deletions
diff --git a/internal/git/stats/repository_info.go b/internal/git/stats/repository_info.go
index eede939e9..159b31b00 100644
--- a/internal/git/stats/repository_info.go
+++ b/internal/git/stats/repository_info.go
@@ -522,26 +522,21 @@ func (a AlternatesInfo) AbsoluteObjectDirectories() []string {
return alternatePaths
}
-// AlternatesInfoForRepository reads the alternates file and returns information on it. This
-// function does not return an error in case the alternates file doesn't exist. Existence can be
-// checked via the `Exists` field of the returned `AlternatesInfo` structure.
-func AlternatesInfoForRepository(repoPath string) (AlternatesInfo, error) {
- file, err := os.Open(filepath.Join(repoPath, "objects", "info", "alternates"))
- if err != nil {
- if errors.Is(err, os.ErrNotExist) {
- return AlternatesInfo{
- Exists: false,
- }, nil
- }
-
- return AlternatesInfo{}, err
- }
- defer file.Close()
+// AlternatesFilePath returns the 'objects/info/alternates'
+// file's path in the repository.
+func AlternatesFilePath(repoPath string) string {
+ return filepath.Join(repoPath, "objects", "info", "alternates")
+}
- stat, err := file.Stat()
+// ReadAlternatesFile reads the repository's alternate object directory paths
+// from '<repo>/objects/info/alternates' and returns them. Returns a wrapped
+// fs.ErrNotExist if the file doesn't exist.
+func ReadAlternatesFile(repoPath string) ([]string, error) {
+ file, err := os.Open(AlternatesFilePath(repoPath))
if err != nil {
- return AlternatesInfo{}, err
+ return nil, fmt.Errorf("open: %w", err)
}
+ defer file.Close()
var alternatePaths []string
scanner := bufio.NewScanner(file)
@@ -559,8 +554,30 @@ func AlternatesInfoForRepository(repoPath string) (AlternatesInfo, error) {
alternatePaths = append(alternatePaths, scanner.Text())
}
}
+
if err := scanner.Err(); err != nil {
- return AlternatesInfo{}, fmt.Errorf("scanning alternate paths: %w", err)
+ return nil, fmt.Errorf("scanning alternate paths: %w", err)
+ }
+
+ return alternatePaths, nil
+}
+
+// AlternatesInfoForRepository reads the alternates file and returns information on it. This
+// function does not return an error in case the alternates file doesn't exist. Existence can be
+// checked via the `Exists` field of the returned `AlternatesInfo` structure.
+func AlternatesInfoForRepository(repoPath string) (AlternatesInfo, error) {
+ alternatePaths, err := ReadAlternatesFile(repoPath)
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return AlternatesInfo{Exists: false}, nil
+ }
+
+ return AlternatesInfo{}, fmt.Errorf("read alternates file: %w", err)
+ }
+
+ stat, err := os.Stat(AlternatesFilePath(repoPath))
+ if err != nil {
+ return AlternatesInfo{}, fmt.Errorf("stat: %w", err)
}
return AlternatesInfo{
diff --git a/internal/git/stats/repository_info_test.go b/internal/git/stats/repository_info_test.go
index a904ae4de..81285cdc1 100644
--- a/internal/git/stats/repository_info_test.go
+++ b/internal/git/stats/repository_info_test.go
@@ -752,6 +752,75 @@ func TestAlternatesInfoForRepository(t *testing.T) {
}
}
+func TestReadAlternatesFile(t *testing.T) {
+ for _, tc := range []struct {
+ desc string
+ alternatesContent []byte
+ expectedAlternates []string
+ expectedError error
+ }{
+ {
+ desc: "no alternates file",
+ expectedError: fs.ErrNotExist,
+ },
+ {
+ desc: "empty alternates",
+ alternatesContent: []byte(""),
+ },
+ {
+ desc: "empty lines",
+ alternatesContent: []byte("\n\n"),
+ },
+ {
+ desc: "path between empty lines",
+ alternatesContent: []byte("\n/path/1\n"),
+ expectedAlternates: []string{"/path/1"},
+ },
+ {
+ desc: "path without newline",
+ alternatesContent: []byte("../some/path"),
+ expectedAlternates: []string{"../some/path"},
+ },
+ {
+ desc: "path with newline",
+ alternatesContent: []byte("../some/path\n"),
+ expectedAlternates: []string{"../some/path"},
+ },
+ {
+ desc: "multiple different paths",
+ alternatesContent: []byte("path/1\n/path/2\npath/3"),
+ expectedAlternates: []string{"path/1", "/path/2", "path/3"},
+ },
+ {
+ desc: "same path multiple times",
+ alternatesContent: []byte("path/1\npath/1\n"),
+ expectedAlternates: []string{"path/1", "path/1"},
+ },
+ {
+ desc: "commented line ignored",
+ alternatesContent: []byte("path/1\n#THIS LINE IS IGNORED\npath/2\n"),
+ expectedAlternates: []string{"path/1", "path/2"},
+ },
+ } {
+ t.Run(tc.desc, func(t *testing.T) {
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+
+ _, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ if tc.alternatesContent != nil {
+ require.NoError(t, os.WriteFile(AlternatesFilePath(repoPath), tc.alternatesContent, fs.ModePerm))
+ }
+
+ alternates, err := ReadAlternatesFile(repoPath)
+ require.ErrorIs(t, err, tc.expectedError)
+ require.Equal(t, tc.expectedAlternates, alternates)
+ })
+ }
+}
+
func TestReferencesInfoForRepository(t *testing.T) {
t.Parallel()
diff --git a/internal/gitaly/config/locator.go b/internal/gitaly/config/locator.go
index f077d039f..ec989f29f 100644
--- a/internal/gitaly/config/locator.go
+++ b/internal/gitaly/config/locator.go
@@ -84,22 +84,19 @@ func (l *configLocator) ValidateRepository(repo storage.Repository, opts ...stor
}
if !cfg.SkipRepositoryExistenceCheck {
- if _, err := os.Stat(path); err != nil {
+ if err := storage.ValidateGitDirectory(path); err != nil {
if errors.Is(err, os.ErrNotExist) {
return storage.NewRepositoryNotFoundError(repo.GetStorageName(), repo.GetRelativePath())
}
- return structerr.New("statting repository: %w", err).WithMetadata("repository_path", path)
- }
-
- for _, element := range []string{"objects", "refs", "HEAD"} {
- if _, err := os.Stat(filepath.Join(path, element)); err != nil {
- if errors.Is(err, os.ErrNotExist) {
- return structerr.NewFailedPrecondition("%w: %q does not exist", storage.ErrRepositoryNotValid, element).WithMetadata("repository_path", path)
- }
-
- return structerr.New("statting %q: %w", element, err).WithMetadata("repository_path", path)
+ var errInvalidGitDir storage.InvalidGitDirectoryError
+ if errors.As(err, &errInvalidGitDir) {
+ return structerr.NewFailedPrecondition(
+ "%w: %q does not exist", storage.ErrRepositoryNotValid, errInvalidGitDir.MissingEntry,
+ ).WithMetadata("repository_path", path)
}
+
+ return structerr.New("validate git directory: %w", err).WithMetadata("repository_path", path)
}
// See: https://gitlab.com/gitlab-org/gitaly/issues/1339
diff --git a/internal/gitaly/storage/locator.go b/internal/gitaly/storage/locator.go
index 5a116de6a..fe5ea23b1 100644
--- a/internal/gitaly/storage/locator.go
+++ b/internal/gitaly/storage/locator.go
@@ -4,6 +4,7 @@ import (
"crypto/sha1"
"errors"
"fmt"
+ "io/fs"
"os"
"path/filepath"
"strings"
@@ -157,6 +158,41 @@ func ValidateRelativePath(rootDir, relativePath string) (string, error) {
return filepath.Rel(rootDir, absPath)
}
+// InvalidGitDirectoryError is returned when a Git directory being validated is invalid.
+type InvalidGitDirectoryError struct {
+ // MissingEntry is the expected directory entry that was missing.
+ MissingEntry string
+}
+
+// Error returns the error message.
+func (err InvalidGitDirectoryError) Error() string {
+ return "invalid git directory"
+}
+
+// ValidateGitDirectory validates that the directory at the given path looks like a valid
+// Git repository. If the path points to a valid directory which doesn't contain the
+// expected entries, InvalidGitDirectoryError is returned. A directory is considered to
+// be a valid Git directory if it contains 'objects', 'refs' and 'HEAD'.
+func ValidateGitDirectory(path string) error {
+ if info, err := os.Stat(path); err != nil {
+ return fmt.Errorf("stat: %w", err)
+ } else if !info.IsDir() {
+ return errors.New("not a directory")
+ }
+
+ for _, file := range []string{"objects", "refs", "HEAD"} {
+ if _, err := os.Stat(filepath.Join(path, file)); err != nil {
+ if !errors.Is(err, fs.ErrNotExist) {
+ return fmt.Errorf("stat %q: %w", file, err)
+ }
+
+ return InvalidGitDirectoryError{MissingEntry: file}
+ }
+ }
+
+ return nil
+}
+
// QuarantineDirectoryPrefix returns a prefix for use in the temporary directory. The prefix is
// based on the relative repository path and will stay stable for any given repository. This allows
// us to verify that a given quarantine object directory indeed belongs to the repository at hand.
diff --git a/internal/gitaly/storage/locator_test.go b/internal/gitaly/storage/locator_test.go
index 506a8ec52..ccc0bce48 100644
--- a/internal/gitaly/storage/locator_test.go
+++ b/internal/gitaly/storage/locator_test.go
@@ -1,6 +1,10 @@
package storage
import (
+ "errors"
+ "io/fs"
+ "os"
+ "path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@@ -118,3 +122,50 @@ func TestQuarantineDirectoryPrefix(t *testing.T) {
GlProjectPath: "gl/repo",
}))
}
+
+func TestValidateGitDirectory(t *testing.T) {
+ t.Run("path does not exist", func(t *testing.T) {
+ require.ErrorIs(t,
+ ValidateGitDirectory(filepath.Join(t.TempDir(), "non-existent")),
+ fs.ErrNotExist,
+ )
+ })
+
+ t.Run("path is not a directory", func(t *testing.T) {
+ path := filepath.Join(t.TempDir(), "file")
+ require.NoError(t, os.WriteFile(path, nil, fs.ModePerm))
+ require.Equal(t, errors.New("not a directory"), ValidateGitDirectory(path))
+ })
+
+ // Mock the repository creation as our repository-creating helpers depend on the storage
+ // package and using them would lead to a cyclic import.
+ createRepository := func(t *testing.T) string {
+ t.Helper()
+ path := t.TempDir()
+ for _, entry := range []string{"objects", "refs", "HEAD"} {
+ require.NoError(t, os.WriteFile(filepath.Join(path, entry), nil, fs.ModePerm))
+ }
+
+ return path
+ }
+
+ t.Run("missing entry", func(t *testing.T) {
+ for _, entry := range []string{
+ "objects", "refs", "HEAD",
+ } {
+ t.Run(entry, func(t *testing.T) {
+ repoPath := createRepository(t)
+ require.NoError(t, os.RemoveAll(filepath.Join(repoPath, entry)))
+
+ require.Equal(t,
+ InvalidGitDirectoryError{MissingEntry: entry},
+ ValidateGitDirectory(repoPath),
+ )
+ })
+ }
+ })
+
+ t.Run("valid repository", func(t *testing.T) {
+ require.NoError(t, ValidateGitDirectory(createRepository(t)))
+ })
+}
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner.go b/internal/gitaly/storage/storagemgr/partition_assigner.go
index ec8ef9cdd..418e70b4b 100644
--- a/internal/gitaly/storage/storagemgr/partition_assigner.go
+++ b/internal/gitaly/storage/storagemgr/partition_assigner.go
@@ -5,15 +5,35 @@ import (
"encoding/binary"
"errors"
"fmt"
+ "io/fs"
+ "path/filepath"
"strconv"
"sync"
"github.com/dgraph-io/badger/v4"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
)
-// errPartitionAssignmentNotFound is returned when attempting to access a
-// partition assignment in the database that doesn't yet exist.
-var errPartitionAssignmentNotFound = errors.New("partition assignment not found")
+var (
+ // errPartitionAssignmentNotFound is returned when attempting to access a
+ // partition assignment in the database that doesn't yet exist.
+ errPartitionAssignmentNotFound = errors.New("partition assignment not found")
+ // errNoAlternate is used internally by partitionAssigner to signal a repository
+ // has no alternates.
+ errNoAlternate = errors.New("repository has no alternate")
+ // errMultipleAlternates is returned when a repository has multiple alternates
+ // configured.
+ errMultipleAlternates = errors.New("repository has multiple alternates")
+ // errAlternatePointsToSelf is returned when a repository's alternate points to the
+ // repository itself.
+ errAlternatePointsToSelf = errors.New("repository's alternate points to self")
+ // errAlternateHasAlternate is returned when a repository's alternate itself has an
+ // alternate listed.
+ errAlternateHasAlternate = errors.New("repository's alternate has an alternate itself")
+)
+
+const prefixPartitionAssignment = "partition_assignment/"
// partitionID uniquely identifies a partition.
type partitionID uint64
@@ -40,7 +60,7 @@ func newPartitionAssignmentTable(db *badger.DB) *partitionAssignmentTable {
}
func (pt *partitionAssignmentTable) key(relativePath string) []byte {
- return []byte(fmt.Sprintf("partition_assignment/%s", relativePath))
+ return []byte(fmt.Sprintf("%s%s", prefixPartitionAssignment, relativePath))
}
func (pt *partitionAssignmentTable) getPartitionID(relativePath string) (partitionID, error) {
@@ -90,11 +110,14 @@ type partitionAssigner struct {
idSequence *badger.Sequence
// partitionAssignmentTable contains the partition assignment records.
partitionAssignmentTable *partitionAssignmentTable
+ // storagePath is the path to the root directory of the storage the relative
+ // paths are computed against.
+ storagePath string
}
// newPartitionAssigner returns a new partitionAssigner. Close must be called on the
// returned instance to release acquired resources.
-func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) {
+func newPartitionAssigner(db *badger.DB, storagePath string) (*partitionAssigner, error) {
seq, err := db.GetSequence([]byte("partition_id_seq"), 100)
if err != nil {
return nil, fmt.Errorf("get sequence: %w", err)
@@ -104,6 +127,7 @@ func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) {
repositoryLocks: make(map[string]chan struct{}),
idSequence: seq,
partitionAssignmentTable: newPartitionAssignmentTable(db),
+ storagePath: storagePath,
}, nil
}
@@ -126,10 +150,16 @@ func (pa *partitionAssigner) allocatePartitionID() (partitionID, error) {
return partitionID(id), nil
}
-// getPartititionID returns the partition ID of the repository. If the repository wasn't yet assigned into
+// getPartitionID returns the partition ID of the repository. If the repository wasn't yet assigned into
// a partition, it will be assigned into one and the assignment stored. Further accesses return the stored
-// partition ID. Each repository goes into its own partition. The method is safe to call concurrently.
+// partition ID. Repositories without an alternate go into their own partitions. Repositories with an alternate
+// are assigned into the same partition as the alternate repository. The alternate is assigned into a partition
+// if it hasn't yet been. The method is safe to call concurrently.
func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath string) (partitionID, error) {
+ return pa.getPartitionIDRecursive(ctx, relativePath, false)
+}
+
+func (pa *partitionAssigner) getPartitionIDRecursive(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
if err != nil {
if !errors.Is(err, errPartitionAssignmentNotFound) {
@@ -183,16 +213,94 @@ func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath st
return ptnID, nil
}
- // Each repository goes into its own partition. Allocate a new partition ID for this
- // repository.
+ ptnID, err = pa.assignPartitionID(ctx, relativePath, recursiveCall)
+ if err != nil {
+ return 0, fmt.Errorf("assign partition ID: %w", err)
+ }
+ }
+
+ return ptnID, nil
+}
+
+func (pa *partitionAssigner) assignPartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+ // Check if the repository has an alternate. If so, it needs to go into the same
+ // partition with it.
+ ptnID, err := pa.getAlternatePartitionID(ctx, relativePath, recursiveCall)
+ if err != nil {
+ if !errors.Is(err, errNoAlternate) {
+ return 0, fmt.Errorf("get alternate partition ID: %w", err)
+ }
+
+ // The repository has no alternate. Unpooled repositories go into their own partitions.
+ // Allocate a new partition ID for this repository.
ptnID, err = pa.allocatePartitionID()
if err != nil {
return 0, fmt.Errorf("acquire partition id: %w", err)
}
+ }
+
+ if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil {
+ return 0, fmt.Errorf("set partition: %w", err)
+ }
- if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil {
- return 0, fmt.Errorf("set partition: %w", err)
+ return ptnID, nil
+}
+
+func (pa *partitionAssigner) getAlternatePartitionID(ctx context.Context, relativePath string, recursiveCall bool) (partitionID, error) {
+ alternates, err := stats.ReadAlternatesFile(filepath.Join(pa.storagePath, relativePath))
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) {
+ return 0, errNoAlternate
}
+
+ return 0, fmt.Errorf("read alternates file: %w", err)
+ }
+
+ if len(alternates) == 0 {
+ return 0, errNoAlternate
+ } else if recursiveCall {
+ // recursiveCall being true indicates we've arrived here through another repository's alternate.
+ // Repositories in Gitaly should only have a single alternate that points to the repository's
+ // pool. Chains of alternates are unexpected and could go arbitrarily long, so fail the operation.
+ return 0, errAlternateHasAlternate
+ } else if len(alternates) > 1 {
+ // Repositories shouldn't have more than one alternate given they should only be
+ // linked to a single pool at most.
+ return 0, errMultipleAlternates
+ }
+
+ // The relative path should point somewhere within the same storage.
+ alternateRelativePath, err := storage.ValidateRelativePath(
+ pa.storagePath,
+ // Take the relative path to the repository, not 'repository/objects'.
+ filepath.Dir(
+ // The path in alternates file points to the object directory of the alternate
+ // repository. The path is relative to the repository's own object directory.
+ filepath.Join(relativePath, "objects", alternates[0]),
+ ),
+ )
+ if err != nil {
+ return 0, fmt.Errorf("validate relative path: %w", err)
+ }
+
+ if alternateRelativePath == relativePath {
+ // The alternate must not point to the repository itself. Not only is it nonsensical
+ // but it would also cause a deadlock as the repository is locked during this call
+ // already.
+ return 0, errAlternatePointsToSelf
+ }
+
+ // The relative path should point to a Git directory.
+ if err := storage.ValidateGitDirectory(filepath.Join(pa.storagePath, alternateRelativePath)); err != nil {
+ return 0, fmt.Errorf("validate git directory: %w", err)
+ }
+
+ // Recursively get the alternate's partition ID or assign it one. This time
+ // we set recursiveCall to true to fail the operation if the alternate itself has an
+ // alternate configured.
+ ptnID, err := pa.getPartitionIDRecursive(ctx, alternateRelativePath, true)
+ if err != nil {
+ return 0, fmt.Errorf("get partition ID: %w", err)
}
return ptnID, nil
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner_test.go b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
index fb8a05b1e..9ccf67d98 100644
--- a/internal/gitaly/storage/storagemgr/partition_assigner_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
@@ -1,21 +1,59 @@
package storagemgr
import (
- "fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "strings"
"sync"
"testing"
+ "github.com/dgraph-io/badger/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
)
+type partitionAssignments map[string]partitionID
+
+func getPartitionAssignments(tb testing.TB, db *badger.DB) partitionAssignments {
+ tb.Helper()
+
+ state := partitionAssignments{}
+ require.NoError(tb, db.View(func(txn *badger.Txn) error {
+ it := txn.NewIterator(badger.IteratorOptions{
+ Prefix: []byte(prefixPartitionAssignment),
+ })
+ defer it.Close()
+
+ for it.Rewind(); it.Valid(); it.Next() {
+ value, err := it.Item().ValueCopy(nil)
+ require.NoError(tb, err)
+
+ var ptnID partitionID
+ ptnID.UnmarshalBinary(value)
+
+ relativePath := strings.TrimPrefix(string(it.Item().Key()), prefixPartitionAssignment)
+ state[relativePath] = ptnID
+ }
+
+ return nil
+ }))
+
+ return state
+}
+
func TestPartitionAssigner(t *testing.T) {
db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
require.NoError(t, err)
defer testhelper.MustClose(t, db)
- pa, err := newPartitionAssigner(db)
+ cfg := testcfg.Build(t)
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
require.NoError(t, err)
defer testhelper.MustClose(t, pa)
@@ -39,13 +77,142 @@ func TestPartitionAssigner(t *testing.T) {
require.EqualValues(t, 1, ptnID1)
}
+func TestPartitionAssigner_alternates(t *testing.T) {
+ t.Parallel()
+
+ for _, tc := range []struct {
+ desc string
+ memberAlternatesContent []byte
+ poolAlternatesContent []byte
+ expectedError error
+ expectedPartitionAssignments partitionAssignments
+ }{
+ {
+ desc: "no alternates file",
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 2,
+ },
+ },
+ {
+ desc: "empty alternates file",
+ memberAlternatesContent: []byte(""),
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 2,
+ },
+ },
+ {
+ desc: "not a git directory",
+ memberAlternatesContent: []byte("../.."),
+ expectedError: storage.InvalidGitDirectoryError{MissingEntry: "objects"},
+ },
+ {
+ desc: "points to pool",
+ memberAlternatesContent: []byte("../../pool/objects"),
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 1,
+ },
+ },
+ {
+ desc: "points to pool with newline",
+ memberAlternatesContent: []byte("../../pool/objects\n"),
+ expectedPartitionAssignments: partitionAssignments{
+ "member": 1,
+ "pool": 1,
+ },
+ },
+ {
+ desc: "multiple alternates fail",
+ memberAlternatesContent: []byte("../../pool/objects\nother-alternate"),
+ expectedError: errMultipleAlternates,
+ },
+ {
+ desc: "alternate pointing to self fails",
+ memberAlternatesContent: []byte("../objects"),
+ expectedError: errAlternatePointsToSelf,
+ },
+ {
+ desc: "alternate having an alternate fails",
+ poolAlternatesContent: []byte("unexpected"),
+ memberAlternatesContent: []byte("../../pool/objects"),
+ expectedError: errAlternateHasAlternate,
+ },
+ {
+ desc: "alternate points outside the storage",
+ memberAlternatesContent: []byte("../../../../.."),
+ expectedError: storage.ErrRelativePathEscapesRoot,
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+
+ cfg := testcfg.Build(t)
+
+ ctx := testhelper.Context(t)
+ poolRepo, poolPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: "pool",
+ })
+
+ memberRepo, memberPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ RelativePath: "member",
+ })
+
+ writeAlternatesFile := func(t *testing.T, repoPath string, content []byte) {
+ t.Helper()
+ require.NoError(t, os.WriteFile(stats.AlternatesFilePath(repoPath), content, os.ModePerm))
+ }
+
+ if tc.poolAlternatesContent != nil {
+ writeAlternatesFile(t, poolPath, tc.poolAlternatesContent)
+ }
+
+ if tc.memberAlternatesContent != nil {
+ writeAlternatesFile(t, memberPath, tc.memberAlternatesContent)
+ }
+
+ db, err := OpenDatabase(testhelper.NewLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
+
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
+
+ expectedPartitionAssignments := tc.expectedPartitionAssignments
+ if expectedPartitionAssignments == nil {
+ expectedPartitionAssignments = partitionAssignments{}
+ }
+
+ if memberPartitionID, err := pa.getPartitionID(ctx, memberRepo.RelativePath); tc.expectedError != nil {
+ require.ErrorIs(t, err, tc.expectedError)
+ } else {
+ require.NoError(t, err)
+ require.Equal(t, expectedPartitionAssignments["member"], memberPartitionID)
+
+ poolPartitionID, err := pa.getPartitionID(ctx, poolRepo.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, expectedPartitionAssignments["pool"], poolPartitionID)
+ }
+
+ require.Equal(t, expectedPartitionAssignments, getPartitionAssignments(t, db))
+ })
+ }
+}
+
func TestPartitionAssigner_close(t *testing.T) {
dbDir := t.TempDir()
db, err := OpenDatabase(testhelper.SharedLogger(t), dbDir)
require.NoError(t, err)
- pa, err := newPartitionAssigner(db)
+ cfg := testcfg.Build(t)
+
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
require.NoError(t, err)
testhelper.MustClose(t, pa)
testhelper.MustClose(t, db)
@@ -54,7 +221,7 @@ func TestPartitionAssigner_close(t *testing.T) {
require.NoError(t, err)
defer testhelper.MustClose(t, db)
- pa, err = newPartitionAssigner(db)
+ pa, err = newPartitionAssigner(db, cfg.Storages[0].Path)
require.NoError(t, err)
defer testhelper.MustClose(t, pa)
@@ -67,53 +234,111 @@ func TestPartitionAssigner_close(t *testing.T) {
}
func TestPartitionAssigner_concurrentAccess(t *testing.T) {
- db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
- require.NoError(t, err)
- defer testhelper.MustClose(t, db)
+ t.Parallel()
- pa, err := newPartitionAssigner(db)
- require.NoError(t, err)
- defer testhelper.MustClose(t, pa)
+ for _, tc := range []struct {
+ desc string
+ withAlternate bool
+ }{
+ {
+ desc: "without alternate",
+ },
+ {
+ desc: "with alternate",
+ withAlternate: true,
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
- // Access 10 repositories concurrently.
- repositoryCount := 10
- // Access each repository from 10 goroutines concurrently.
- goroutineCount := 10
+ db, err := OpenDatabase(testhelper.SharedLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
- collectedIDs := make([][]partitionID, repositoryCount)
- ctx := testhelper.Context(t)
- wg := sync.WaitGroup{}
- start := make(chan struct{})
- for i := 0; i < repositoryCount; i++ {
- i := i
- collectedIDs[i] = make([]partitionID, goroutineCount)
- relativePath := fmt.Sprintf("relative-path-%d", i)
- for j := 0; j < goroutineCount; j++ {
- j := j
- wg.Add(1)
- go func() {
- defer wg.Done()
- <-start
- ptnID, err := pa.getPartitionID(ctx, relativePath)
- assert.NoError(t, err)
- collectedIDs[i][j] = ptnID
- }()
- }
- }
+ cfg := testcfg.Build(t)
- close(start)
- wg.Wait()
+ pa, err := newPartitionAssigner(db, cfg.Storages[0].Path)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
- var partitionIDs []partitionID
- for _, ids := range collectedIDs {
- partitionIDs = append(partitionIDs, ids[0])
- for i := range ids {
- // We expect all goroutines accessing a given repository to get the
- // same partition ID for it.
- require.Equal(t, ids[0], ids[i], ids)
- }
- }
+ // Access 10 repositories concurrently.
+ repositoryCount := 10
+ // Access each repository from 10 goroutines concurrently.
+ goroutineCount := 10
+
+ collectedIDs := make([][]partitionID, repositoryCount)
+ ctx := testhelper.Context(t)
+ wg := sync.WaitGroup{}
+ start := make(chan struct{})
+
+ pool, poolPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ for i := 0; i < repositoryCount; i++ {
+ i := i
+ collectedIDs[i] = make([]partitionID, goroutineCount)
+
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+ SkipCreationViaService: true,
+ })
+
+ if tc.withAlternate {
+ // Link the repositories to the pool.
+ alternateRelativePath, err := filepath.Rel(
+ filepath.Join(repoPath, "objects"),
+ filepath.Join(poolPath, "objects"),
+ )
+ require.NoError(t, err)
+ require.NoError(t, os.WriteFile(filepath.Join(repoPath, "objects", "info", "alternates"), []byte(alternateRelativePath), fs.ModePerm))
- // We expect to have 10 unique partition IDs as there are 10 repositories being accessed.
- require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-start
+ _, err := pa.getPartitionID(ctx, repo.RelativePath)
+ assert.NoError(t, err)
+ }()
+ }
+
+ for j := 0; j < goroutineCount; j++ {
+ j := j
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-start
+ ptnID, err := pa.getPartitionID(ctx, repo.RelativePath)
+ assert.NoError(t, err)
+ collectedIDs[i][j] = ptnID
+ }()
+ }
+ }
+
+ close(start)
+ wg.Wait()
+
+ var partitionIDs []partitionID
+ for _, ids := range collectedIDs {
+ partitionIDs = append(partitionIDs, ids[0])
+ for i := range ids {
+ // We expect all goroutines accessing a given repository to get the
+ // same partition ID for it.
+ require.Equal(t, ids[0], ids[i], ids)
+ }
+ }
+
+ if tc.withAlternate {
+ // We expect all repositories to have been assigned to the same partition as they are all linked to the same pool.
+ require.Equal(t, []partitionID{1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, partitionIDs)
+ ptnID, err := pa.getPartitionID(ctx, pool.RelativePath)
+ require.NoError(t, err)
+ require.Equal(t, partitionID(1), ptnID, "pool should have been assigned into the same partition as the linked repositories")
+ return
+ }
+
+ // We expect to have 10 unique partition IDs as there are 10 repositories being accessed.
+ require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs)
+ })
+ }
}
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index e30e5cc23..9ff3e2f72 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -215,7 +215,7 @@ func NewPartitionManager(
return nil, fmt.Errorf("create storage's database directory: %w", err)
}
- pa, err := newPartitionAssigner(db)
+ pa, err := newPartitionAssigner(db, storage.Path)
if err != nil {
return nil, fmt.Errorf("new partition assigner: %w", err)
}