diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2023-10-23 22:32:45 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2023-10-31 15:20:51 +0300 |
commit | 760533680a13adb6ba7ca5d1d406689513813c56 (patch) | |
tree | 20859072e556662e1d1a1dab2ca8aa9066d27b0e | |
parent | 372615000b33841360d85651722d2069128d7290 (diff) |
Garbage collect value log entries
BadgerDB stores the keys in the LSM tree and the in a log. The LSM
tree is compacted in the background automatically. The client is
responsible for triggering the value log's garbage collection to prune
out unneeded values. As we'll soon wire transactions to Gitaly, we
should start garbage collecting the value log as well.
This commit makes PartitionManager run a garbage collection goroutine
for each storage's database. The goroutine is started when the database
is opened and stopped when the PartitionManager is closed. Garbage
collection is ran for each database once per minute. The garbage collection
works by rewriting the value log while omitting the pruned values. While
the garbage collection runs once a minute, it's configured to only rewrite
the log file if it can free more than 50% of the space. This aims to strike
a balance between the load from rewriting the log files and reclaiming space.
Each RunValueLogGC run garbage collects only a single file. If a file was
garbage collected, we run the function again immediately to check for further
files that may need garbage collection
The garbage collection interval and the discard ratio may need to be changed
in the future. We'll make them configurable in later changes.
4 files changed, 178 insertions, 6 deletions
diff --git a/internal/gitaly/storage/storagemgr/database.go b/internal/gitaly/storage/storagemgr/database.go index 14d98ff3a..6c012b3b2 100644 --- a/internal/gitaly/storage/storagemgr/database.go +++ b/internal/gitaly/storage/storagemgr/database.go @@ -78,6 +78,7 @@ type Database interface { View(func(DatabaseTransaction) error) error Update(func(DatabaseTransaction) error) error GetSequence([]byte, uint64) (Sequence, error) + RunValueLogGC(float64) error Close() error } diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index d133a773b..27f74c04e 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -17,6 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" @@ -86,6 +87,8 @@ type storageManager struct { // closed tracks whether the storageManager has been closed. If it is closed, // no new transactions are allowed to begin. closed bool + // stopGC stops the garbage collection and waits for it to return. + stopGC func() // db is the handle to the key-value store used for storing the storage's database state. database Database // partitionAssigner manages partition assignments of repositories. @@ -114,6 +117,9 @@ func (sm *storageManager) close() { sm.logger.WithError(err).Error("failed closing partition assigner") } + // Wait until the database's garbage collection goroutine has returned. + sm.stopGC() + if err := sm.database.Close(); err != nil { sm.logger.WithError(err).Error("failed closing storage's database") } @@ -214,6 +220,7 @@ func NewPartitionManager( localRepoFactory localrepo.Factory, logger log.Logger, dbOpener DatabaseOpener, + gcTickerFactory helper.TickerFactory, ) (*PartitionManager, error) { storages := make(map[string]*storageManager, len(configuredStorages)) for _, storage := range configuredStorages { @@ -253,11 +260,66 @@ func NewPartitionManager( return nil, fmt.Errorf("new partition assigner: %w", err) } + gcCtx, stopGC := context.WithCancel(context.Background()) + gcStopped := make(chan struct{}) + go func() { + defer func() { + storageLogger.Info("value log garbage collection goroutine stopped") + close(gcStopped) + }() + + // Configure the garbage collection discard ratio at 0.5. This means the value log is garbage + // collected if we can reclaim more than half of the space. + const gcDiscardRatio = 0.5 + + ticker := gcTickerFactory.NewTicker() + for { + storageLogger.Info("value log garbage collection started") + + for { + if err := db.RunValueLogGC(gcDiscardRatio); err != nil { + if errors.Is(err, badger.ErrNoRewrite) { + // No log files were rewritten. This means there was nothing + // to garbage collect. + break + } + + storageLogger.WithError(err).Error("value log garbage collection failed") + break + } + + // Log files were garbage collected. Check immediately if there are more + // files that need garbage collection. + storageLogger.Info("value log file garbage collected") + + if gcCtx.Err() != nil { + // As we'd keep going until no log files were rewritten, break the loop + // if GC has run. + break + } + } + + storageLogger.Info("value log garbage collection finished") + + ticker.Reset() + select { + case <-ticker.C(): + case <-gcCtx.Done(): + ticker.Stop() + return + } + } + }() + storages[storage.Name] = &storageManager{ - logger: storageLogger, - path: storage.Path, - repoFactory: repoFactory, - stagingDirectory: stagingDir, + logger: storageLogger, + path: storage.Path, + repoFactory: repoFactory, + stagingDirectory: stagingDir, + stopGC: func() { + stopGC() + <-gcStopped + }, database: db, partitionAssigner: pa, partitions: map[partitionID]*partition{}, diff --git a/internal/gitaly/storage/storagemgr/partition_manager_test.go b/internal/gitaly/storage/storagemgr/partition_manager_test.go index 48d5b5aa1..8056650f5 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition_manager_test.go @@ -2,12 +2,16 @@ package storagemgr import ( "context" + "errors" "io/fs" "os" "path/filepath" + "strings" "sync" "testing" + "github.com/dgraph-io/badger/v4" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -19,7 +23,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -678,7 +684,7 @@ func TestPartitionManager(t *testing.T) { txManager := transaction.NewManager(cfg, logger, backchannel.NewRegistry()) housekeepingManager := housekeeping.NewManager(cfg.Prometheus, logger, txManager) - partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase)) + partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase), helper.NewNullTickerFactory()) require.NoError(t, err) if setup.transactionManagerFactory != nil { @@ -790,6 +796,107 @@ func TestPartitionManager(t *testing.T) { } } +type dbWrapper struct { + Database + runValueLogGC func(float64) error +} + +func (db dbWrapper) RunValueLogGC(discardRatio float64) error { + return db.runValueLogGC(discardRatio) +} + +func TestPartitionManager_garbageCollection(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + + logger := testhelper.NewLogger(t) + loggerHook := testhelper.AddLoggerHook(logger) + + cmdFactory := gittest.NewCommandFactory(t, cfg) + catfileCache := catfile.NewCache(cfg) + defer catfileCache.Stop() + + localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) + + txManager := transaction.NewManager(cfg, logger, backchannel.NewRegistry()) + housekeepingManager := housekeeping.NewManager(cfg.Prometheus, logger, txManager) + + gcRunCount := 0 + gcCompleted := make(chan struct{}) + errExpected := errors.New("some gc failure") + + partitionManager, err := NewPartitionManager( + cfg.Storages, + cmdFactory, + housekeepingManager, + localRepoFactory, + logger, + DatabaseOpenerFunc(func(logger log.Logger, path string) (Database, error) { + db, err := OpenDatabase(logger, path) + return dbWrapper{ + Database: db, + runValueLogGC: func(discardRatio float64) error { + gcRunCount++ + if gcRunCount < 3 { + return nil + } + + if gcRunCount == 3 { + return badger.ErrNoRewrite + } + + return errExpected + }, + }, err + }), + helper.TickerFactoryFunc(func() helper.Ticker { + return helper.NewCountTicker(1, func() { + close(gcCompleted) + }) + }), + ) + require.NoError(t, err) + defer partitionManager.Close() + + // The ticker has exhausted and we've performed the two GC runs we wanted to test. + <-gcCompleted + + // Close the manager to ensure the GC goroutine also stops. + partitionManager.Close() + + var gcLogs []*logrus.Entry + for _, entry := range loggerHook.AllEntries() { + if !strings.HasPrefix(entry.Message, "value log") { + continue + } + + gcLogs = append(gcLogs, entry) + } + + // We're testing the garbage collection goroutine through multiple loops. + // + // The first runs immediately on startup before the ticker even ticks. The + // First RunValueLogGC pretends to have performed a GC, so another GC is + // immediately attempted. The second round returns badger.ErrNoRewrite, so + // the GC loop stops and waits for another tick + require.Equal(t, "value log garbage collection started", gcLogs[0].Message) + require.Equal(t, "value log file garbage collected", gcLogs[1].Message) + require.Equal(t, "value log file garbage collected", gcLogs[2].Message) + require.Equal(t, "value log garbage collection finished", gcLogs[3].Message) + + // The second tick results in a garbage collection run that pretend to have + // failed with errExpected. + require.Equal(t, "value log garbage collection started", gcLogs[4].Message) + require.Equal(t, "value log garbage collection failed", gcLogs[5].Message) + require.Equal(t, errExpected, gcLogs[5].Data[logrus.ErrorKey]) + require.Equal(t, "value log garbage collection finished", gcLogs[6].Message) + + // After the second round, the PartititionManager is closed and we assert that the + // garbage collection goroutine has also stopped. + require.Equal(t, "value log garbage collection goroutine stopped", gcLogs[7].Message) +} + func TestPartitionManager_concurrentClose(t *testing.T) { t.Parallel() @@ -807,7 +914,7 @@ func TestPartitionManager_concurrentClose(t *testing.T) { txManager := transaction.NewManager(cfg, logger, backchannel.NewRegistry()) housekeepingManager := housekeeping.NewManager(cfg.Prometheus, logger, txManager) - partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase)) + partitionManager, err := NewPartitionManager(cfg.Storages, cmdFactory, housekeepingManager, localRepoFactory, logger, DatabaseOpenerFunc(OpenDatabase), helper.NewNullTickerFactory()) require.NoError(t, err) defer partitionManager.Close() diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 342551331..9af5d742b 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -31,6 +31,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -377,6 +378,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), gsd.logger, storagemgr.DatabaseOpenerFunc(storagemgr.OpenDatabase), + helper.NewNullTickerFactory(), ) require.NoError(tb, err) tb.Cleanup(partitionManager.Close) |