
gitlab.com/gitlab-org/gitaly.git
author    Will Chandler <wchandler@gitlab.com>  2023-08-29 06:07:15 +0300
committer Will Chandler <wchandler@gitlab.com>  2023-08-29 06:07:15 +0300
commit    d8d7990c4d9240d5626d28efad4260d7bac7bd8e
tree      dc5f0eefbffa0f6302a48621e24bf813248dcfa5
parent    fbf36b315fe09eaf304ef6a18ee78e27b9cc079f
parent    6cf465ea9b6f1c2fe53e2693c6603a2c1a5c4924
Merge branch 'smh-partition-assignments' into 'master'
Implement partition assignment logic

See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6245

Merged-by: Will Chandler <wchandler@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
-rw-r--r-- cmd/gitaly-backup/restore_test.go | 14
-rw-r--r-- internal/backup/backup_test.go | 7
-rw-r--r-- internal/backup/server_side_test.go | 7
-rw-r--r-- internal/gitaly/config/locator_test.go | 26
-rw-r--r-- internal/gitaly/service/repository/remove_all_test.go | 7
-rw-r--r-- internal/gitaly/service/repository/repository_exists_test.go | 14
-rw-r--r-- internal/gitaly/service/server/disk_stats_test.go | 28
-rw-r--r-- internal/gitaly/service/server/info_test.go | 26
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_assigner.go | 199
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_assigner_test.go | 119
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_manager.go | 58
-rw-r--r-- internal/gitaly/storage/storagemgr/partition_manager_test.go | 120
-rw-r--r-- internal/praefect/remove_all_test.go | 7
13 files changed, 534 insertions, 98 deletions
diff --git a/cmd/gitaly-backup/restore_test.go b/cmd/gitaly-backup/restore_test.go
index 7b327f3eb..a36434b55 100644
--- a/cmd/gitaly-backup/restore_test.go
+++ b/cmd/gitaly-backup/restore_test.go
@@ -25,6 +25,13 @@ import (
func TestRestoreSubcommand(t *testing.T) {
gittest.SkipWithSHA256(t)
+ testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
t.Parallel()
ctx := testhelper.Context(t)
@@ -101,6 +108,13 @@ func TestRestoreSubcommand(t *testing.T) {
func TestRestoreSubcommand_serverSide(t *testing.T) {
gittest.SkipWithSHA256(t)
+ testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
t.Parallel()
ctx := testhelper.Context(t)
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index 0121f2116..eb4fa2011 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -29,6 +29,13 @@ import (
)
func TestManager_RemoveAllRepositories(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
t.Parallel()
cfg := testcfg.Build(t)
diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go
index 05f5f9b82..5d69b70b2 100644
--- a/internal/backup/server_side_test.go
+++ b/internal/backup/server_side_test.go
@@ -242,6 +242,13 @@ func TestServerSideAdapter_Restore(t *testing.T) {
}
func TestServerSideAdapter_RemoveAllRepositories(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
t.Parallel()
backupRoot := testhelper.TempDir(t)
diff --git a/internal/gitaly/config/locator_test.go b/internal/gitaly/config/locator_test.go
index dc78fe493..fb6761eb9 100644
--- a/internal/gitaly/config/locator_test.go
+++ b/internal/gitaly/config/locator_test.go
@@ -34,19 +34,22 @@ func TestConfigLocator_GetRepoPath(t *testing.T) {
repo.RelativePath = strings.TrimPrefix(repoPath, cfg.Storages[0].Path)
}
- // The storage name is still present in the storages list, but not on the disk.
- require.NoError(t, os.RemoveAll(cfg.Storages[1].Path))
+ if !testhelper.IsWALEnabled() {
+ // The storage name is still present in the storages list, but not on the disk.
+ require.NoError(t, os.RemoveAll(cfg.Storages[1].Path))
+ }
// The repository path exists on the disk, but it is not a git repository.
const notRepositoryFolder = "not-a-git-repo"
require.NoError(t, os.MkdirAll(filepath.Join(cfg.Storages[0].Path, notRepositoryFolder), perm.SharedDir))
for _, tc := range []struct {
- desc string
- repo *gitalypb.Repository
- opts []storage.GetRepoPathOption
- expPath string
- expErr error
+ desc string
+ repo *gitalypb.Repository
+ opts []storage.GetRepoPathOption
+ expPath string
+ expErr error
+ skipWithWAL string
}{
{
desc: "storage is empty",
@@ -62,6 +65,11 @@ func TestConfigLocator_GetRepoPath(t *testing.T) {
desc: "storage doesn't exist on disk",
repo: &gitalypb.Repository{StorageName: cfg.Storages[1].Name, RelativePath: repo.RelativePath},
expErr: structerr.NewNotFound("storage does not exist").WithMetadata("storage_path", cfg.Storages[1].Path),
+ skipWithWAL: `
+The test is testing a broken storage by deleting the storage after initializing it.
+This causes problems with WAL as the disk state expected to be present by the database
+and the transaction manager suddenly doesn't exist. Skip the test here with WAL and rely
+on the storage implementation to handle broken storage on initialization.`,
},
{
desc: "relative path is empty",
@@ -103,6 +111,10 @@ func TestConfigLocator_GetRepoPath(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
+ if tc.skipWithWAL != "" {
+ testhelper.SkipWithWAL(t, tc.skipWithWAL)
+ }
+
path, err := locator.GetRepoPath(tc.repo, tc.opts...)
require.Equal(t, tc.expPath, path)
require.Equal(t, tc.expErr, err)
diff --git a/internal/gitaly/service/repository/remove_all_test.go b/internal/gitaly/service/repository/remove_all_test.go
index 1a861bc84..785f54b01 100644
--- a/internal/gitaly/service/repository/remove_all_test.go
+++ b/internal/gitaly/service/repository/remove_all_test.go
@@ -10,6 +10,13 @@ import (
)
func TestRemoveAll(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
t.Parallel()
cfg, client := setupRepositoryService(t)
diff --git a/internal/gitaly/service/repository/repository_exists_test.go b/internal/gitaly/service/repository/repository_exists_test.go
index 41470617f..61336701f 100644
--- a/internal/gitaly/service/repository/repository_exists_test.go
+++ b/internal/gitaly/service/repository/repository_exists_test.go
@@ -23,7 +23,9 @@ func TestRepositoryExists(t *testing.T) {
client, socketPath := runRepositoryService(t, cfg)
cfg.SocketPath = socketPath
- require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid")
+ if !testhelper.IsWALEnabled() {
+ require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid")
+ }
repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{})
@@ -32,6 +34,7 @@ func TestRepositoryExists(t *testing.T) {
request *gitalypb.RepositoryExistsRequest
expectedErr error
exists bool
+ skipWithWAL string
}{
{
desc: "repository nil",
@@ -117,11 +120,20 @@ func TestRepositoryExists(t *testing.T) {
"storage_path", cfg.Storages[2].Path,
)
}(),
+ skipWithWAL: `
+The test is testing a broken storage by deleting the storage after initializing it.
+This causes problems with WAL as the disk state expected to be present by the database
+and the transaction manager suddenly doesn't exist. Skip the test here with WAL and rely
+on the storage implementation to handle broken storage on initialization.`,
},
}
for _, tc := range queries {
t.Run(tc.desc, func(t *testing.T) {
+ if tc.skipWithWAL != "" {
+ testhelper.SkipWithWAL(t, tc.skipWithWAL)
+ }
+
response, err := client.RepositoryExists(ctx, tc.request)
testhelper.RequireGrpcError(t, tc.expectedErr, err)
if err != nil {
diff --git a/internal/gitaly/service/server/disk_stats_test.go b/internal/gitaly/service/server/disk_stats_test.go
index a6f5bf083..9814cf412 100644
--- a/internal/gitaly/service/server/disk_stats_test.go
+++ b/internal/gitaly/service/server/disk_stats_test.go
@@ -13,10 +13,22 @@ import (
)
func TestStorageDiskStatistics(t *testing.T) {
- cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken"))
+ storageOpt := testcfg.WithStorages("default")
+ if !testhelper.IsWALEnabled() {
+ // The test is testing a broken storage by deleting the storage after initializing it.
+ // This causes problems with WAL as the disk state expected to be present by the database
+ // and the transaction manager suddenly doesn't exist. Skip the test here with WAL and rely
+ // on the storage implementation to handle broken storage on initialization.
+ storageOpt = testcfg.WithStorages("default", "broken")
+ }
+
+ cfg := testcfg.Build(t, storageOpt)
addr := runServer(t, cfg)
- require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+
+ if !testhelper.IsWALEnabled() {
+ require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+ }
client := newServerClient(t, addr)
ctx := testhelper.Context(t)
@@ -25,7 +37,7 @@ func TestStorageDiskStatistics(t *testing.T) {
require.NoError(t, err)
expectedStorages := len(cfg.Storages)
- if testhelper.IsPraefectEnabled() {
+ if testhelper.IsPraefectEnabled() && !testhelper.IsWALEnabled() {
// Praefect does not virtualize StorageDiskStatistics correctly. It proxies the call to each Gitaly
// and returns the results of all of their storages. However, not all storages on a Gitaly node are
// necessarily part of a virtual storage. Likewise, Praefect should not expose the individual storages
@@ -47,11 +59,13 @@ func TestStorageDiskStatistics(t *testing.T) {
approxEqual(t, c.GetStorageStatuses()[0].Used, used)
require.Equal(t, cfg.Storages[0].Name, c.GetStorageStatuses()[0].StorageName)
- require.Equal(t, int64(0), c.GetStorageStatuses()[1].Available)
- require.Equal(t, int64(0), c.GetStorageStatuses()[1].Used)
- require.Equal(t, cfg.Storages[1].Name, c.GetStorageStatuses()[1].StorageName)
+ if !testhelper.IsWALEnabled() {
+ require.Equal(t, int64(0), c.GetStorageStatuses()[1].Available)
+ require.Equal(t, int64(0), c.GetStorageStatuses()[1].Used)
+ require.Equal(t, cfg.Storages[1].Name, c.GetStorageStatuses()[1].StorageName)
+ }
- if testhelper.IsPraefectEnabled() {
+ if testhelper.IsPraefectEnabled() && !testhelper.IsWALEnabled() {
// This is incorrect behavior caused by the bug explained above.
approxEqual(t, c.GetStorageStatuses()[2].Available, avail)
approxEqual(t, c.GetStorageStatuses()[2].Used, used)
diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go
index b13666b38..6d3179ebc 100644
--- a/internal/gitaly/service/server/info_test.go
+++ b/internal/gitaly/service/server/info_test.go
@@ -22,10 +22,22 @@ import (
)
func TestGitalyServerInfo(t *testing.T) {
- cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken"))
+ storageOpt := testcfg.WithStorages("default")
+ if !testhelper.IsWALEnabled() {
+ // The test is testing a broken storage by deleting the storage after initializing it.
+ // This causes problems with WAL as the disk state expected to be present by the database
+ // and the transaction manager suddenly doesn't exist. Skip the test here with WAL and rely
+ // on the storage implementation to handle broken storage on initialization.
+ storageOpt = testcfg.WithStorages("default", "broken")
+ }
+
+ cfg := testcfg.Build(t, storageOpt)
addr := runServer(t, cfg, testserver.WithDisablePraefect())
- require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+
+ if !testhelper.IsWALEnabled() {
+ require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+ }
client := newServerClient(t, addr)
ctx := testhelper.Context(t)
@@ -49,10 +61,12 @@ func TestGitalyServerInfo(t *testing.T) {
require.NotEmpty(t, c.GetStorageStatuses()[0].FsType)
require.Equal(t, uint32(1), c.GetStorageStatuses()[0].ReplicationFactor)
- require.False(t, c.GetStorageStatuses()[1].Readable)
- require.False(t, c.GetStorageStatuses()[1].Writeable)
- require.Equal(t, metadata.GitalyFilesystemID, c.GetStorageStatuses()[0].FilesystemId)
- require.Equal(t, uint32(1), c.GetStorageStatuses()[1].ReplicationFactor)
+ if !testhelper.IsWALEnabled() {
+ require.False(t, c.GetStorageStatuses()[1].Readable)
+ require.False(t, c.GetStorageStatuses()[1].Writeable)
+ require.Equal(t, metadata.GitalyFilesystemID, c.GetStorageStatuses()[0].FilesystemId)
+ require.Equal(t, uint32(1), c.GetStorageStatuses()[1].ReplicationFactor)
+ }
}
func runServer(t *testing.T, cfg config.Cfg, opts ...testserver.GitalyServerOpt) string {
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner.go b/internal/gitaly/storage/storagemgr/partition_assigner.go
new file mode 100644
index 000000000..ec8ef9cdd
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition_assigner.go
@@ -0,0 +1,199 @@
+package storagemgr
+
+import (
+ "context"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "strconv"
+ "sync"
+
+ "github.com/dgraph-io/badger/v4"
+)
+
+// errPartitionAssignmentNotFound is returned when attempting to access a
+// partition assignment in the database that doesn't yet exist.
+var errPartitionAssignmentNotFound = errors.New("partition assignment not found")
+
+// partitionID uniquely identifies a partition.
+type partitionID uint64
+
+func (id partitionID) MarshalBinary() []byte {
+ marshaled := make([]byte, binary.Size(id))
+ binary.BigEndian.PutUint64(marshaled, uint64(id))
+ return marshaled
+}
+
+func (id *partitionID) UnmarshalBinary(data []byte) {
+ *id = partitionID(binary.BigEndian.Uint64(data))
+}
+
+func (id partitionID) String() string {
+ return strconv.FormatUint(uint64(id), 10)
+}
+
+// partitionAssignmentTable records which partitions repositories are assigned into.
+type partitionAssignmentTable struct{ db *badger.DB }
+
+func newPartitionAssignmentTable(db *badger.DB) *partitionAssignmentTable {
+ return &partitionAssignmentTable{db: db}
+}
+
+func (pt *partitionAssignmentTable) key(relativePath string) []byte {
+ return []byte(fmt.Sprintf("partition_assignment/%s", relativePath))
+}
+
+func (pt *partitionAssignmentTable) getPartitionID(relativePath string) (partitionID, error) {
+ var id partitionID
+ if err := pt.db.View(func(txn *badger.Txn) error {
+ item, err := txn.Get(pt.key(relativePath))
+ if err != nil {
+ if errors.Is(err, badger.ErrKeyNotFound) {
+ return errPartitionAssignmentNotFound
+ }
+
+ return fmt.Errorf("get: %w", err)
+ }
+
+ return item.Value(func(value []byte) error {
+ id.UnmarshalBinary(value)
+ return nil
+ })
+ }); err != nil {
+ return 0, fmt.Errorf("view: %w", err)
+ }
+
+ return id, nil
+}
+
+func (pt *partitionAssignmentTable) setPartitionID(relativePath string, id partitionID) error {
+ wb := pt.db.NewWriteBatch()
+ if err := wb.Set(pt.key(relativePath), id.MarshalBinary()); err != nil {
+ return fmt.Errorf("set: %w", err)
+ }
+
+ if err := wb.Flush(); err != nil {
+ return fmt.Errorf("flush: %w", err)
+ }
+
+ return nil
+}
+
+// partitionAssigner manages assignment of repositories into partitions.
+type partitionAssigner struct {
+ // mutex synchronizes access to repositoryLocks.
+ mutex sync.Mutex
+ // repositoryLocks holds per-repository locks. The key is a relative path and the
+ // channel closing signals the lock being released.
+ repositoryLocks map[string]chan struct{}
+ // idSequence is the sequence used to mint partition IDs.
+ idSequence *badger.Sequence
+ // partitionAssignmentTable contains the partition assignment records.
+ partitionAssignmentTable *partitionAssignmentTable
+}
+
+// newPartitionAssigner returns a new partitionAssigner. Close must be called on the
+// returned instance to release acquired resources.
+func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) {
+ seq, err := db.GetSequence([]byte("partition_id_seq"), 100)
+ if err != nil {
+ return nil, fmt.Errorf("get sequence: %w", err)
+ }
+
+ return &partitionAssigner{
+ repositoryLocks: make(map[string]chan struct{}),
+ idSequence: seq,
+ partitionAssignmentTable: newPartitionAssignmentTable(db),
+ }, nil
+}
+
+func (pa *partitionAssigner) Close() error {
+ return pa.idSequence.Release()
+}
+
+func (pa *partitionAssigner) allocatePartitionID() (partitionID, error) {
+ // Start partition IDs from 1 so the default value refers to an invalid
+ // partition.
+ var id uint64
+ for id == 0 {
+ var err error
+ id, err = pa.idSequence.Next()
+ if err != nil {
+ return 0, fmt.Errorf("next: %w", err)
+ }
+ }
+
+ return partitionID(id), nil
+}
+
+// getPartitionID returns the partition ID of the repository. If the repository wasn't yet assigned into
+// a partition, it will be assigned into one and the assignment stored. Further accesses return the stored
+// partition ID. Each repository goes into its own partition. The method is safe to call concurrently.
+func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath string) (partitionID, error) {
+ ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
+ if err != nil {
+ if !errors.Is(err, errPartitionAssignmentNotFound) {
+ return 0, fmt.Errorf("get partition: %w", err)
+ }
+
+ // Repository wasn't yet assigned into a partition.
+
+ pa.mutex.Lock()
+ // See if some other goroutine already locked the repository. If so, wait for it to complete
+ // and get the partition ID it set.
+ if lock, ok := pa.repositoryLocks[relativePath]; ok {
+ pa.mutex.Unlock()
+ // Some other goroutine is already assigning a partition for the
+ // repository. Wait for it to complete and then get the partition.
+ select {
+ case <-lock:
+ ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
+ if err != nil {
+ return 0, fmt.Errorf("get partition ID after waiting: %w", err)
+ }
+ return ptnID, nil
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ }
+ }
+
+ // No other goroutine had locked the repository yet. Lock the repository so other goroutines
+ // wait while we assign the repository a partition.
+ lock := make(chan struct{})
+ pa.repositoryLocks[relativePath] = lock
+ pa.mutex.Unlock()
+ defer func() {
+ close(lock)
+ pa.mutex.Lock()
+ delete(pa.repositoryLocks, relativePath)
+ pa.mutex.Unlock()
+ }()
+
+ // With the repository locked, check first whether someone else assigned it into a partition
+ // while we weren't holding the lock between the first failed attempt getting the assignment
+ // and locking the repository.
+ ptnID, err = pa.partitionAssignmentTable.getPartitionID(relativePath)
+ if !errors.Is(err, errPartitionAssignmentNotFound) {
+ if err != nil {
+ return 0, fmt.Errorf("recheck partition: %w", err)
+ }
+
+ // Some other goroutine assigned a partition between the failed attempt and locking the
+ // repository.
+ return ptnID, nil
+ }
+
+ // Each repository goes into its own partition. Allocate a new partition ID for this
+ // repository.
+ ptnID, err = pa.allocatePartitionID()
+ if err != nil {
+ return 0, fmt.Errorf("acquire partition id: %w", err)
+ }
+
+ if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil {
+ return 0, fmt.Errorf("set partition: %w", err)
+ }
+ }
+
+ return ptnID, nil
+}
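
The heart of the new file is getPartitionID's concurrency scheme: a fast-path lookup, a mutex-guarded map of channels acting as per-repository locks, and a re-check after acquiring the lock so concurrent callers converge on a single ID. The standalone sketch below reproduces that flow outside Gitaly; the assigner type, getID, and the in-memory maps standing in for the Badger-backed table and sequence are all hypothetical illustration, not code from this merge request.

package main

import (
	"context"
	"fmt"
	"sync"
)

// assigner deduplicates concurrent ID assignment per key using the same
// pattern as partitionAssigner.getPartitionID: fast-path lookup, per-key
// channel lock, re-check under the lock, then allocate and store.
type assigner struct {
	mu     sync.Mutex
	locks  map[string]chan struct{} // closed channel == lock released
	ids    map[string]uint64        // stands in for the Badger table
	nextID uint64                   // stands in for the Badger sequence
}

func newAssigner() *assigner {
	return &assigner{locks: map[string]chan struct{}{}, ids: map[string]uint64{}}
}

func (a *assigner) lookup(key string) (uint64, bool) {
	a.mu.Lock()
	defer a.mu.Unlock()
	id, ok := a.ids[key]
	return id, ok
}

func (a *assigner) getID(ctx context.Context, key string) (uint64, error) {
	// Fast path: the key already has an ID.
	if id, ok := a.lookup(key); ok {
		return id, nil
	}

	a.mu.Lock()
	if lock, ok := a.locks[key]; ok {
		// Another goroutine is assigning this key. Wait for it to
		// finish, then read the ID it stored.
		a.mu.Unlock()
		select {
		case <-lock:
			if id, ok := a.lookup(key); ok {
				return id, nil
			}
			return 0, fmt.Errorf("assignment by other goroutine failed")
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	}

	// We go first: install a lock for other goroutines to wait on.
	lock := make(chan struct{})
	a.locks[key] = lock
	a.mu.Unlock()
	defer func() {
		close(lock)
		a.mu.Lock()
		delete(a.locks, key)
		a.mu.Unlock()
	}()

	// Re-check: an assignment may have landed between the fast path and
	// taking the lock.
	if id, ok := a.lookup(key); ok {
		return id, nil
	}

	// Allocate and store a new ID. IDs start from 1, as in
	// allocatePartitionID, so the zero value stays invalid.
	a.mu.Lock()
	a.nextID++
	id := a.nextID
	a.ids[key] = id
	a.mu.Unlock()
	return id, nil
}

func main() {
	a := newAssigner()
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id, err := a.getID(context.Background(), "relative-path")
			if err == nil {
				fmt.Println(id) // every goroutine prints 1
			}
		}()
	}
	wg.Wait()
}

As in the real code, the per-key lock exists so that only one caller pays for the expensive allocation and write, while the double re-check closes the window between a missed fast-path read and lock acquisition.
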
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner_test.go b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
new file mode 100644
index 000000000..09f975343
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
@@ -0,0 +1,119 @@
+package storagemgr
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestPartitionAssigner(t *testing.T) {
+ db, err := OpenDatabase(testhelper.NewDiscardingLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
+
+ pa, err := newPartitionAssigner(db)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
+
+ ctx := testhelper.Context(t)
+
+ relativePath1 := "relative-path-1"
+ // The relative path should get assigned into a partition.
+ ptnID1, err := pa.getPartitionID(ctx, relativePath1)
+ require.NoError(t, err)
+ require.EqualValues(t, 1, ptnID1, "first allocated partition id should be 1")
+
+ // The second repository should land into its own partition.
+ ptnID2, err := pa.getPartitionID(ctx, "relative-path-2")
+ require.NoError(t, err)
+ require.EqualValues(t, 2, ptnID2)
+
+ // Retrieving the first repository's partition should return the partition that
+ // was assigned earlier.
+ ptnID1, err = pa.getPartitionID(ctx, relativePath1)
+ require.NoError(t, err)
+ require.EqualValues(t, 1, ptnID1)
+}
+
+func TestPartitionAssigner_close(t *testing.T) {
+ dbDir := t.TempDir()
+
+ db, err := OpenDatabase(testhelper.NewDiscardingLogger(t), dbDir)
+ require.NoError(t, err)
+
+ pa, err := newPartitionAssigner(db)
+ require.NoError(t, err)
+ testhelper.MustClose(t, pa)
+ testhelper.MustClose(t, db)
+
+ db, err = OpenDatabase(testhelper.NewDiscardingLogger(t), dbDir)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
+
+ pa, err = newPartitionAssigner(db)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
+
+ // A block of IDs is loaded into memory when the partitionAssigner is initialized.
+ // Closing the partitionAssigner is expected to return the unused IDs in the block
+ // back to the database.
+ ptnID, err := pa.getPartitionID(testhelper.Context(t), "relative-path")
+ require.NoError(t, err)
+ require.EqualValues(t, 1, ptnID)
+}
+
+func TestPartitionAssigner_concurrentAccess(t *testing.T) {
+ db, err := OpenDatabase(testhelper.NewDiscardingLogger(t), t.TempDir())
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, db)
+
+ pa, err := newPartitionAssigner(db)
+ require.NoError(t, err)
+ defer testhelper.MustClose(t, pa)
+
+ // Access 10 repositories concurrently.
+ repositoryCount := 10
+ // Access each repository from 10 goroutines concurrently.
+ goroutineCount := 10
+
+ collectedIDs := make([][]partitionID, repositoryCount)
+ ctx := testhelper.Context(t)
+ wg := sync.WaitGroup{}
+ start := make(chan struct{})
+ for i := 0; i < repositoryCount; i++ {
+ i := i
+ collectedIDs[i] = make([]partitionID, goroutineCount)
+ relativePath := fmt.Sprintf("relative-path-%d", i)
+ for j := 0; j < goroutineCount; j++ {
+ j := j
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ <-start
+ ptnID, err := pa.getPartitionID(ctx, relativePath)
+ assert.NoError(t, err)
+ collectedIDs[i][j] = ptnID
+ }()
+ }
+ }
+
+ close(start)
+ wg.Wait()
+
+ var partitionIDs []partitionID
+ for _, ids := range collectedIDs {
+ partitionIDs = append(partitionIDs, ids[0])
+ for i := range ids {
+ // We expect all goroutines accessing a given repository to get the
+ // same partition ID for it.
+ require.Equal(t, ids[0], ids[i], ids)
+ }
+ }
+
+ // We expect to have 10 unique partition IDs as there are 10 repositories being accessed.
+ require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs)
+}
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index e39ba96b3..b44a72621 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -9,7 +9,6 @@ import (
"io/fs"
"os"
"path/filepath"
- "strings"
"sync"
"github.com/dgraph-io/badger/v4"
@@ -55,8 +54,10 @@ type storageManager struct {
closed bool
// db is the handle to the key-value store used for storing the storage's database state.
database *badger.DB
+ // partitionAssigner manages partition assignments of repositories.
+ partitionAssigner *partitionAssigner
// partitions contains all the active partitions. Each repository can have up to one partition.
- partitions map[string]*partition
+ partitions map[partitionID]*partition
// activePartitions keeps track of active partitions.
activePartitions sync.WaitGroup
}
@@ -75,6 +76,10 @@ func (sm *storageManager) close() {
// Wait for all partitions to finish.
sm.activePartitions.Wait()
+ if err := sm.partitionAssigner.Close(); err != nil {
+ sm.logger.WithError(err).Error("failed closing partition assigner")
+ }
+
if err := sm.database.Close(); err != nil {
sm.logger.WithError(err).Error("failed closing storage's database")
}
@@ -200,13 +205,19 @@ func NewPartitionManager(
return nil, fmt.Errorf("create storage's database directory: %w", err)
}
+ pa, err := newPartitionAssigner(db)
+ if err != nil {
+ return nil, fmt.Errorf("new partition assigner: %w", err)
+ }
+
storages[storage.Name] = &storageManager{
- logger: storageLogger,
- path: storage.Path,
- repoFactory: repoFactory,
- stagingDirectory: stagingDir,
- database: db,
- partitions: map[string]*partition{},
+ logger: storageLogger,
+ path: storage.Path,
+ repoFactory: repoFactory,
+ stagingDirectory: stagingDir,
+ database: db,
+ partitionAssigner: pa,
+ partitions: map[partitionID]*partition{},
}
}
@@ -231,7 +242,18 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
return nil, structerr.NewInvalidArgument("validate relative path: %w", err)
}
- relativeStateDir := deriveStateDirectory(relativePath)
+ partitionID, err := storageMgr.partitionAssigner.getPartitionID(ctx, relativePath)
+ if err != nil {
+ if errors.Is(err, badger.ErrDBClosed) {
+ // The database is closed when PartitionManager is closing. Return a more
+ // descriptive error of what happened.
+ return nil, ErrPartitionManagerClosed
+ }
+
+ return nil, fmt.Errorf("get partition: %w", err)
+ }
+
+ relativeStateDir := deriveStateDirectory(partitionID)
absoluteStateDir := filepath.Join(storageMgr.path, relativeStateDir)
if err := os.MkdirAll(filepath.Dir(absoluteStateDir), perm.PrivateDir); err != nil {
return nil, fmt.Errorf("create state directory hierarchy: %w", err)
@@ -248,7 +270,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
return nil, ErrPartitionManagerClosed
}
- ptn, ok := storageMgr.partitions[relativePath]
+ ptn, ok := storageMgr.partitions[partitionID]
if !ok {
ptn = &partition{
closing: make(chan struct{}),
@@ -265,7 +287,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
ptn.transactionManager = mgr
- storageMgr.partitions[relativePath] = ptn
+ storageMgr.partitions[partitionID] = ptn
storageMgr.activePartitions.Add(1)
go func() {
@@ -280,7 +302,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
// deleted allowing the next transaction for the repository to create a new partition
// and TransactionManager.
storageMgr.mu.Lock()
- delete(storageMgr.partitions, relativePath)
+ delete(storageMgr.partitions, partitionID)
storageMgr.mu.Unlock()
close(ptn.closed)
@@ -334,11 +356,11 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
}
}
-// deriveStateDirectory hashes the relative path and returns the state directory where a state
-// related to a given partition should be stored.
-func deriveStateDirectory(relativePath string) string {
+// deriveStateDirectory hashes the partition ID and returns the state
+// directory where state related to the partition should be stored.
+func deriveStateDirectory(id partitionID) string {
hasher := sha256.New()
- hasher.Write([]byte(relativePath))
+ hasher.Write([]byte(id.String()))
hash := hex.EncodeToString(hasher.Sum(nil))
return filepath.Join(
@@ -347,9 +369,7 @@ func deriveStateDirectory(relativePath string) string {
// subdirectories to keep the directory sizes reasonable.
hash[0:2],
hash[2:4],
- // Flatten the relative path by removing the path separators so the
- // repository is stored on this level in the directory hierarchy.
- strings.ReplaceAll(relativePath, string(os.PathSeparator), ""),
+ id.String(),
)
}
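
With the switch from hashing the relative path to hashing the partition ID, state directory names no longer depend on path separators at all. A minimal re-derivation of the scheme (standalone, and omitting whatever storage-internal prefix the real function joins first) shows the resulting layout; since SHA-256 of the string "1" begins with 6b86..., partition 1's state lands under 6b/86/1:

package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"path/filepath"
	"strconv"
)

// deriveStateDir mirrors deriveStateDirectory: hash the decimal partition
// ID and shard into two levels by the first two hex-byte pairs.
func deriveStateDir(id uint64) string {
	s := strconv.FormatUint(id, 10)
	hasher := sha256.New()
	hasher.Write([]byte(s))
	hash := hex.EncodeToString(hasher.Sum(nil))
	return filepath.Join(hash[0:2], hash[2:4], s)
}

func main() {
	fmt.Println(deriveStateDir(1)) // prints: 6b/86/1
}

Hashing before sharding keeps the two leading directory levels uniformly distributed, which is what the "keep the directory sizes reasonable" comment in the original code is after.
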
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index ddb960fa5..eac914eee 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -44,7 +44,7 @@ func TestPartitionManager(t *testing.T) {
repo storage.Repository
// expectedState contains the partitions by their storages and their pending transaction count at
// the end of the step.
- expectedState map[string]map[string]uint
+ expectedState map[string]map[partitionID]uint
// expectedError is the error expected to be returned when beginning the transaction.
expectedError error
}
@@ -57,7 +57,7 @@ func TestPartitionManager(t *testing.T) {
ctx context.Context
// expectedState contains the partitions by their storages and their pending transaction count at
// the end of the step.
- expectedState map[string]map[string]uint
+ expectedState map[string]map[partitionID]uint
// expectedError is the error that is expected to be returned when committing the transaction.
expectedError error
}
@@ -68,7 +68,7 @@ func TestPartitionManager(t *testing.T) {
transactionID int
// expectedState contains the partitions by their storages and their pending transaction count at
// the end of the step.
- expectedState map[string]map[string]uint
+ expectedState map[string]map[partitionID]uint
// expectedError is the error that is expected to be returned when rolling back the transaction.
expectedError error
}
@@ -117,22 +117,22 @@ func TestPartitionManager(t *testing.T) {
// checkExpectedState validates that the partition manager contains the correct partitions and
// associated transaction count at the point of execution.
- checkExpectedState := func(t *testing.T, cfg config.Cfg, partitionManager *PartitionManager, expectedState map[string]map[string]uint) {
+ checkExpectedState := func(t *testing.T, cfg config.Cfg, partitionManager *PartitionManager, expectedState map[string]map[partitionID]uint) {
t.Helper()
- actualState := map[string]map[string]uint{}
+ actualState := map[string]map[partitionID]uint{}
for storageName, storageMgr := range partitionManager.storages {
- for partitionKey, partition := range storageMgr.partitions {
+ for ptnID, partition := range storageMgr.partitions {
if actualState[storageName] == nil {
- actualState[storageName] = map[string]uint{}
+ actualState[storageName] = map[partitionID]uint{}
}
- actualState[storageName][partitionKey] = partition.pendingTransactionCount
+ actualState[storageName][ptnID] = partition.pendingTransactionCount
}
}
if expectedState == nil {
- expectedState = map[string]map[string]uint{}
+ expectedState = map[string]map[partitionID]uint{}
}
require.Equal(t, expectedState, actualState)
@@ -173,9 +173,9 @@ func TestPartitionManager(t *testing.T) {
steps: steps{
begin{
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -194,9 +194,9 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 1,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -206,9 +206,9 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 2,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -229,26 +229,26 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 1,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
begin{
transactionID: 2,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 2,
+ 1: 2,
},
},
},
commit{
transactionID: 1,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -271,51 +271,51 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 1,
repo: repoA,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repoA.GetRelativePath(): 1,
+ 1: 1,
},
},
},
begin{
transactionID: 2,
repo: repoB,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repoA.GetRelativePath(): 1,
- repoB.GetRelativePath(): 1,
+ 1: 1,
+ 2: 1,
},
},
},
begin{
transactionID: 3,
repo: repoC,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repoA.GetRelativePath(): 1,
- repoB.GetRelativePath(): 1,
+ 1: 1,
+ 2: 1,
},
"other-storage": {
- repoC.GetRelativePath(): 1,
+ 1: 1,
},
},
},
commit{
transactionID: 1,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repoB.GetRelativePath(): 1,
+ 2: 1,
},
"other-storage": {
- repoC.GetRelativePath(): 1,
+ 1: 1,
},
},
},
commit{
transactionID: 2,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"other-storage": {
- repoC.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -335,9 +335,9 @@ func TestPartitionManager(t *testing.T) {
steps: steps{
begin{
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -377,9 +377,9 @@ func TestPartitionManager(t *testing.T) {
steps: steps{
begin{
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -400,9 +400,9 @@ func TestPartitionManager(t *testing.T) {
steps: steps{
begin{
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -424,9 +424,9 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 1,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -436,9 +436,9 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 2,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -516,9 +516,9 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 1,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -528,9 +528,9 @@ func TestPartitionManager(t *testing.T) {
StorageName: repo.GetStorageName(),
RelativePath: filepath.Join(repo.GetRelativePath(), "child-dir", ".."),
},
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 2,
+ 1: 2,
},
},
},
@@ -548,9 +548,9 @@ func TestPartitionManager(t *testing.T) {
begin{
transactionID: 1,
repo: repo,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
@@ -560,25 +560,25 @@ func TestPartitionManager(t *testing.T) {
StorageName: repo.GetStorageName(),
RelativePath: repo.GetRelativePath(),
},
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 2,
+ 1: 2,
},
},
},
rollback{
transactionID: 2,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
},
rollback{
transactionID: 2,
- expectedState: map[string]map[string]uint{
+ expectedState: map[string]map[partitionID]uint{
"default": {
- repo.GetRelativePath(): 1,
+ 1: 1,
},
},
expectedError: ErrTransactionAlreadyRollbacked,
@@ -663,7 +663,11 @@ func TestPartitionManager(t *testing.T) {
storageMgr := partitionManager.storages[step.repo.GetStorageName()]
storageMgr.mu.Lock()
- ptn := storageMgr.partitions[step.repo.GetRelativePath()]
+
+ ptnID, err := storageMgr.partitionAssigner.getPartitionID(ctx, step.repo.GetRelativePath())
+ require.NoError(t, err)
+
+ ptn := storageMgr.partitions[ptnID]
storageMgr.mu.Unlock()
openTransactionData[step.transactionID] = &transactionData{
diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go
index cfbd7beb5..5f635de0b 100644
--- a/internal/praefect/remove_all_test.go
+++ b/internal/praefect/remove_all_test.go
@@ -24,6 +24,13 @@ import (
)
func TestRemoveAllHandler(t *testing.T) {
+ testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
t.Parallel()
ctx := testhelper.Context(t)