Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2023-10-23 22:32:45 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2023-10-31 15:20:51 +0300
commit760533680a13adb6ba7ca5d1d406689513813c56 (patch)
tree20859072e556662e1d1a1dab2ca8aa9066d27b0e
parent372615000b33841360d85651722d2069128d7290 (diff)
Garbage collect value log entries
BadgerDB stores the keys in the LSM tree and the in a log. The LSM tree is compacted in the background automatically. The client is responsible for triggering the value log's garbage collection to prune out unneeded values. As we'll soon wire transactions to Gitaly, we should start garbage collecting the value log as well. This commit makes PartitionManager run a garbage collection goroutine for each storage's database. The goroutine is started when the database is opened and stopped when the PartitionManager is closed. Garbage collection is ran for each database once per minute. The garbage collection works by rewriting the value log while omitting the pruned values. While the garbage collection runs once a minute, it's configured to only rewrite the log file if it can free more than 50% of the space. This aims to strike a balance between the load from rewriting the log files and reclaiming space. Each RunValueLogGC run garbage collects only a single file. If a file was garbage collected, we run the function again immediately to check for further files that may need garbage collection The garbage collection interval and the discard ratio may need to be changed in the future. We'll make them configurable in later changes.
-rw-r--r--internal/gitaly/storage/storagemgr/database.go1
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager.go70
-rw-r--r--internal/gitaly/storage/storagemgr/partition_manager_test.go111
-rw-r--r--internal/testhelper/testserver/gitaly.go2
4 files changed, 178 insertions, 6 deletions
diff --git a/internal/gitaly/storage/storagemgr/database.go b/internal/gitaly/storage/storagemgr/database.go
index 14d98ff3a..6c012b3b2 100644
--- a/internal/gitaly/storage/storagemgr/database.go
+++ b/internal/gitaly/storage/storagemgr/database.go
@@ -78,6 +78,7 @@ type Database interface {
View(func(DatabaseTransaction) error) error
Update(func(DatabaseTransaction) error) error
GetSequence([]byte, uint64) (Sequence, error)
+ RunValueLogGC(float64) error
Close() error
}
diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go
index d133a773b..27f74c04e 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager.go
@@ -17,6 +17,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
@@ -86,6 +87,8 @@ type storageManager struct {
// closed tracks whether the storageManager has been closed. If it is closed,
// no new transactions are allowed to begin.
closed bool
+ // stopGC stops the garbage collection and waits for it to return.
+ stopGC func()
// db is the handle to the key-value store used for storing the storage's database state.
database Database
// partitionAssigner manages partition assignments of repositories.
@@ -114,6 +117,9 @@ func (sm *storageManager) close() {
sm.logger.WithError(err).Error("failed closing partition assigner")
}
+ // Wait until the database's garbage collection goroutine has returned.
+ sm.stopGC()
+
if err := sm.database.Close(); err != nil {
sm.logger.WithError(err).Error("failed closing storage's database")
}
@@ -214,6 +220,7 @@ func NewPartitionManager(
localRepoFactory localrepo.Factory,
logger log.Logger,
dbOpener DatabaseOpener,
+ gcTickerFactory helper.TickerFactory,
) (*PartitionManager, error) {
storages := make(map[string]*storageManager, len(configuredStorages))
for _, storage := range configuredStorages {
@@ -253,11 +260,66 @@ func NewPartitionManager(
return nil, fmt.Errorf("new partition assigner: %w", err)
}
+ gcCtx, stopGC := context.WithCancel(context.Background())
+ gcStopped := make(chan struct{})
+ go func() {
+ defer func() {
+ storageLogger.Info("value log garbage collection goroutine stopped")
+ close(gcStopped)
+ }()
+
+ // Configure the garbage collection discard ratio at 0.5. This means the value log is garbage
+ // collected if we can reclaim more than half of the space.
+ const gcDiscardRatio = 0.5
+
+ ticker := gcTickerFactory.NewTicker()
+ for {
+ storageLogger.Info("value log garbage collection started")
+
+ for {
+ if err := db.RunValueLogGC(gcDiscardRatio); err != nil {
+ if errors.Is(err, badger.ErrNoRewrite) {
+ // No log files were rewritten. This means there was nothing
+ // to garbage collect.
+ break
+ }
+
+ storageLogger.WithError(err).Error("value log garbage collection failed")
+ break
+ }
+
+ // Log files were garbage collected. Check immediately if there are more
+ // files that need garbage collection.
+ storageLogger.Info("value log file garbage collected")
+
+ if gcCtx.Err() != nil {
+ // As we'd keep going until no log files were rewritten, break the loop
+ // if GC has run.
+ break
+ }
+ }
+
+ storageLogger.Info("value log garbage collection finished")
+
+ ticker.Reset()
+ select {
+ case <-ticker.C():
+ case <-gcCtx.Done():
+ ticker.Stop()
+ return
+ }
+ }
+ }()
+
storages[storage.Name] = &storageManager{
- logger: storageLogger,
- path: storage.Path,
- repoFactory: repoFactory,
- stagingDirectory: stagingDir,
+ logger: storageLogger,
+ path: storage.Path,
+ repoFactory: repoFactory,
+ stagingDirectory: stagingDir,
+ stopGC: func() {
+ stopGC()
+ <-gcStopped
+ },
database: db,
partitionAssigner: pa,
partitions: map[partitionID]*partition{},
diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go
index 48d5b5aa1..8056650f5 100644
--- a/internal/gitaly/storage/storagemgr/partition_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go
@@ -2,12 +2,16 @@ package storagemgr
import (
"context"
+ "errors"
"io/fs"
"os"
"path/filepath"
+ "strings"
"sync"
"testing"
+ "github.com/dgraph-io/badger/v4"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -19,7 +23,9 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
@@ -678,7 +684,7 @@ func TestPartitionManager(t *testing.T) {
txManager := transaction.NewManager(cfg, logger, backchannel.NewRegistry())
housekeepingManager := housekeeping.NewManager(cfg.Prometheus, logger, txManager)
- partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase))
+ partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase), helper.NewNullTickerFactory())
require.NoError(t, err)
if setup.transactionManagerFactory != nil {
@@ -790,6 +796,107 @@ func TestPartitionManager(t *testing.T) {
}
}
+type dbWrapper struct {
+ Database
+ runValueLogGC func(float64) error
+}
+
+func (db dbWrapper) RunValueLogGC(discardRatio float64) error {
+ return db.runValueLogGC(discardRatio)
+}
+
+func TestPartitionManager_garbageCollection(t *testing.T) {
+ t.Parallel()
+
+ cfg := testcfg.Build(t)
+
+ logger := testhelper.NewLogger(t)
+ loggerHook := testhelper.AddLoggerHook(logger)
+
+ cmdFactory := gittest.NewCommandFactory(t, cfg)
+ catfileCache := catfile.NewCache(cfg)
+ defer catfileCache.Stop()
+
+ localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache)
+
+ txManager := transaction.NewManager(cfg, logger, backchannel.NewRegistry())
+ housekeepingManager := housekeeping.NewManager(cfg.Prometheus, logger, txManager)
+
+ gcRunCount := 0
+ gcCompleted := make(chan struct{})
+ errExpected := errors.New("some gc failure")
+
+ partitionManager, err := NewPartitionManager(
+ cfg.Storages,
+ cmdFactory,
+ housekeepingManager,
+ localRepoFactory,
+ logger,
+ DatabaseOpenerFunc(func(logger log.Logger, path string) (Database, error) {
+ db, err := OpenDatabase(logger, path)
+ return dbWrapper{
+ Database: db,
+ runValueLogGC: func(discardRatio float64) error {
+ gcRunCount++
+ if gcRunCount < 3 {
+ return nil
+ }
+
+ if gcRunCount == 3 {
+ return badger.ErrNoRewrite
+ }
+
+ return errExpected
+ },
+ }, err
+ }),
+ helper.TickerFactoryFunc(func() helper.Ticker {
+ return helper.NewCountTicker(1, func() {
+ close(gcCompleted)
+ })
+ }),
+ )
+ require.NoError(t, err)
+ defer partitionManager.Close()
+
+ // The ticker has exhausted and we've performed the two GC runs we wanted to test.
+ <-gcCompleted
+
+ // Close the manager to ensure the GC goroutine also stops.
+ partitionManager.Close()
+
+ var gcLogs []*logrus.Entry
+ for _, entry := range loggerHook.AllEntries() {
+ if !strings.HasPrefix(entry.Message, "value log") {
+ continue
+ }
+
+ gcLogs = append(gcLogs, entry)
+ }
+
+ // We're testing the garbage collection goroutine through multiple loops.
+ //
+ // The first runs immediately on startup before the ticker even ticks. The
+ // First RunValueLogGC pretends to have performed a GC, so another GC is
+ // immediately attempted. The second round returns badger.ErrNoRewrite, so
+ // the GC loop stops and waits for another tick
+ require.Equal(t, "value log garbage collection started", gcLogs[0].Message)
+ require.Equal(t, "value log file garbage collected", gcLogs[1].Message)
+ require.Equal(t, "value log file garbage collected", gcLogs[2].Message)
+ require.Equal(t, "value log garbage collection finished", gcLogs[3].Message)
+
+ // The second tick results in a garbage collection run that pretend to have
+ // failed with errExpected.
+ require.Equal(t, "value log garbage collection started", gcLogs[4].Message)
+ require.Equal(t, "value log garbage collection failed", gcLogs[5].Message)
+ require.Equal(t, errExpected, gcLogs[5].Data[logrus.ErrorKey])
+ require.Equal(t, "value log garbage collection finished", gcLogs[6].Message)
+
+ // After the second round, the PartititionManager is closed and we assert that the
+ // garbage collection goroutine has also stopped.
+ require.Equal(t, "value log garbage collection goroutine stopped", gcLogs[7].Message)
+}
+
func TestPartitionManager_concurrentClose(t *testing.T) {
t.Parallel()
@@ -807,7 +914,7 @@ func TestPartitionManager_concurrentClose(t *testing.T) {
txManager := transaction.NewManager(cfg, logger, backchannel.NewRegistry())
housekeepingManager := housekeeping.NewManager(cfg.Prometheus, logger, txManager)
- partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase))
+ partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase), helper.NewNullTickerFactory())
require.NoError(t, err)
defer partitionManager.Close()
diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go
index 342551331..9af5d742b 100644
--- a/internal/testhelper/testserver/gitaly.go
+++ b/internal/testhelper/testserver/gitaly.go
@@ -31,6 +31,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm"
"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
@@ -377,6 +378,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *
localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache),
gsd.logger,
storagemgr.DatabaseOpenerFunc(storagemgr.OpenDatabase),
+ helper.NewNullTickerFactory(),
)
require.NoError(tb, err)
tb.Cleanup(partitionManager.Close)