diff options
author | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-02-03 11:04:59 +0300 |
---|---|---|
committer | Patrick Steinhardt <psteinhardt@gitlab.com> | 2022-02-03 11:04:59 +0300 |
commit | 2634a6d7077cd68a3f141ed5560162b6e696e9c9 (patch) | |
tree | 0ba851e72a671545fb2b1004be19f4697b1b084c | |
parent | 88d32fa48e5a9b0eca0800331f7d0035a3cf7e4f (diff) | |
parent | b1859840bf21099c8753777abc3044dd5c6284b6 (diff) |
Merge branch 'sh-backport-gitaly-lfs-smudge-fix-14-4' into '14-4-stable'
gitaly-lfs-smudge: Fix missing close for HTTP body (14.4)
See merge request gitlab-org/gitaly!4243
-rw-r--r-- | cmd/gitaly-lfs-smudge/lfs_smudge.go | 35 | ||||
-rw-r--r-- | internal/git/catfile/object_reader.go | 5 | ||||
-rw-r--r-- | internal/helper/ticker.go | 11 | ||||
-rw-r--r-- | internal/supervisor/monitor.go | 15 | ||||
-rw-r--r-- | internal/supervisor/supervisor.go | 24 |
5 files changed, 71 insertions, 19 deletions
diff --git a/cmd/gitaly-lfs-smudge/lfs_smudge.go b/cmd/gitaly-lfs-smudge/lfs_smudge.go index e41e04e57..108706104 100644 --- a/cmd/gitaly-lfs-smudge/lfs_smudge.go +++ b/cmd/gitaly-lfs-smudge/lfs_smudge.go @@ -38,11 +38,22 @@ func initLogging(p configProvider) (io.Closer, error) { } func smudge(to io.Writer, from io.Reader, cfgProvider configProvider) error { - output, err := handleSmudge(to, from, cfgProvider) + // Since the environment is sanitized at the moment, we're only + // using this to extract the correlation ID. The finished() call + // to clean up the tracing will be a NOP here. + ctx, finished := tracing.ExtractFromEnv(context.Background()) + defer finished() + + output, err := handleSmudge(ctx, to, from, cfgProvider) if err != nil { log.WithError(err).Error(err) return err } + defer func() { + if err := output.Close(); err != nil { + log.ContextLogger(ctx).WithError(err).Error("closing LFS object: %w", err) + } + }() _, copyErr := io.Copy(to, output) if copyErr != nil { @@ -53,26 +64,20 @@ func smudge(to io.Writer, from io.Reader, cfgProvider configProvider) error { return nil } -func handleSmudge(to io.Writer, from io.Reader, config configProvider) (io.Reader, error) { - // Since the environment is sanitized at the moment, we're only - // using this to extract the correlation ID. The finished() call - // to clean up the tracing will be a NOP here. - ctx, finished := tracing.ExtractFromEnv(context.Background()) - defer finished() - +func handleSmudge(ctx context.Context, to io.Writer, from io.Reader, config configProvider) (io.ReadCloser, error) { logger := log.ContextLogger(ctx) ptr, contents, err := lfs.DecodeFrom(from) if err != nil { // This isn't a valid LFS pointer. Just copy the existing pointer data. - return contents, nil + return io.NopCloser(contents), nil } logger.WithField("oid", ptr.Oid).Debug("decoded LFS OID") glCfg, tlsCfg, glRepository, err := loadConfig(config) if err != nil { - return contents, err + return io.NopCloser(contents), err } logger.WithField("gitlab_config", glCfg). @@ -81,7 +86,7 @@ func handleSmudge(to io.Writer, from io.Reader, config configProvider) (io.Reade client, err := gitlab.NewHTTPClient(logger, glCfg, tlsCfg, prometheus.Config{}) if err != nil { - return contents, err + return io.NopCloser(contents), err } qs := url.Values{} @@ -91,14 +96,18 @@ func handleSmudge(to io.Writer, from io.Reader, config configProvider) (io.Reade response, err := client.Get(ctx, u.String()) if err != nil { - return contents, fmt.Errorf("error loading LFS object: %v", err) + return io.NopCloser(contents), fmt.Errorf("error loading LFS object: %v", err) } if response.StatusCode == 200 { return response.Body, nil } - return contents, nil + if err := response.Body.Close(); err != nil { + logger.WithError(err).Error("closing LFS pointer body: %w", err) + } + + return io.NopCloser(contents), nil } func loadConfig(cfgProvider configProvider) (config.Gitlab, config.TLS, string, error) { diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go index a87a118b1..841010758 100644 --- a/internal/git/catfile/object_reader.go +++ b/internal/git/catfile/object_reader.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "os" "sync" "github.com/opentracing/opentracing-go" @@ -178,6 +179,10 @@ func (o *objectDataReader) Read(p []byte) (int, error) { o.objectReader.Lock() defer o.objectReader.Unlock() + if o.closed { + return 0, os.ErrClosed + } + n, err := o.r.Read(p) o.objectReader.consume(n) return n, err diff --git a/internal/helper/ticker.go b/internal/helper/ticker.go index e909a6c60..1d10981dc 100644 --- a/internal/helper/ticker.go +++ b/internal/helper/ticker.go @@ -26,7 +26,16 @@ type timerTicker struct { func (tt *timerTicker) C() <-chan time.Time { return tt.timer.C } -func (tt *timerTicker) Reset() { tt.timer.Reset(tt.interval) } +// Reset resets the timer. If there is a pending tick, then this tick will be drained. +func (tt *timerTicker) Reset() { + if !tt.timer.Stop() { + select { + case <-tt.timer.C: + default: + } + } + tt.timer.Reset(tt.interval) +} func (tt *timerTicker) Stop() { tt.timer.Stop() } diff --git a/internal/supervisor/monitor.go b/internal/supervisor/monitor.go index 2e6a3dc71..fcf6d3dcc 100644 --- a/internal/supervisor/monitor.go +++ b/internal/supervisor/monitor.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/ps" ) @@ -34,12 +35,16 @@ type monitorProcess struct { func monitorRss(procs <-chan monitorProcess, done chan<- struct{}, events chan<- Event, name string, threshold int) { log.WithField("supervisor.name", name).WithField("supervisor.rss_threshold", threshold).Info("starting RSS monitor") - t := time.NewTicker(15 * time.Second) + t := helper.NewTimerTicker(15 * time.Second) defer t.Stop() defer close(done) for mp := range procs { + // There is no need for the ticker to run on first iteration given that we'd reset + // it anyway. + t.Stop() + monitorLoop: for { rss, err := ps.RSS(mp.pid) @@ -64,10 +69,16 @@ func monitorRss(procs <-chan monitorProcess, done chan<- struct{}, events chan<- } } + // Reset the timer such that we do the next check after 15 seconds. + // Otherwise, the first two loops for the current process may trigger in + // quick succession given that we may have waited some time to wait for the + // process during which the ticker would have already counted down. + t.Reset() + select { case <-mp.wait: break monitorLoop - case <-t.C: + case <-t.C(): } } } diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go index f3a3dc0ee..3efdc05d4 100644 --- a/internal/supervisor/supervisor.go +++ b/internal/supervisor/supervisor.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/labkit/tracing" ) @@ -125,6 +126,12 @@ func watch(p *Process) { go monitorHealth(p.healthCheck, p.events, p.Name, healthShutdown) } + notificationTicker := helper.NewTimerTicker(1 * time.Minute) + defer notificationTicker.Stop() + + crashResetTicker := helper.NewTimerTicker(p.config.CrashResetTime) + defer crashResetTicker.Stop() + spawnLoop: for { if crashes >= p.config.CrashThreshold { @@ -166,14 +173,25 @@ spawnLoop: monitorChan <- monitorProcess{pid: pid, wait: waitCh} + // We create the tickers before the spawn loop so we don't recreate the channels + // every time we loop. Furthermore, stopping those tickers via deferred function + // calls would only clean them up after we stop watching. So instead, we just reset + // both timers here. + notificationTicker.Reset() + crashResetTicker.Reset() + waitLoop: for { select { - case <-time.After(1 * time.Minute): + case <-notificationTicker.C(): + go p.notifyUp(pid) + // We repeat this idempotent notification because its delivery is not // guaranteed. - go p.notifyUp(pid) - case <-time.After(p.config.CrashResetTime): + notificationTicker.Reset() + case <-crashResetTicker.C(): + // We do not reset the crash reset ticker because we only need to + // reset crashes once. crashes = 0 case <-waitCh: crashes++ |