Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Steinhardt <psteinhardt@gitlab.com>2022-02-03 11:04:59 +0300
committerPatrick Steinhardt <psteinhardt@gitlab.com>2022-02-03 11:04:59 +0300
commit2634a6d7077cd68a3f141ed5560162b6e696e9c9 (patch)
tree0ba851e72a671545fb2b1004be19f4697b1b084c
parent88d32fa48e5a9b0eca0800331f7d0035a3cf7e4f (diff)
parentb1859840bf21099c8753777abc3044dd5c6284b6 (diff)
Merge branch 'sh-backport-gitaly-lfs-smudge-fix-14-4' into '14-4-stable'
gitaly-lfs-smudge: Fix missing close for HTTP body (14.4) See merge request gitlab-org/gitaly!4243
-rw-r--r--cmd/gitaly-lfs-smudge/lfs_smudge.go35
-rw-r--r--internal/git/catfile/object_reader.go5
-rw-r--r--internal/helper/ticker.go11
-rw-r--r--internal/supervisor/monitor.go15
-rw-r--r--internal/supervisor/supervisor.go24
5 files changed, 71 insertions, 19 deletions
diff --git a/cmd/gitaly-lfs-smudge/lfs_smudge.go b/cmd/gitaly-lfs-smudge/lfs_smudge.go
index e41e04e57..108706104 100644
--- a/cmd/gitaly-lfs-smudge/lfs_smudge.go
+++ b/cmd/gitaly-lfs-smudge/lfs_smudge.go
@@ -38,11 +38,22 @@ func initLogging(p configProvider) (io.Closer, error) {
}
func smudge(to io.Writer, from io.Reader, cfgProvider configProvider) error {
- output, err := handleSmudge(to, from, cfgProvider)
+ // Since the environment is sanitized at the moment, we're only
+ // using this to extract the correlation ID. The finished() call
+ // to clean up the tracing will be a NOP here.
+ ctx, finished := tracing.ExtractFromEnv(context.Background())
+ defer finished()
+
+ output, err := handleSmudge(ctx, to, from, cfgProvider)
if err != nil {
log.WithError(err).Error(err)
return err
}
+ defer func() {
+ if err := output.Close(); err != nil {
+ log.ContextLogger(ctx).WithError(err).Error("closing LFS object: %w", err)
+ }
+ }()
_, copyErr := io.Copy(to, output)
if copyErr != nil {
@@ -53,26 +64,20 @@ func smudge(to io.Writer, from io.Reader, cfgProvider configProvider) error {
return nil
}
-func handleSmudge(to io.Writer, from io.Reader, config configProvider) (io.Reader, error) {
- // Since the environment is sanitized at the moment, we're only
- // using this to extract the correlation ID. The finished() call
- // to clean up the tracing will be a NOP here.
- ctx, finished := tracing.ExtractFromEnv(context.Background())
- defer finished()
-
+func handleSmudge(ctx context.Context, to io.Writer, from io.Reader, config configProvider) (io.ReadCloser, error) {
logger := log.ContextLogger(ctx)
ptr, contents, err := lfs.DecodeFrom(from)
if err != nil {
// This isn't a valid LFS pointer. Just copy the existing pointer data.
- return contents, nil
+ return io.NopCloser(contents), nil
}
logger.WithField("oid", ptr.Oid).Debug("decoded LFS OID")
glCfg, tlsCfg, glRepository, err := loadConfig(config)
if err != nil {
- return contents, err
+ return io.NopCloser(contents), err
}
logger.WithField("gitlab_config", glCfg).
@@ -81,7 +86,7 @@ func handleSmudge(to io.Writer, from io.Reader, config configProvider) (io.Reade
client, err := gitlab.NewHTTPClient(logger, glCfg, tlsCfg, prometheus.Config{})
if err != nil {
- return contents, err
+ return io.NopCloser(contents), err
}
qs := url.Values{}
@@ -91,14 +96,18 @@ func handleSmudge(to io.Writer, from io.Reader, config configProvider) (io.Reade
response, err := client.Get(ctx, u.String())
if err != nil {
- return contents, fmt.Errorf("error loading LFS object: %v", err)
+ return io.NopCloser(contents), fmt.Errorf("error loading LFS object: %v", err)
}
if response.StatusCode == 200 {
return response.Body, nil
}
- return contents, nil
+ if err := response.Body.Close(); err != nil {
+ logger.WithError(err).Error("closing LFS pointer body: %w", err)
+ }
+
+ return io.NopCloser(contents), nil
}
func loadConfig(cfgProvider configProvider) (config.Gitlab, config.TLS, string, error) {
diff --git a/internal/git/catfile/object_reader.go b/internal/git/catfile/object_reader.go
index a87a118b1..841010758 100644
--- a/internal/git/catfile/object_reader.go
+++ b/internal/git/catfile/object_reader.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
+ "os"
"sync"
"github.com/opentracing/opentracing-go"
@@ -178,6 +179,10 @@ func (o *objectDataReader) Read(p []byte) (int, error) {
o.objectReader.Lock()
defer o.objectReader.Unlock()
+ if o.closed {
+ return 0, os.ErrClosed
+ }
+
n, err := o.r.Read(p)
o.objectReader.consume(n)
return n, err
diff --git a/internal/helper/ticker.go b/internal/helper/ticker.go
index e909a6c60..1d10981dc 100644
--- a/internal/helper/ticker.go
+++ b/internal/helper/ticker.go
@@ -26,7 +26,16 @@ type timerTicker struct {
func (tt *timerTicker) C() <-chan time.Time { return tt.timer.C }
-func (tt *timerTicker) Reset() { tt.timer.Reset(tt.interval) }
+// Reset resets the timer. If there is a pending tick, then this tick will be drained.
+func (tt *timerTicker) Reset() {
+ if !tt.timer.Stop() {
+ select {
+ case <-tt.timer.C:
+ default:
+ }
+ }
+ tt.timer.Reset(tt.interval)
+}
func (tt *timerTicker) Stop() { tt.timer.Stop() }
diff --git a/internal/supervisor/monitor.go b/internal/supervisor/monitor.go
index 2e6a3dc71..fcf6d3dcc 100644
--- a/internal/supervisor/monitor.go
+++ b/internal/supervisor/monitor.go
@@ -6,6 +6,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/gitaly/v14/internal/ps"
)
@@ -34,12 +35,16 @@ type monitorProcess struct {
func monitorRss(procs <-chan monitorProcess, done chan<- struct{}, events chan<- Event, name string, threshold int) {
log.WithField("supervisor.name", name).WithField("supervisor.rss_threshold", threshold).Info("starting RSS monitor")
- t := time.NewTicker(15 * time.Second)
+ t := helper.NewTimerTicker(15 * time.Second)
defer t.Stop()
defer close(done)
for mp := range procs {
+ // There is no need for the ticker to run on first iteration given that we'd reset
+ // it anyway.
+ t.Stop()
+
monitorLoop:
for {
rss, err := ps.RSS(mp.pid)
@@ -64,10 +69,16 @@ func monitorRss(procs <-chan monitorProcess, done chan<- struct{}, events chan<-
}
}
+ // Reset the timer such that we do the next check after 15 seconds.
+ // Otherwise, the first two loops for the current process may trigger in
+ // quick succession given that we may have waited some time to wait for the
+ // process during which the ticker would have already counted down.
+ t.Reset()
+
select {
case <-mp.wait:
break monitorLoop
- case <-t.C:
+ case <-t.C():
}
}
}
diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go
index f3a3dc0ee..3efdc05d4 100644
--- a/internal/supervisor/supervisor.go
+++ b/internal/supervisor/supervisor.go
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
"gitlab.com/gitlab-org/labkit/tracing"
)
@@ -125,6 +126,12 @@ func watch(p *Process) {
go monitorHealth(p.healthCheck, p.events, p.Name, healthShutdown)
}
+ notificationTicker := helper.NewTimerTicker(1 * time.Minute)
+ defer notificationTicker.Stop()
+
+ crashResetTicker := helper.NewTimerTicker(p.config.CrashResetTime)
+ defer crashResetTicker.Stop()
+
spawnLoop:
for {
if crashes >= p.config.CrashThreshold {
@@ -166,14 +173,25 @@ spawnLoop:
monitorChan <- monitorProcess{pid: pid, wait: waitCh}
+ // We create the tickers before the spawn loop so we don't recreate the channels
+ // every time we loop. Furthermore, stopping those tickers via deferred function
+ // calls would only clean them up after we stop watching. So instead, we just reset
+ // both timers here.
+ notificationTicker.Reset()
+ crashResetTicker.Reset()
+
waitLoop:
for {
select {
- case <-time.After(1 * time.Minute):
+ case <-notificationTicker.C():
+ go p.notifyUp(pid)
+
// We repeat this idempotent notification because its delivery is not
// guaranteed.
- go p.notifyUp(pid)
- case <-time.After(p.config.CrashResetTime):
+ notificationTicker.Reset()
+ case <-crashResetTicker.C():
+ // We do not reset the crash reset ticker because we only need to
+ // reset crashes once.
crashes = 0
case <-waitCh:
crashes++