Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2021-03-26 02:04:12 +0300
committerJames Fargher <proglottis@gmail.com>2021-03-26 02:04:12 +0300
commiteec4ea9e3def205ee9906840d45b7c9b764d653a (patch)
treeeb1bd351e487d4508b8f36ff1fc218a2b1d6f0b7
parentd23cc3920afde63ee62cfcb74b20102bd24e6037 (diff)
Do not return error from streamcache.New
The way we wire up gRPC services, it is awkward if a gRPC service server constructor can return an error. This commit refactors streamcache.New and streamcache.newFilestore to never return an error. The one thing that could return an error now happens lazily. Because there is now more synchronization in filestore, this change includes a refactor where we change to a single mutex for the whole struct.
-rw-r--r--internal/gitaly/config/config.go52
-rw-r--r--internal/gitaly/config/config_test.go71
-rw-r--r--internal/gitaly/service/hook/pack_objects_test.go16
-rw-r--r--internal/gitaly/service/hook/server.go52
-rw-r--r--internal/gitaly/service/hook/server_test.go53
-rw-r--r--internal/streamcache/cache.go29
-rw-r--r--internal/streamcache/cache_test.go101
-rw-r--r--internal/streamcache/filestore.go70
-rw-r--r--internal/streamcache/filestore_test.go12
-rw-r--r--internal/tempdir/tempdir.go9
10 files changed, 352 insertions, 113 deletions
diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go
index 5d60b62e9..9446b0c81 100644
--- a/internal/gitaly/config/config.go
+++ b/internal/gitaly/config/config.go
@@ -25,6 +25,14 @@ import (
"golang.org/x/sys/unix"
)
+const (
+ // GitalyDataPrefix is the top-level directory we use to store system
+ // (non-user) data. We need to be careful that this path does not clash
+ // with any directory name that could be provided by a user. The '+'
+ // character is not allowed in GitLab namespaces or repositories.
+ GitalyDataPrefix = "+gitaly"
+)
+
var (
// Config stores the global configuration
// Deprecated: please do not use global variable and pass preconfigured Cfg as a parameter
@@ -68,6 +76,7 @@ type Cfg struct {
InternalSocketDir string `toml:"internal_socket_dir"`
DailyMaintenance DailyJob `toml:"daily_maintenance"`
Cgroups cgroups.Config `toml:"cgroups"`
+ PackObjectsCache PackObjectsCache `toml:"pack_objects_cache"`
}
// TLS configuration
@@ -140,6 +149,13 @@ type Concurrency struct {
MaxPerRepo int `toml:"max_per_repo"`
}
+// PackObjectsCache contains settings for the pack-objects cache.
+type PackObjectsCache struct {
+ Enabled bool `toml:"enabled"` // Default: false
+ Dir string `toml:"dir"` // Default: <FIRST STORAGE PATH>/+gitaly/PackObjectsCache
+ MaxAge Duration `toml:"max_age"` // Default: 5m
+}
+
// Load initializes the Config variable from file and the environment.
// Environment variables take precedence over the file.
func Load(file io.Reader) (Cfg, error) {
@@ -187,6 +203,7 @@ func (cfg *Cfg) Validate() error {
cfg.validateHooks,
cfg.validateMaintenance,
cfg.validateCgroups,
+ cfg.configurePackObjectsCache,
} {
if err := run(); err != nil {
return err
@@ -575,3 +592,38 @@ func (cfg *Cfg) validateCgroups() error {
return nil
}
+
+var (
+ errPackObjectsCacheNegativeMaxAge = errors.New("pack_objects_cache.max_age cannot be negative")
+ errPackObjectsCacheNoStorages = errors.New("pack_objects_cache: cannot pick default cache directory: no storages")
+ errPackObjectsCacheRelativePath = errors.New("pack_objects_cache: storage directory must be absolute path")
+)
+
+func (cfg *Cfg) configurePackObjectsCache() error {
+ poc := &cfg.PackObjectsCache
+ if !poc.Enabled {
+ return nil
+ }
+
+ if poc.MaxAge < 0 {
+ return errPackObjectsCacheNegativeMaxAge
+ }
+
+ if poc.MaxAge == 0 {
+ poc.MaxAge = Duration(5 * time.Minute)
+ }
+
+ if poc.Dir == "" {
+ if len(cfg.Storages) == 0 {
+ return errPackObjectsCacheNoStorages
+ }
+
+ poc.Dir = filepath.Join(cfg.Storages[0].Path, GitalyDataPrefix, "PackObjectsCache")
+ }
+
+ if !filepath.IsAbs(poc.Dir) {
+ return errPackObjectsCacheRelativePath
+ }
+
+ return nil
+}
diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go
index 7c6e5205c..82a089bee 100644
--- a/internal/gitaly/config/config_test.go
+++ b/internal/gitaly/config/config_test.go
@@ -1052,3 +1052,74 @@ func TestValidateCgroups(t *testing.T) {
})
}
}
+
+func TestConfigurePackObjectsCache(t *testing.T) {
+ storageConfig := `[[storage]]
+name="default"
+path="/foobar"
+`
+
+ testCases := []struct {
+ desc string
+ in string
+ out PackObjectsCache
+ err error
+ }{
+ {desc: "empty"},
+ {
+ desc: "enabled",
+ in: storageConfig + `[pack_objects_cache]
+enabled = true
+`,
+ out: PackObjectsCache{Enabled: true, MaxAge: Duration(5 * time.Minute), Dir: "/foobar/+gitaly/PackObjectsCache"},
+ },
+ {
+ desc: "enabled with custom values",
+ in: storageConfig + `[pack_objects_cache]
+enabled = true
+dir = "/bazqux"
+max_age = "10m"
+`,
+ out: PackObjectsCache{Enabled: true, MaxAge: Duration(10 * time.Minute), Dir: "/bazqux"},
+ },
+ {
+ desc: "enabled with 0 storages",
+ in: `[pack_objects_cache]
+enabled = true
+`,
+ err: errPackObjectsCacheNoStorages,
+ },
+ {
+ desc: "enabled with negative max age",
+ in: `[pack_objects_cache]
+enabled = true
+max_age = "-5m"
+`,
+ err: errPackObjectsCacheNegativeMaxAge,
+ },
+ {
+ desc: "enabled with relative path",
+ in: `[pack_objects_cache]
+enabled = true
+dir = "foobar"
+`,
+ err: errPackObjectsCacheRelativePath,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ cfg, err := Load(strings.NewReader(tc.in))
+ require.NoError(t, err)
+
+ err = cfg.configurePackObjectsCache()
+ if tc.err != nil {
+ require.Equal(t, tc.err, err)
+ return
+ }
+
+ require.NoError(t, err)
+ require.Equal(t, tc.out, cfg.PackObjectsCache)
+ })
+ }
+}
diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go
index fc957ca0d..5ac8cd19c 100644
--- a/internal/gitaly/service/hook/pack_objects_test.go
+++ b/internal/gitaly/service/hook/pack_objects_test.go
@@ -10,6 +10,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/internal/streamcache"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/internal/testhelper/testcfg"
@@ -44,11 +45,20 @@ func TestServer_PackObjectsHook_invalidArgument(t *testing.T) {
}
}
+func cfgWithCache(t *testing.T) (config.Cfg, *gitalypb.Repository, string) {
+ cfg, repo, repoPath := testcfg.BuildWithRepo(t)
+ cfg.PackObjectsCache.Enabled = true
+ var cleanup func()
+ cfg.PackObjectsCache.Dir, cleanup = testhelper.TempDir(t)
+ t.Cleanup(cleanup)
+ return cfg, repo, repoPath
+}
+
func TestServer_PackObjectsHook(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- cfg, repo, repoPath := testcfg.BuildWithRepo(t)
+ cfg, repo, repoPath := cfgWithCache(t)
testCases := []struct {
desc string
@@ -152,7 +162,7 @@ func TestParsePackObjectsArgs(t *testing.T) {
}
func TestServer_PackObjectsHook_separateContext(t *testing.T) {
- cfg, repo, repoPath := testcfg.BuildWithRepo(t)
+ cfg, repo, repoPath := cfgWithCache(t)
startRequest := func(ctx context.Context, stream gitalypb.HookService_PackObjectsHookClient) {
require.NoError(t, stream.Send(&gitalypb.PackObjectsHookRequest{
@@ -214,7 +224,7 @@ func TestServer_PackObjectsHook_separateContext(t *testing.T) {
}
func TestServer_PackObjectsHook_usesCache(t *testing.T) {
- cfg, repo, repoPath := testcfg.BuildWithRepo(t)
+ cfg, repo, repoPath := cfgWithCache(t)
tlc := &streamcache.TestLoggingCache{}
serverSocketPath := runHooksServer(t, cfg, func(s *server) {
diff --git a/internal/gitaly/service/hook/server.go b/internal/gitaly/service/hook/server.go
index 2ef84004b..bc3b739d6 100644
--- a/internal/gitaly/service/hook/server.go
+++ b/internal/gitaly/service/hook/server.go
@@ -1,9 +1,7 @@
package hook
import (
- "path/filepath"
"strconv"
- "time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -12,7 +10,6 @@ import (
gitalyhook "gitlab.com/gitlab-org/gitaly/internal/gitaly/hook"
"gitlab.com/gitlab-org/gitaly/internal/log"
"gitlab.com/gitlab-org/gitaly/internal/streamcache"
- "gitlab.com/gitlab-org/gitaly/internal/tempdir"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
)
@@ -36,42 +33,23 @@ type server struct {
// NewServer creates a new instance of a gRPC namespace server
func NewServer(cfg config.Cfg, manager gitalyhook.Manager, gitCmdFactory git.CommandFactory) gitalypb.HookServiceServer {
srv := &server{
- cfg: cfg,
- manager: manager,
- gitCmdFactory: gitCmdFactory,
+ cfg: cfg,
+ manager: manager,
+ gitCmdFactory: gitCmdFactory,
+ packObjectsCache: streamcache.NullCache{},
}
- if len(cfg.Storages) > 0 {
- // TODO make the cache configurable via config.toml.
- // https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/921
- //
- // Unless someone enables the gitaly_upload_pack_gitaly_hooks feature
- // flag, PackObjectsHook is never called, and srv.packObjectsCache is
- // never accessed.
- //
- // While we are still evaluating the design of the cache, we do not want
- // to commit to a configuration "interface" yet.
-
- // On gitlab.com, all storages point to the same directory so it does not
- // matter which one we pick. Our current plan is to store cache data on
- // the same filesystem used for persistent repository storage. Also see
- // https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/792 for
- // discussion.
- dir := filepath.Join(tempdir.StreamCacheDir(cfg.Storages[0]), "PackObjectsHook")
-
- // 5 minutes appears to be a reasonable number for deduplicating CI clone
- // waves. See
- // https://gitlab.com/gitlab-com/gl-infra/scalability/-/issues/872 for
- // discussion.
- maxAge := 5 * time.Minute
-
- logger := log.Default()
- if cache, err := streamcache.New(dir, maxAge, logger); err != nil {
- logger.WithError(err).Error("instantiate PackObjectsHook cache")
- } else {
- srv.packObjectsCache = cache
- packObjectsCacheEnabled.WithLabelValues(dir, strconv.Itoa(int(maxAge.Seconds()))).Set(1)
- }
+ if poc := cfg.PackObjectsCache; poc.Enabled {
+ maxAge := poc.MaxAge.Duration()
+ srv.packObjectsCache = streamcache.New(
+ poc.Dir,
+ maxAge,
+ log.Default(),
+ )
+ packObjectsCacheEnabled.WithLabelValues(
+ poc.Dir,
+ strconv.Itoa(int(maxAge.Seconds())),
+ ).Set(1)
}
return srv
diff --git a/internal/gitaly/service/hook/server_test.go b/internal/gitaly/service/hook/server_test.go
new file mode 100644
index 000000000..e4009a56e
--- /dev/null
+++ b/internal/gitaly/service/hook/server_test.go
@@ -0,0 +1,53 @@
+package hook
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/config"
+ gitalyhook "gitlab.com/gitlab-org/gitaly/internal/gitaly/hook"
+ "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction"
+ "gitlab.com/gitlab-org/gitaly/internal/streamcache"
+)
+
+func isNullCache(c streamcache.Cache) bool {
+ _, ok := c.(streamcache.NullCache)
+ return ok
+}
+
+func TestNewServer(t *testing.T) {
+ testCases := []struct {
+ desc string
+ cfg config.Cfg
+ nullCache bool
+ }{
+ {
+ desc: "cache disabled",
+ nullCache: true,
+ },
+ {
+ desc: "cache enabled",
+ cfg: config.Cfg{
+ PackObjectsCache: config.PackObjectsCache{
+ Enabled: true,
+ },
+ },
+ nullCache: false,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ cfg := tc.cfg
+ poc := NewServer(
+ cfg,
+ gitalyhook.NewManager(config.NewLocator(cfg), transaction.NewManager(cfg), gitalyhook.GitlabAPIStub, cfg),
+ git.NewExecCommandFactory(cfg),
+ ).(*server).packObjectsCache
+
+ require.NotNil(t, poc)
+ require.Equal(t, tc.nullCache, isNullCache(poc))
+ })
+ }
+}
diff --git a/internal/streamcache/cache.go b/internal/streamcache/cache.go
index a7518b58e..dfa724a33 100644
--- a/internal/streamcache/cache.go
+++ b/internal/streamcache/cache.go
@@ -96,6 +96,24 @@ func (tlc *TestLoggingCache) Entries() []*TestLogEntry {
return tlc.entries
}
+var _ = Cache(NullCache{})
+
+// NullCache is a null implementation of Cache. Every lookup is a miss,
+// and it uses no storage.
+type NullCache struct{}
+
+// FindOrCreate runs create in a goroutine and lets the caller consume
+// the result via the returned stream. The created flag is always true.
+func (NullCache) FindOrCreate(key string, create func(io.Writer) error) (s *Stream, created bool, err error) {
+ pr, pw := io.Pipe()
+ w := newWaiter()
+ go func() { w.SetError(runCreate(pw, create)) }()
+ return &Stream{reader: pr, waiter: w}, true, nil
+}
+
+// Stop is a no-op.
+func (NullCache) Stop() {}
+
type cache struct {
m sync.Mutex
maxAge time.Duration
@@ -108,15 +126,12 @@ type cache struct {
}
// New returns a new cache instance.
-func New(dir string, maxAge time.Duration, logger logrus.FieldLogger) (Cache, error) {
+func New(dir string, maxAge time.Duration, logger logrus.FieldLogger) Cache {
return newCacheWithSleep(dir, maxAge, time.Sleep, logger)
}
-func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) (Cache, error) {
- fs, err := newFilestore(dir, maxAge, sleep, logger)
- if err != nil {
- return nil, err
- }
+func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) Cache {
+ fs := newFilestore(dir, maxAge, sleep, logger)
c := &cache{
maxAge: maxAge,
@@ -135,7 +150,7 @@ func newCacheWithSleep(dir string, maxAge time.Duration, sleep func(time.Duratio
fs.Stop()
}()
- return c, nil
+ return c
}
func (c *cache) Stop() {
diff --git a/internal/streamcache/cache_test.go b/internal/streamcache/cache_test.go
index 90fa893e7..ac64955c8 100644
--- a/internal/streamcache/cache_test.go
+++ b/internal/streamcache/cache_test.go
@@ -1,6 +1,7 @@
package streamcache
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -23,8 +24,7 @@ func TestCache_writeOneReadMultiple(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Minute, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Minute, log.Default())
defer c.Stop()
const (
@@ -55,8 +55,7 @@ func TestCache_manyConcurrentWrites(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Minute, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Minute, log.Default())
defer c.Stop()
const (
@@ -136,8 +135,7 @@ func TestCache_deletedFile(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Hour, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Hour, log.Default())
defer c.Stop()
const (
@@ -189,8 +187,7 @@ func TestCache_scope(t *testing.T) {
for i := 0; i < N; i++ {
input[i] = fmt.Sprintf("test content %d", i)
- cache[i], err = New(tmp, time.Minute, log.Default())
- require.NoError(t, err)
+ cache[i] = New(tmp, time.Minute, log.Default())
defer func(i int) { cache[i].Stop() }(i)
var created bool
@@ -258,9 +255,7 @@ func TestCache_diskCleanup(t *testing.T) {
)
cl := newClock()
- c, err := newCacheWithSleep(tmp, 0, func(time.Duration) { cl.wait() }, log.Default())
-
- require.NoError(t, err)
+ c := newCacheWithSleep(tmp, 0, func(time.Duration) { cl.wait() }, log.Default())
defer c.Stop()
content := func(i int) string { return fmt.Sprintf("content %d", i) }
@@ -305,8 +300,7 @@ func TestCache_failedWrite(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Hour, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Hour, log.Default())
defer c.Stop()
testCases := []struct {
@@ -354,14 +348,13 @@ func TestCache_failCreateFile(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Hour, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Hour, log.Default())
defer c.Stop()
createError := errors.New("cannot create file")
c.(*cache).createFile = func() (namedWriteCloser, error) { return nil, createError }
- _, _, err = c.FindOrCreate("key", func(io.Writer) error { return nil })
+ _, _, err := c.FindOrCreate("key", func(io.Writer) error { return nil })
require.Equal(t, createError, err)
}
@@ -369,8 +362,7 @@ func TestCache_unWriteableFile(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Hour, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Hour, log.Default())
defer c.Stop()
c.(*cache).createFile = func() (namedWriteCloser, error) {
@@ -396,8 +388,7 @@ func TestCache_unCloseableFile(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Hour, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Hour, log.Default())
defer c.Stop()
c.(*cache).createFile = func() (namedWriteCloser, error) {
@@ -424,8 +415,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- c, err := New(tmp, time.Hour, log.Default())
- require.NoError(t, err)
+ c := New(tmp, time.Hour, log.Default())
defer c.Stop()
c.(*cache).createFile = func() (namedWriteCloser, error) {
@@ -436,7 +426,7 @@ func TestCache_cannotOpenFileForReading(t *testing.T) {
return f, os.Remove(f.Name()) // Removed so cannot be opened
}
- _, _, err = c.FindOrCreate("key", func(w io.Writer) error { return nil })
+ _, _, err := c.FindOrCreate("key", func(w io.Writer) error { return nil })
err = errors.Unwrap(err)
require.IsType(t, &os.PathError{}, err)
require.Equal(t, "open", err.(*os.PathError).Op)
@@ -458,3 +448,68 @@ func TestWaiter_cancel(t *testing.T) {
cancel()
require.Equal(t, context.Canceled, <-errc)
}
+
+func TestNullCache(t *testing.T) {
+ const (
+ N = 1000
+ inputSize = 4096
+ key = "key"
+ )
+
+ c := NullCache{}
+ start := make(chan struct{})
+ results := make(chan error, N)
+
+ for i := 0; i < N; i++ {
+ go func() {
+ results <- func() error {
+ input := make([]byte, inputSize)
+ n, err := rand.Read(input)
+ if err != nil {
+ return err
+ }
+ if n != inputSize {
+ return io.ErrShortWrite
+ }
+
+ <-start
+
+ s, created, err := c.FindOrCreate(key, func(w io.Writer) error {
+ for j := 0; j < len(input); j++ {
+ n, err := w.Write(input[j : j+1])
+ if err != nil {
+ return err
+ }
+ if n != 1 {
+ return io.ErrShortWrite
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ defer s.Close()
+
+ if !created {
+ return errors.New("created should be true")
+ }
+
+ output, err := ioutil.ReadAll(s)
+ if err != nil {
+ return err
+ }
+ if !bytes.Equal(output, input) {
+ return errors.New("output does not match input")
+ }
+
+ return s.Wait(context.Background())
+ }()
+ }()
+ }
+
+ close(start)
+ for i := 0; i < N; i++ {
+ require.NoError(t, <-results)
+ }
+}
diff --git a/internal/streamcache/filestore.go b/internal/streamcache/filestore.go
index e93d81f0f..69072270e 100644
--- a/internal/streamcache/filestore.go
+++ b/internal/streamcache/filestore.go
@@ -52,22 +52,16 @@ type filestore struct {
dir string
maxAge time.Duration
- id []byte
- counter
- stop chan struct{}
- stopOnce sync.Once
+ m sync.Mutex
+ id []byte
+ counter uint64
+ stop chan struct{}
}
-func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) (*filestore, error) {
- buf := make([]byte, 10)
- if _, err := io.ReadFull(rand.Reader, buf); err != nil {
- return nil, err
- }
-
+func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration), logger logrus.FieldLogger) *filestore {
fs := &filestore{
dir: dir,
maxAge: maxAge,
- id: buf,
stop: make(chan struct{}),
}
@@ -81,7 +75,7 @@ func newFilestore(dir string, maxAge time.Duration, sleep func(time.Duration), l
})
})
- return fs, nil
+ return fs
}
type namedWriteCloser interface {
@@ -94,7 +88,11 @@ type namedWriteCloser interface {
// names after a file has been deleted. By using a very large (uint64)
// counter, Create makes it clear / explicit how unlikely reuse is.
func (fs *filestore) Create() (namedWriteCloser, error) {
- fileID := fs.counter.nextValue()
+ if err := fs.ensureCacheID(); err != nil {
+ return nil, err
+ }
+
+ fileID := fs.nextFileID()
name := fmt.Sprintf("%x-%d",
// fs.id ensures uniqueness among other *filestore instances
@@ -117,7 +115,38 @@ func (fs *filestore) Create() (namedWriteCloser, error) {
return f, nil
}
-func (fs *filestore) Stop() { fs.stopOnce.Do(func() { close(fs.stop) }) }
+func (fs *filestore) ensureCacheID() error {
+ fs.m.Lock()
+ defer fs.m.Unlock()
+
+ if len(fs.id) == 0 {
+ buf := make([]byte, 10)
+ if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+ return err
+ }
+ fs.id = buf
+ }
+
+ return nil
+}
+
+func (fs *filestore) nextFileID() uint64 {
+ fs.m.Lock()
+ defer fs.m.Unlock()
+ fs.counter++
+ return fs.counter
+}
+
+func (fs *filestore) Stop() {
+ fs.m.Lock()
+ defer fs.m.Unlock()
+
+ select {
+ case <-fs.stop:
+ default:
+ close(fs.stop)
+ }
+}
// cleanWalk removes files but not directories. This is to avoid races
// when a directory looks empty but another goroutine is about to create
@@ -155,16 +184,3 @@ func (fs *filestore) diskUsage() float64 {
})
return total
}
-
-type counter struct {
- n uint64
- sync.Mutex
-}
-
-func (c *counter) nextValue() uint64 {
- c.Lock()
- defer c.Unlock()
- v := c.n
- c.n++
- return v
-}
diff --git a/internal/streamcache/filestore_test.go b/internal/streamcache/filestore_test.go
index 0c165f40c..efe1a245a 100644
--- a/internal/streamcache/filestore_test.go
+++ b/internal/streamcache/filestore_test.go
@@ -18,8 +18,7 @@ func TestFilestoreCreate(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- fs, err := newFilestore(tmp, 0, time.Sleep, log.Default())
- require.NoError(t, err)
+ fs := newFilestore(tmp, 0, time.Sleep, log.Default())
defer fs.Stop()
f, err := fs.Create()
@@ -43,8 +42,7 @@ func TestFilestoreCreate_concurrency(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- fs, err := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
- require.NoError(t, err)
+ fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
defer fs.Stop()
const N = 100
@@ -87,8 +85,7 @@ func TestFilestoreCreate_uniqueness(t *testing.T) {
filenames := make(map[string]struct{})
for j := 0; j < M; j++ {
- fs, err := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
- require.NoError(t, err)
+ fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
defer fs.Stop()
for i := 0; i < N; i++ {
@@ -109,8 +106,7 @@ func TestFilestoreCleanwalk(t *testing.T) {
tmp, cleanTmp := testhelper.TempDir(t)
defer cleanTmp()
- fs, err := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
- require.NoError(t, err)
+ fs := newFilestore(tmp, time.Hour, time.Sleep, log.Default())
defer fs.Stop()
dir1 := filepath.Join(tmp, "dir1")
diff --git a/internal/tempdir/tempdir.go b/internal/tempdir/tempdir.go
index 6d68134d9..eb5e26ad5 100644
--- a/internal/tempdir/tempdir.go
+++ b/internal/tempdir/tempdir.go
@@ -22,7 +22,7 @@ const (
// (non-user) data. We need to be careful that this path does not clash
// with any directory name that could be provided by a user. The '+'
// character is not allowed in GitLab namespaces or repositories.
- GitalyDataPrefix = "+gitaly"
+ GitalyDataPrefix = config.GitalyDataPrefix
// tmpRootPrefix is the directory in which we store temporary
// directories.
@@ -36,8 +36,6 @@ const (
// storage location.
statePrefix = GitalyDataPrefix + "/state"
- streamCachePrefix = GitalyDataPrefix + "/streamcache"
-
// MaxAge is used by ForDeleteAllRepositories. It is also a fallback
// for the context-scoped temporary directories, to ensure they get
// cleaned up if the cleanup at the end of the context failed to run.
@@ -65,11 +63,6 @@ func TempDir(storage config.Storage) string { return AppendTempDir(storage.Path)
// provided
func AppendTempDir(storagePath string) string { return filepath.Join(storagePath, tmpRootPrefix) }
-// StreamCacheDir returns the streamcache directory for a storage location
-func StreamCacheDir(storage config.Storage) string {
- return filepath.Join(storage.Path, streamCachePrefix)
-}
-
// ForDeleteAllRepositories returns a temporary directory for the given storage. It is not context-scoped but it will get removed eventuall (after MaxAge).
func ForDeleteAllRepositories(locator storage.Locator, storageName string) (string, error) {
prefix := fmt.Sprintf("%s-repositories.old.%d.", storageName, time.Now().Unix())