| author | Will Chandler <wchandler@gitlab.com> | 2023-08-29 06:07:15 +0300 |
|---|---|---|
| committer | Will Chandler <wchandler@gitlab.com> | 2023-08-29 06:07:15 +0300 |
| commit | d8d7990c4d9240d5626d28efad4260d7bac7bd8e (patch) | |
| tree | dc5f0eefbffa0f6302a48621e24bf813248dcfa5 | |
| parent | fbf36b315fe09eaf304ef6a18ee78e27b9cc079f (diff) | |
| parent | 6cf465ea9b6f1c2fe53e2693c6603a2c1a5c4924 (diff) | |
Merge branch 'smh-partition-assignments' into 'master'
Implement partition assignment logic
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6245
Merged-by: Will Chandler <wchandler@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Sami Hiltunen <shiltunen@gitlab.com>
Co-authored-by: Sami Hiltunen <shiltunen@gitlab.com>
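Most of the test changes below gate assertions on whether write-ahead-log (WAL) transactions are enabled. `testhelper.SkipWithWAL` and `testhelper.IsWALEnabled` are the real helpers used in the diff; as a reading aid, a minimal sketch of how such a pair of helpers could be implemented follows. The `GITALY_TEST_WAL` environment variable is an assumption for illustration only — this commit does not define the toggle itself.

```go
package testhelper

import (
	"os"
	"testing"
)

// IsWALEnabled reports whether the test run exercises write-ahead-log
// transactions. The GITALY_TEST_WAL variable is an assumed toggle used
// purely for illustration.
func IsWALEnabled() bool {
	return os.Getenv("GITALY_TEST_WAL") != ""
}

// SkipWithWAL skips the calling test with the given reason when the
// write-ahead log is enabled.
func SkipWithWAL(tb testing.TB, reason string) {
	if IsWALEnabled() {
		tb.Skip(reason)
	}
}
```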
-rw-r--r-- | cmd/gitaly-backup/restore_test.go | 14
-rw-r--r-- | internal/backup/backup_test.go | 7
-rw-r--r-- | internal/backup/server_side_test.go | 7
-rw-r--r-- | internal/gitaly/config/locator_test.go | 26
-rw-r--r-- | internal/gitaly/service/repository/remove_all_test.go | 7
-rw-r--r-- | internal/gitaly/service/repository/repository_exists_test.go | 14
-rw-r--r-- | internal/gitaly/service/server/disk_stats_test.go | 28
-rw-r--r-- | internal/gitaly/service/server/info_test.go | 26
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_assigner.go | 199
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_assigner_test.go | 119
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_manager.go | 58
-rw-r--r-- | internal/gitaly/storage/storagemgr/partition_manager_test.go | 120
-rw-r--r-- | internal/praefect/remove_all_test.go | 7
13 files changed, 534 insertions, 98 deletions
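The core of the change is `partition_assigner.go` below: each repository is lazily assigned a monotonically increasing partition ID on first access, with per-repository channel locks guaranteeing the assignment happens exactly once under concurrency. As a reading aid, here is a distilled, in-memory sketch of that double-checked locking pattern — the names are illustrative, and a `sync.Map` stands in for the Badger-backed `partitionAssignmentTable`, so this is not the Gitaly implementation itself:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// assigner is an in-memory sketch of the pattern in partition_assigner.go.
// The real code persists assignments in Badger and mints IDs from a
// badger.Sequence instead of an atomic counter.
type assigner struct {
	mu    sync.Mutex
	locks map[string]chan struct{}

	store  sync.Map      // relative path -> uint64 partition ID
	nextID atomic.Uint64 // IDs start at 1 so the zero value stays invalid
}

func newAssigner() *assigner {
	return &assigner{locks: make(map[string]chan struct{})}
}

func (a *assigner) getID(ctx context.Context, relativePath string) (uint64, error) {
	// Fast path: the assignment already exists.
	if id, ok := a.store.Load(relativePath); ok {
		return id.(uint64), nil
	}

	a.mu.Lock()
	if lock, ok := a.locks[relativePath]; ok {
		// Another goroutine is assigning this repository: wait for it to
		// release the lock, then read the ID it stored.
		a.mu.Unlock()
		select {
		case <-lock:
			id, ok := a.store.Load(relativePath)
			if !ok {
				return 0, errors.New("assignment missing after wait")
			}
			return id.(uint64), nil
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	}

	// We won the race: publish a lock so later callers wait on us.
	lock := make(chan struct{})
	a.locks[relativePath] = lock
	a.mu.Unlock()

	defer func() {
		close(lock)
		a.mu.Lock()
		delete(a.locks, relativePath)
		a.mu.Unlock()
	}()

	// Re-check: the assignment may have landed between our first read and
	// taking the lock.
	if id, ok := a.store.Load(relativePath); ok {
		return id.(uint64), nil
	}

	id := a.nextID.Add(1)
	a.store.Store(relativePath, id)
	return id, nil
}

func main() {
	a := newAssigner()
	id, err := a.getID(context.Background(), "relative-path-1")
	fmt.Println(id, err) // 1 <nil>
}
```

Using a channel as the per-repository lock lets waiters block on `<-lock` while still honouring context cancellation, which a plain per-repository `sync.Mutex` would not allow.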
diff --git a/cmd/gitaly-backup/restore_test.go b/cmd/gitaly-backup/restore_test.go
index 7b327f3eb..a36434b55 100644
--- a/cmd/gitaly-backup/restore_test.go
+++ b/cmd/gitaly-backup/restore_test.go
@@ -25,6 +25,13 @@ import (
 func TestRestoreSubcommand(t *testing.T) {
 	gittest.SkipWithSHA256(t)
 
+	testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
 	t.Parallel()
 
 	ctx := testhelper.Context(t)
@@ -101,6 +108,13 @@ func TestRestoreSubcommand(t *testing.T) {
 func TestRestoreSubcommand_serverSide(t *testing.T) {
 	gittest.SkipWithSHA256(t)
 
+	testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
 	t.Parallel()
 
 	ctx := testhelper.Context(t)
diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go
index 0121f2116..eb4fa2011 100644
--- a/internal/backup/backup_test.go
+++ b/internal/backup/backup_test.go
@@ -29,6 +29,13 @@ import (
 )
 
 func TestManager_RemoveAllRepositories(t *testing.T) {
+	testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
 	t.Parallel()
 
 	cfg := testcfg.Build(t)
diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go
index 05f5f9b82..5d69b70b2 100644
--- a/internal/backup/server_side_test.go
+++ b/internal/backup/server_side_test.go
@@ -242,6 +242,13 @@ func TestServerSideAdapter_Restore(t *testing.T) {
 }
 
 func TestServerSideAdapter_RemoveAllRepositories(t *testing.T) {
+	testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
 	t.Parallel()
 
 	backupRoot := testhelper.TempDir(t)
diff --git a/internal/gitaly/config/locator_test.go b/internal/gitaly/config/locator_test.go
index dc78fe493..fb6761eb9 100644
--- a/internal/gitaly/config/locator_test.go
+++ b/internal/gitaly/config/locator_test.go
@@ -34,19 +34,22 @@ func TestConfigLocator_GetRepoPath(t *testing.T) {
 		repo.RelativePath = strings.TrimPrefix(repoPath, cfg.Storages[0].Path)
 	}
 
-	// The storage name still present in the storages list, but not on the disk.
-	require.NoError(t, os.RemoveAll(cfg.Storages[1].Path))
+	if !testhelper.IsWALEnabled() {
+		// The storage name still present in the storages list, but not on the disk.
+		require.NoError(t, os.RemoveAll(cfg.Storages[1].Path))
+	}
 
 	// The repository path exists on the disk, but it is not a git repository.
 	const notRepositoryFolder = "not-a-git-repo"
 	require.NoError(t, os.MkdirAll(filepath.Join(cfg.Storages[0].Path, notRepositoryFolder), perm.SharedDir))
 
 	for _, tc := range []struct {
-		desc    string
-		repo    *gitalypb.Repository
-		opts    []storage.GetRepoPathOption
-		expPath string
-		expErr  error
+		desc        string
+		repo        *gitalypb.Repository
+		opts        []storage.GetRepoPathOption
+		expPath     string
+		expErr      error
+		skipWithWAL string
 	}{
 		{
 			desc: "storage is empty",
@@ -62,6 +65,11 @@ func TestConfigLocator_GetRepoPath(t *testing.T) {
 			desc:   "storage doesn't exist on disk",
 			repo:   &gitalypb.Repository{StorageName: cfg.Storages[1].Name, RelativePath: repo.RelativePath},
 			expErr: structerr.NewNotFound("storage does not exist").WithMetadata("storage_path", cfg.Storages[1].Path),
+			skipWithWAL: `
+The test is testing a broken storage by deleting the storage after initializing it.
+This causes problems with WAL as the disk state expected to be present by the database
+and the transaction manager suddenly don't exist. Skip the test here with WAL and rely
+on the storage implementation to handle broken storage on initialization.`,
 		},
 		{
 			desc: "relative path is empty",
@@ -103,6 +111,10 @@ func TestConfigLocator_GetRepoPath(t *testing.T) {
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
+			if tc.skipWithWAL != "" {
+				testhelper.SkipWithWAL(t, tc.skipWithWAL)
+			}
+
 			path, err := locator.GetRepoPath(tc.repo, tc.opts...)
 			require.Equal(t, tc.expPath, path)
 			require.Equal(t, tc.expErr, err)
diff --git a/internal/gitaly/service/repository/remove_all_test.go b/internal/gitaly/service/repository/remove_all_test.go
index 1a861bc84..785f54b01 100644
--- a/internal/gitaly/service/repository/remove_all_test.go
+++ b/internal/gitaly/service/repository/remove_all_test.go
@@ -10,6 +10,13 @@ import (
 )
 
 func TestRemoveAll(t *testing.T) {
+	testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
 	t.Parallel()
 
 	cfg, client := setupRepositoryService(t)
diff --git a/internal/gitaly/service/repository/repository_exists_test.go b/internal/gitaly/service/repository/repository_exists_test.go
index 41470617f..61336701f 100644
--- a/internal/gitaly/service/repository/repository_exists_test.go
+++ b/internal/gitaly/service/repository/repository_exists_test.go
@@ -23,7 +23,9 @@ func TestRepositoryExists(t *testing.T) {
 	client, socketPath := runRepositoryService(t, cfg)
 	cfg.SocketPath = socketPath
 
-	require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid")
+	if !testhelper.IsWALEnabled() {
+		require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid")
+	}
 
 	repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{})
 
@@ -32,6 +34,7 @@ func TestRepositoryExists(t *testing.T) {
 		request     *gitalypb.RepositoryExistsRequest
 		expectedErr error
 		exists      bool
+		skipWithWAL string
 	}{
 		{
 			desc: "repository nil",
@@ -117,11 +120,20 @@ func TestRepositoryExists(t *testing.T) {
 					"storage_path", cfg.Storages[2].Path,
 				)
 			}(),
+			skipWithWAL: `
+The test is testing a broken storage by deleting the storage after initializing it.
+This causes problems with WAL as the disk state expected to be present by the database
+and the transaction manager suddenly don't exist. Skip the test here with WAL and rely
+on the storage implementation to handle broken storage on initialization.`,
 		},
 	}
 
 	for _, tc := range queries {
 		t.Run(tc.desc, func(t *testing.T) {
+			if tc.skipWithWAL != "" {
+				testhelper.SkipWithWAL(t, tc.skipWithWAL)
+			}
+
 			response, err := client.RepositoryExists(ctx, tc.request)
 			testhelper.RequireGrpcError(t, tc.expectedErr, err)
 			if err != nil {
diff --git a/internal/gitaly/service/server/disk_stats_test.go b/internal/gitaly/service/server/disk_stats_test.go
index a6f5bf083..9814cf412 100644
--- a/internal/gitaly/service/server/disk_stats_test.go
+++ b/internal/gitaly/service/server/disk_stats_test.go
@@ -13,10 +13,22 @@ import (
 )
 
 func TestStorageDiskStatistics(t *testing.T) {
-	cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken"))
+	storageOpt := testcfg.WithStorages("default")
+	if !testhelper.IsWALEnabled() {
+		// The test is testing a broken storage by deleting the storage after initializing it.
+		// This causes problems with WAL as the disk state expected to be present by the database
+		// and the transaction manager suddenly don't exist. Skip the test here with WAL and rely
+		// on the storage implementation to handle broken storage on initialization.
+		storageOpt = testcfg.WithStorages("default", "broken")
+	}
+
+	cfg := testcfg.Build(t, storageOpt)
 
 	addr := runServer(t, cfg)
-	require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+
+	if !testhelper.IsWALEnabled() {
+		require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+	}
 
 	client := newServerClient(t, addr)
 	ctx := testhelper.Context(t)
@@ -25,7 +37,7 @@ func TestStorageDiskStatistics(t *testing.T) {
 	require.NoError(t, err)
 
 	expectedStorages := len(cfg.Storages)
-	if testhelper.IsPraefectEnabled() {
+	if testhelper.IsPraefectEnabled() && !testhelper.IsWALEnabled() {
 		// Praefect does not virtualize StorageDiskStatistics correctly. It proxies the call to each Gitaly
 		// and returns the results of all of their storages. However, not all storages on a Gitaly node are
 		// necessarily part of a virtual storage. Likewise, Praefect should not expose the individual storages
@@ -47,11 +59,13 @@ func TestStorageDiskStatistics(t *testing.T) {
 	approxEqual(t, c.GetStorageStatuses()[0].Used, used)
 	require.Equal(t, cfg.Storages[0].Name, c.GetStorageStatuses()[0].StorageName)
 
-	require.Equal(t, int64(0), c.GetStorageStatuses()[1].Available)
-	require.Equal(t, int64(0), c.GetStorageStatuses()[1].Used)
-	require.Equal(t, cfg.Storages[1].Name, c.GetStorageStatuses()[1].StorageName)
+	if !testhelper.IsWALEnabled() {
+		require.Equal(t, int64(0), c.GetStorageStatuses()[1].Available)
+		require.Equal(t, int64(0), c.GetStorageStatuses()[1].Used)
+		require.Equal(t, cfg.Storages[1].Name, c.GetStorageStatuses()[1].StorageName)
+	}
 
-	if testhelper.IsPraefectEnabled() {
+	if testhelper.IsPraefectEnabled() && !testhelper.IsWALEnabled() {
 		// This is incorrect behavior caused by the bug explained above.
 		approxEqual(t, c.GetStorageStatuses()[2].Available, avail)
 		approxEqual(t, c.GetStorageStatuses()[2].Used, used)
diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go
index b13666b38..6d3179ebc 100644
--- a/internal/gitaly/service/server/info_test.go
+++ b/internal/gitaly/service/server/info_test.go
@@ -22,10 +22,22 @@ import (
 )
 
 func TestGitalyServerInfo(t *testing.T) {
-	cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken"))
+	storageOpt := testcfg.WithStorages("default")
+	if !testhelper.IsWALEnabled() {
+		// The test is testing a broken storage by deleting the storage after initializing it.
+		// This causes problems with WAL as the disk state expected to be present by the database
+		// and the transaction manager suddenly don't exist. Skip the test here with WAL and rely
+		// on the storage implementation to handle broken storage on initialization.
+		storageOpt = testcfg.WithStorages("default", "broken")
+	}
+
+	cfg := testcfg.Build(t, storageOpt)
 
 	addr := runServer(t, cfg, testserver.WithDisablePraefect())
-	require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+
+	if !testhelper.IsWALEnabled() {
+		require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid")
+	}
 
 	client := newServerClient(t, addr)
 	ctx := testhelper.Context(t)
@@ -49,10 +61,12 @@ func TestGitalyServerInfo(t *testing.T) {
 	require.NotEmpty(t, c.GetStorageStatuses()[0].FsType)
 	require.Equal(t, uint32(1), c.GetStorageStatuses()[0].ReplicationFactor)
 
-	require.False(t, c.GetStorageStatuses()[1].Readable)
-	require.False(t, c.GetStorageStatuses()[1].Writeable)
-	require.Equal(t, metadata.GitalyFilesystemID, c.GetStorageStatuses()[0].FilesystemId)
-	require.Equal(t, uint32(1), c.GetStorageStatuses()[1].ReplicationFactor)
+	if !testhelper.IsWALEnabled() {
+		require.False(t, c.GetStorageStatuses()[1].Readable)
+		require.False(t, c.GetStorageStatuses()[1].Writeable)
+		require.Equal(t, metadata.GitalyFilesystemID, c.GetStorageStatuses()[0].FilesystemId)
+		require.Equal(t, uint32(1), c.GetStorageStatuses()[1].ReplicationFactor)
+	}
 }
 
 func runServer(t *testing.T, cfg config.Cfg, opts ...testserver.GitalyServerOpt) string {
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner.go b/internal/gitaly/storage/storagemgr/partition_assigner.go
new file mode 100644
index 000000000..ec8ef9cdd
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition_assigner.go
@@ -0,0 +1,199 @@
+package storagemgr
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"strconv"
+	"sync"
+
+	"github.com/dgraph-io/badger/v4"
+)
+
+// errPartitionAssignmentNotFound is returned when attempting to access a
+// partition assignment in the database that doesn't yet exist.
+var errPartitionAssignmentNotFound = errors.New("partition assignment not found")
+
+// partitionID uniquely identifies a partition.
+type partitionID uint64
+
+func (id partitionID) MarshalBinary() []byte {
+	marshaled := make([]byte, binary.Size(id))
+	binary.BigEndian.PutUint64(marshaled, uint64(id))
+	return marshaled
+}
+
+func (id *partitionID) UnmarshalBinary(data []byte) {
+	*id = partitionID(binary.BigEndian.Uint64(data))
+}
+
+func (id partitionID) String() string {
+	return strconv.FormatUint(uint64(id), 10)
+}
+
+// partitionAssignmentTable records which partitions repositories are assigned into.
+type partitionAssignmentTable struct{ db *badger.DB }
+
+func newPartitionAssignmentTable(db *badger.DB) *partitionAssignmentTable {
+	return &partitionAssignmentTable{db: db}
+}
+
+func (pt *partitionAssignmentTable) key(relativePath string) []byte {
+	return []byte(fmt.Sprintf("partition_assignment/%s", relativePath))
+}
+
+func (pt *partitionAssignmentTable) getPartitionID(relativePath string) (partitionID, error) {
+	var id partitionID
+	if err := pt.db.View(func(txn *badger.Txn) error {
+		item, err := txn.Get(pt.key(relativePath))
+		if err != nil {
+			if errors.Is(err, badger.ErrKeyNotFound) {
+				return errPartitionAssignmentNotFound
+			}
+
+			return fmt.Errorf("get: %w", err)
+		}
+
+		return item.Value(func(value []byte) error {
+			id.UnmarshalBinary(value)
+			return nil
+		})
+	}); err != nil {
+		return 0, fmt.Errorf("view: %w", err)
+	}
+
+	return id, nil
+}
+
+func (pt *partitionAssignmentTable) setPartitionID(relativePath string, id partitionID) error {
+	wb := pt.db.NewWriteBatch()
+	if err := wb.Set(pt.key(relativePath), id.MarshalBinary()); err != nil {
+		return fmt.Errorf("set: %w", err)
+	}
+
+	if err := wb.Flush(); err != nil {
+		return fmt.Errorf("flush: %w", err)
+	}
+
+	return nil
+}
+
+// partitionAssigner manages assignment of repositories into partitions.
+type partitionAssigner struct {
+	// mutex synchronizes access to repositoryLocks.
+	mutex sync.Mutex
+	// repositoryLocks holds per-repository locks. The key is a relative path and the
+	// channel closing signals the lock being released.
+	repositoryLocks map[string]chan struct{}
+	// idSequence is the sequence used to mint partition IDs.
+	idSequence *badger.Sequence
+	// partitionAssignmentTable contains the partition assignment records.
+	partitionAssignmentTable *partitionAssignmentTable
+}
+
+// newPartitionAssigner returns a new partitionAssigner. Close must be called on the
+// returned instance to release acquired resources.
+func newPartitionAssigner(db *badger.DB) (*partitionAssigner, error) {
+	seq, err := db.GetSequence([]byte("partition_id_seq"), 100)
+	if err != nil {
+		return nil, fmt.Errorf("get sequence: %w", err)
+	}
+
+	return &partitionAssigner{
+		repositoryLocks:          make(map[string]chan struct{}),
+		idSequence:               seq,
+		partitionAssignmentTable: newPartitionAssignmentTable(db),
+	}, nil
+}
+
+func (pa *partitionAssigner) Close() error {
+	return pa.idSequence.Release()
+}
+
+func (pa *partitionAssigner) allocatePartitionID() (partitionID, error) {
+	// Start partition IDs from 1 so the default value refers to an invalid
+	// partition.
+	var id uint64
+	for id == 0 {
+		var err error
+		id, err = pa.idSequence.Next()
+		if err != nil {
+			return 0, fmt.Errorf("next: %w", err)
+		}
+	}
+
+	return partitionID(id), nil
+}
+
+// getPartitionID returns the partition ID of the repository. If the repository wasn't yet assigned into
+// a partition, it will be assigned into one and the assignment stored. Further accesses return the stored
+// partition ID. Each repository goes into its own partition. The method is safe to call concurrently.
+func (pa *partitionAssigner) getPartitionID(ctx context.Context, relativePath string) (partitionID, error) {
+	ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
+	if err != nil {
+		if !errors.Is(err, errPartitionAssignmentNotFound) {
+			return 0, fmt.Errorf("get partition: %w", err)
+		}
+
+		// Repository wasn't yet assigned into a partition.
+
+		pa.mutex.Lock()
+		// See if some other goroutine already locked the repository. If so, wait for it to complete
+		// and get the partition ID it set.
+		if lock, ok := pa.repositoryLocks[relativePath]; ok {
+			pa.mutex.Unlock()
+			// Some other goroutine is already assigning a partition for the
+			// repository. Wait for it to complete and then get the partition.
+			select {
+			case <-lock:
+				ptnID, err := pa.partitionAssignmentTable.getPartitionID(relativePath)
+				if err != nil {
+					return 0, fmt.Errorf("get partition ID after waiting: %w", err)
+				}
+
+				return ptnID, nil
+			case <-ctx.Done():
+				return 0, ctx.Err()
+			}
+		}
+
+		// No other goroutine had locked the repository yet. Lock the repository so other goroutines
+		// wait while we assign the repository a partition.
+		lock := make(chan struct{})
+		pa.repositoryLocks[relativePath] = lock
+		pa.mutex.Unlock()
+
+		defer func() {
+			close(lock)
+			pa.mutex.Lock()
+			delete(pa.repositoryLocks, relativePath)
+			pa.mutex.Unlock()
+		}()
+
+		// With the repository locked, check first whether someone else assigned it into a partition
+		// while we weren't holding the lock between the first failed attempt getting the assignment
+		// and locking the repository.
+		ptnID, err = pa.partitionAssignmentTable.getPartitionID(relativePath)
+		if !errors.Is(err, errPartitionAssignmentNotFound) {
+			if err != nil {
+				return 0, fmt.Errorf("recheck partition: %w", err)
+			}
+
+			// Some other goroutine assigned a partition between the failed attempt and locking the
+			// repository.
+			return ptnID, nil
+		}
+
+		// Each repository goes into its own partition. Allocate a new partition ID for this
+		// repository.
+		ptnID, err = pa.allocatePartitionID()
+		if err != nil {
+			return 0, fmt.Errorf("acquire partition id: %w", err)
+		}
+
+		if err := pa.partitionAssignmentTable.setPartitionID(relativePath, ptnID); err != nil {
+			return 0, fmt.Errorf("set partition: %w", err)
+		}
+	}
+
+	return ptnID, nil
+}
diff --git a/internal/gitaly/storage/storagemgr/partition_assigner_test.go b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
new file mode 100644
index 000000000..09f975343
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition_assigner_test.go
@@ -0,0 +1,119 @@
+package storagemgr
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestPartitionAssigner(t *testing.T) {
+	db, err := OpenDatabase(testhelper.NewDiscardingLogger(t), t.TempDir())
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, db)
+
+	pa, err := newPartitionAssigner(db)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, pa)
+
+	ctx := testhelper.Context(t)
+
+	relativePath1 := "relative-path-1"
+	// The relative path should get assigned into a partition.
+	ptnID1, err := pa.getPartitionID(ctx, relativePath1)
+	require.NoError(t, err)
+	require.EqualValues(t, 1, ptnID1, "first allocated partition id should be 1")
+
+	// The second repository should land into its own partition.
+	ptnID2, err := pa.getPartitionID(ctx, "relative-path-2")
+	require.NoError(t, err)
+	require.EqualValues(t, 2, ptnID2)
+
+	// Retrieving the first repository's partition should return the partition that
+	// was assigned earlier.
+	ptnID1, err = pa.getPartitionID(ctx, relativePath1)
+	require.NoError(t, err)
+	require.EqualValues(t, 1, ptnID1)
+}
+
+func TestPartitionAssigner_close(t *testing.T) {
+	dbDir := t.TempDir()
+
+	db, err := OpenDatabase(testhelper.NewDiscardingLogger(t), dbDir)
+	require.NoError(t, err)
+
+	pa, err := newPartitionAssigner(db)
+	require.NoError(t, err)
+	testhelper.MustClose(t, pa)
+	testhelper.MustClose(t, db)
+
+	db, err = OpenDatabase(testhelper.NewDiscardingLogger(t), dbDir)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, db)
+
+	pa, err = newPartitionAssigner(db)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, pa)
+
+	// A block of IDs is loaded into memory when the partitionAssigner is initialized.
+	// Closing the partitionAssigner is expected to return the unused IDs in the block
+	// back to the database.
+	ptnID, err := pa.getPartitionID(testhelper.Context(t), "relative-path")
+	require.NoError(t, err)
+	require.EqualValues(t, 1, ptnID)
+}
+
+func TestPartitionAssigner_concurrentAccess(t *testing.T) {
+	db, err := OpenDatabase(testhelper.NewDiscardingLogger(t), t.TempDir())
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, db)
+
+	pa, err := newPartitionAssigner(db)
+	require.NoError(t, err)
+	defer testhelper.MustClose(t, pa)
+
+	// Access 10 repositories concurrently.
+	repositoryCount := 10
+	// Access each repository from 10 goroutines concurrently.
+	goroutineCount := 10
+
+	collectedIDs := make([][]partitionID, repositoryCount)
+	ctx := testhelper.Context(t)
+	wg := sync.WaitGroup{}
+	start := make(chan struct{})
+	for i := 0; i < repositoryCount; i++ {
+		i := i
+		collectedIDs[i] = make([]partitionID, goroutineCount)
+		relativePath := fmt.Sprintf("relative-path-%d", i)
+		for j := 0; j < goroutineCount; j++ {
+			j := j
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				<-start
+				ptnID, err := pa.getPartitionID(ctx, relativePath)
+				assert.NoError(t, err)
+				collectedIDs[i][j] = ptnID
+			}()
+		}
+	}
+
+	close(start)
+	wg.Wait()
+
+	var partitionIDs []partitionID
+	for _, ids := range collectedIDs {
+		partitionIDs = append(partitionIDs, ids[0])
+		for i := range ids {
+			// We expect all goroutines accessing a given repository to get the
+			// same partition ID for it.
+			require.Equal(t, ids[0], ids[i], ids)
+		}
+	}
+
+	// We expect to have 10 unique partition IDs as there are 10 repositories being accessed.
+	require.ElementsMatch(t, []partitionID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, partitionIDs)
+}
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index e39ba96b3..b44a72621 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -9,7 +9,6 @@ import (
 	"io/fs"
 	"os"
 	"path/filepath"
-	"strings"
 	"sync"
 
 	"github.com/dgraph-io/badger/v4"
@@ -55,8 +54,10 @@ type storageManager struct {
 	closed bool
 	// db is the handle to the key-value store used for storing the storage's database state.
 	database *badger.DB
+	// partitionAssigner manages partition assignments of repositories.
+	partitionAssigner *partitionAssigner
 	// partitions contains all the active partitions. Each repository can have up to one partition.
-	partitions map[string]*partition
+	partitions map[partitionID]*partition
 	// activePartitions keeps track of active partitions.
 	activePartitions sync.WaitGroup
 }
@@ -75,6 +76,10 @@ func (sm *storageManager) close() {
 	// Wait for all partitions to finish.
 	sm.activePartitions.Wait()
 
+	if err := sm.partitionAssigner.Close(); err != nil {
+		sm.logger.WithError(err).Error("failed closing partition assigner")
+	}
+
 	if err := sm.database.Close(); err != nil {
 		sm.logger.WithError(err).Error("failed closing storage's database")
 	}
@@ -200,13 +205,19 @@ func NewPartitionManager(
 			return nil, fmt.Errorf("create storage's database directory: %w", err)
 		}
 
+		pa, err := newPartitionAssigner(db)
+		if err != nil {
+			return nil, fmt.Errorf("new partition assigner: %w", err)
+		}
+
 		storages[storage.Name] = &storageManager{
-			logger:           storageLogger,
-			path:             storage.Path,
-			repoFactory:      repoFactory,
-			stagingDirectory: stagingDir,
-			database:         db,
-			partitions:       map[string]*partition{},
+			logger:            storageLogger,
+			path:              storage.Path,
+			repoFactory:       repoFactory,
+			stagingDirectory:  stagingDir,
+			database:          db,
+			partitionAssigner: pa,
+			partitions:        map[partitionID]*partition{},
 		}
 	}
@@ -231,7 +242,18 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
 		return nil, structerr.NewInvalidArgument("validate relative path: %w", err)
 	}
 
-	relativeStateDir := deriveStateDirectory(relativePath)
+	partitionID, err := storageMgr.partitionAssigner.getPartitionID(ctx, relativePath)
+	if err != nil {
+		if errors.Is(err, badger.ErrDBClosed) {
+			// The database is closed when PartitionManager is closing. Return a more
+			// descriptive error of what happened.
+			return nil, ErrPartitionManagerClosed
+		}
+
+		return nil, fmt.Errorf("get partition: %w", err)
+	}
+
+	relativeStateDir := deriveStateDirectory(partitionID)
 	absoluteStateDir := filepath.Join(storageMgr.path, relativeStateDir)
 	if err := os.MkdirAll(filepath.Dir(absoluteStateDir), perm.PrivateDir); err != nil {
 		return nil, fmt.Errorf("create state directory hierarchy: %w", err)
@@ -248,7 +270,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
 		return nil, ErrPartitionManagerClosed
 	}
 
-	ptn, ok := storageMgr.partitions[relativePath]
+	ptn, ok := storageMgr.partitions[partitionID]
 	if !ok {
 		ptn = &partition{
 			closing: make(chan struct{}),
@@ -265,7 +287,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
 
 		ptn.transactionManager = mgr
 
-		storageMgr.partitions[relativePath] = ptn
+		storageMgr.partitions[partitionID] = ptn
 
 		storageMgr.activePartitions.Add(1)
 		go func() {
@@ -280,7 +302,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
 			// deleted allowing the next transaction for the repository to create a new partition
 			// and TransactionManager.
 			storageMgr.mu.Lock()
-			delete(storageMgr.partitions, relativePath)
+			delete(storageMgr.partitions, partitionID)
 			storageMgr.mu.Unlock()
 
 			close(ptn.closed)
@@ -334,11 +356,11 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo storage.Repository,
 	}
 }
 
-// deriveStateDirectory hashes the relative path and returns the state directory where a state
-// related to a given partition should be stored.
-func deriveStateDirectory(relativePath string) string {
+// deriveStateDirectory hashes the partition ID and returns the state
+// directory where state related to the partition should be stored.
+func deriveStateDirectory(id partitionID) string {
 	hasher := sha256.New()
-	hasher.Write([]byte(relativePath))
+	hasher.Write([]byte(id.String()))
 	hash := hex.EncodeToString(hasher.Sum(nil))
 
 	return filepath.Join(
@@ -347,9 +369,7 @@ func deriveStateDirectory(relativePath string) string {
 		// subdirectories to keep the directory sizes reasonable.
 		hash[0:2],
 		hash[2:4],
-		// Flatten the relative path by removing the path separators so the
-		// repository is stored on this level in the directory hierarchy.
-		strings.ReplaceAll(relativePath, string(os.PathSeparator), ""),
+		id.String(),
 	)
 }
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index ddb960fa5..eac914eee 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -44,7 +44,7 @@ func TestPartitionManager(t *testing.T) {
 		repo storage.Repository
 		// expectedState contains the partitions by their storages and their pending transaction count at
 		// the end of the step.
-		expectedState map[string]map[string]uint
+		expectedState map[string]map[partitionID]uint
 		// expectedError is the error expected to be returned when beginning the transaction.
 		expectedError error
 	}
@@ -57,7 +57,7 @@ func TestPartitionManager(t *testing.T) {
 		ctx context.Context
 		// expectedState contains the partitions by their storages and their pending transaction count at
 		// the end of the step.
-		expectedState map[string]map[string]uint
+		expectedState map[string]map[partitionID]uint
 		// expectedError is the error that is expected to be returned when committing the transaction.
 		expectedError error
 	}
@@ -68,7 +68,7 @@ func TestPartitionManager(t *testing.T) {
 		transactionID int
 		// expectedState contains the partitions by their storages and their pending transaction count at
 		// the end of the step.
-		expectedState map[string]map[string]uint
+		expectedState map[string]map[partitionID]uint
 		// expectedError is the error that is expected to be returned when rolling back the transaction.
 		expectedError error
 	}
@@ -117,22 +117,22 @@ func TestPartitionManager(t *testing.T) {
 	// checkExpectedState validates that the partition manager contains the correct partitions and
 	// associated transaction count at the point of execution.
-	checkExpectedState := func(t *testing.T, cfg config.Cfg, partitionManager *PartitionManager, expectedState map[string]map[string]uint) {
+	checkExpectedState := func(t *testing.T, cfg config.Cfg, partitionManager *PartitionManager, expectedState map[string]map[partitionID]uint) {
 		t.Helper()
 
-		actualState := map[string]map[string]uint{}
+		actualState := map[string]map[partitionID]uint{}
 		for storageName, storageMgr := range partitionManager.storages {
-			for partitionKey, partition := range storageMgr.partitions {
+			for ptnID, partition := range storageMgr.partitions {
 				if actualState[storageName] == nil {
-					actualState[storageName] = map[string]uint{}
+					actualState[storageName] = map[partitionID]uint{}
 				}
 
-				actualState[storageName][partitionKey] = partition.pendingTransactionCount
+				actualState[storageName][ptnID] = partition.pendingTransactionCount
 			}
 		}
 
 		if expectedState == nil {
-			expectedState = map[string]map[string]uint{}
+			expectedState = map[string]map[partitionID]uint{}
 		}
 
 		require.Equal(t, expectedState, actualState)
@@ -173,9 +173,9 @@ func TestPartitionManager(t *testing.T) {
 			steps: steps{
 				begin{
 					repo: repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -194,9 +194,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 1,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -206,9 +206,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 2,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -229,26 +229,26 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 1,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
 				begin{
 					transactionID: 2,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 2,
+							1: 2,
 						},
 					},
 				},
 				commit{
 					transactionID: 1,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -271,51 +271,51 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 1,
 					repo:          repoA,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repoA.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
 				begin{
 					transactionID: 2,
 					repo:          repoB,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repoA.GetRelativePath(): 1,
-							repoB.GetRelativePath(): 1,
+							1: 1,
+							2: 1,
 						},
 					},
 				},
 				begin{
 					transactionID: 3,
 					repo:          repoC,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repoA.GetRelativePath(): 1,
-							repoB.GetRelativePath(): 1,
+							1: 1,
+							2: 1,
 						},
 						"other-storage": {
-							repoC.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
 				commit{
 					transactionID: 1,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repoB.GetRelativePath(): 1,
+							2: 1,
 						},
 						"other-storage": {
-							repoC.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
 				commit{
 					transactionID: 2,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"other-storage": {
-							repoC.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -335,9 +335,9 @@ func TestPartitionManager(t *testing.T) {
 			steps: steps{
 				begin{
 					repo: repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -377,9 +377,9 @@ func TestPartitionManager(t *testing.T) {
 			steps: steps{
 				begin{
 					repo: repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -400,9 +400,9 @@ func TestPartitionManager(t *testing.T) {
 			steps: steps{
 				begin{
 					repo: repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -424,9 +424,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 1,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -436,9 +436,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 2,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -516,9 +516,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 1,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -528,9 +528,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 2,
 					repo: &gitalypb.Repository{
 						StorageName:  repo.GetStorageName(),
 						RelativePath: filepath.Join(repo.GetRelativePath(), "child-dir", ".."),
 					},
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 2,
+							1: 2,
 						},
 					},
 				},
@@ -548,9 +548,9 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 1,
 					repo:          repo,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
@@ -560,25 +560,25 @@ func TestPartitionManager(t *testing.T) {
 				begin{
 					transactionID: 2,
 					repo: &gitalypb.Repository{
 						StorageName:  repo.GetStorageName(),
 						RelativePath: repo.GetRelativePath(),
 					},
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 2,
+							1: 2,
 						},
 					},
 				},
 				rollback{
 					transactionID: 2,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 				},
 				rollback{
 					transactionID: 2,
-					expectedState: map[string]map[string]uint{
+					expectedState: map[string]map[partitionID]uint{
 						"default": {
-							repo.GetRelativePath(): 1,
+							1: 1,
 						},
 					},
 					expectedError: ErrTransactionAlreadyRollbacked,
@@ -663,7 +663,11 @@ func TestPartitionManager(t *testing.T) {
 				storageMgr := partitionManager.storages[step.repo.GetStorageName()]
 
 				storageMgr.mu.Lock()
-				ptn := storageMgr.partitions[step.repo.GetRelativePath()]
+
+				ptnID, err := storageMgr.partitionAssigner.getPartitionID(ctx, step.repo.GetRelativePath())
+				require.NoError(t, err)
+
+				ptn := storageMgr.partitions[ptnID]
 				storageMgr.mu.Unlock()
 
 				openTransactionData[step.transactionID] = &transactionData{
diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go
index cfbd7beb5..5f635de0b 100644
--- a/internal/praefect/remove_all_test.go
+++ b/internal/praefect/remove_all_test.go
@@ -24,6 +24,13 @@ import (
 )
 
 func TestRemoveAllHandler(t *testing.T) {
+	testhelper.SkipWithWAL(t, `
+RemoveAll is removing the entire content of the storage. This would also remove the database's and
+the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and
+the database and only then perform the removal.
+
+Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`)
+
 	t.Parallel()
 
 	ctx := testhelper.Context(t)
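A consequence of keying partitions by ID is visible in the `deriveStateDirectory` change above: partition state now lives under a sharded path derived from the numeric partition ID instead of the flattened relative path. A self-contained sketch of the resulting layout follows — note the `partition-states` root is a placeholder, since the real leading path component falls outside the hunk shown:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"path/filepath"
	"strconv"
)

// deriveStateDirectory mirrors the scheme in the diff above: hash the
// decimal partition ID with SHA-256 and shard by the first two hash bytes
// to keep directory sizes reasonable. The "partition-states" root is a
// placeholder for the component outside the shown hunk.
func deriveStateDirectory(id uint64) string {
	idStr := strconv.FormatUint(id, 10)
	sum := sha256.Sum256([]byte(idStr))
	hash := hex.EncodeToString(sum[:])

	return filepath.Join(
		"partition-states",
		hash[0:2],
		hash[2:4],
		idStr,
	)
}

func main() {
	// SHA-256("1") begins with 6b86..., so partition 1 lands in 6b/86/1.
	fmt.Println(deriveStateDirectory(1)) // partition-states/6b/86/1
}
```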