gitlab.com/gitlab-org/gitlab-foss.git

Diffstat (limited to 'workhorse/internal/imageresizer/image_resizer.go')
-rw-r--r--  workhorse/internal/imageresizer/image_resizer.go  449
1 file changed, 449 insertions, 0 deletions
diff --git a/workhorse/internal/imageresizer/image_resizer.go b/workhorse/internal/imageresizer/image_resizer.go
new file mode 100644
index 00000000000..77318ed1c46
--- /dev/null
+++ b/workhorse/internal/imageresizer/image_resizer.go
@@ -0,0 +1,449 @@
+package imageresizer
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promauto"
+
+ "gitlab.com/gitlab-org/labkit/correlation"
+ "gitlab.com/gitlab-org/labkit/log"
+ "gitlab.com/gitlab-org/labkit/mask"
+ "gitlab.com/gitlab-org/labkit/tracing"
+
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
+ "gitlab.com/gitlab-org/gitlab-workhorse/internal/senddata"
+)
+
+type Resizer struct {
+ config.Config
+ senddata.Prefix
+ numScalerProcs processCounter
+}
+
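+// resizeParams are the parameters Rails passes in the scaled-image payload: the image location
+// (local path or URL), its content type, and the requested target width.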
+type resizeParams struct {
+ Location string
+ ContentType string
+ Width uint
+}
+
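+// processCounter tracks how many scaler processes are currently running so the Resizer can
+// enforce the configured MaxScalerProcs limit.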
+type processCounter struct {
+ n int32
+}
+
+type resizeStatus = string
+
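+// imageFile bundles the image data stream with the metadata (size and last modification time)
+// needed for the file size check and for conditional requests.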
+type imageFile struct {
+ reader io.ReadCloser
+ contentLength int64
+ lastModified time.Time
+}
+
+// Carries information about how the scaler succeeded or failed.
+type resizeOutcome struct {
+ bytesWritten int64
+ originalFileSize int64
+ status resizeStatus
+ err error
+}
+
+const (
+ statusSuccess = "success" // a rescaled image was served
+ statusClientCache = "success-client-cache" // scaling was skipped because client cache was fresh
+ statusServedOriginal = "served-original" // scaling failed but the original image was served
+ statusRequestFailure = "request-failed" // no image was served
+ statusUnknown = "unknown" // indicates an unhandled status case
+)
+
+var envInjector = tracing.NewEnvInjector()
+
+// Images might be located remotely in object storage, in which case we need to stream
+// them via HTTP(S).
+var httpTransport = tracing.NewRoundTripper(correlation.NewInstrumentedRoundTripper(&http.Transport{
+ Proxy: http.ProxyFromEnvironment,
+ DialContext: (&net.Dialer{
+ Timeout: 30 * time.Second,
+ KeepAlive: 10 * time.Second,
+ }).DialContext,
+ MaxIdleConns: 2,
+ IdleConnTimeout: 30 * time.Second,
+ TLSHandshakeTimeout: 10 * time.Second,
+ ExpectContinueTimeout: 10 * time.Second,
+ ResponseHeaderTimeout: 30 * time.Second,
+}))
+
+var httpClient = &http.Client{
+ Transport: httpTransport,
+}
+
+const (
+ namespace = "gitlab_workhorse"
+ subsystem = "image_resize"
+ logSystem = "imageresizer"
+)
+
+var (
+ imageResizeConcurrencyLimitExceeds = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "concurrency_limit_exceeds_total",
+ Help: "Amount of image resizing requests that exceeded the maximum allowed scaler processes",
+ },
+ )
+ imageResizeProcesses = promauto.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "processes",
+ Help: "Amount of image scaler processes working now",
+ },
+ )
+ imageResizeMaxProcesses = promauto.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "max_processes",
+ Help: "The maximum amount of image scaler processes allowed to run concurrently",
+ },
+ )
+ imageResizeRequests = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "requests_total",
+ Help: "Image resizing operations requested",
+ },
+ []string{"status"},
+ )
+ imageResizeDurations = promauto.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Subsystem: subsystem,
+ Name: "duration_seconds",
+ Help: "Breakdown of total time spent serving successful image resizing requests (incl. data transfer)",
+ Buckets: []float64{
+ 0.025, /* 25ms */
+ 0.050, /* 50ms */
+ 0.1, /* 100ms */
+ 0.2, /* 200ms */
+ 0.4, /* 400ms */
+ 0.8, /* 800ms */
+ },
+ },
+ []string{"content_type", "width"},
+ )
+)
+
+const (
+ jpegMagic = "\xff\xd8" // 2 bytes
+ pngMagic = "\x89PNG\r\n\x1a\n" // 8 bytes
+ maxMagicLen = 8 // the first 8 bytes are enough to detect either PNG or JPEG
+)
+
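+// NewResizer creates a Resizer for the given Workhorse configuration and publishes the configured
+// scaler process limit through the max_processes gauge. A minimal usage sketch (the surrounding
+// senddata wiring is assumed here, not defined in this file):
+//
+//    resizer := imageresizer.NewResizer(cfg)
+//    // resizer is then registered as a senddata injecter so that payloads carrying the
+//    // "send-scaled-img:" prefix are routed to its Inject method.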
+func NewResizer(cfg config.Config) *Resizer {
+ imageResizeMaxProcesses.Set(float64(cfg.ImageResizerConfig.MaxScalerProcs))
+
+ return &Resizer{Config: cfg, Prefix: "send-scaled-img:"}
+}
+
+// Inject forks a dedicated scaler process to resize an image identified by path or URL
+// and streams the resized image back to the client. If scaling fails, the original image is served unchanged.
+func (r *Resizer) Inject(w http.ResponseWriter, req *http.Request, paramsData string) {
+ var outcome = resizeOutcome{status: statusUnknown, originalFileSize: 0, bytesWritten: 0}
+ start := time.Now()
+ params, err := r.unpackParameters(paramsData)
+
+ defer func() {
+ imageResizeRequests.WithLabelValues(outcome.status).Inc()
+ handleOutcome(w, req, start, params, &outcome)
+ }()
+
+ if err != nil {
+ // This means the response header coming from Rails was malformed; there is no way
+ // to sensibly recover from this other than failing fast
+ outcome.error(fmt.Errorf("read image resize params: %v", err))
+ return
+ }
+
+ imageFile, err := openSourceImage(params.Location)
+ if err != nil {
+ // This means we cannot even read the input image; fail fast.
+ outcome.error(fmt.Errorf("open image data stream: %v", err))
+ return
+ }
+ defer imageFile.reader.Close()
+
+ outcome.originalFileSize = imageFile.contentLength
+
+ setLastModified(w, imageFile.lastModified)
+ // If the original file has not changed, then any cached resized versions have not changed either.
+ if checkNotModified(req, imageFile.lastModified) {
+ writeNotModified(w)
+ outcome.ok(statusClientCache)
+ return
+ }
+
+ // We first attempt to rescale the image; if this should fail for any reason, imageReader
+ // will point to the original image, i.e. we render it unchanged.
+ imageReader, resizeCmd, err := r.tryResizeImage(req, imageFile, params, r.Config.ImageResizerConfig)
+ if err != nil {
+ // Something failed, but we can still write out the original image, so don't return early.
+ // We need to log this separately since the subsequent steps might add other failures.
+ helper.LogErrorWithFields(req, err, *logFields(start, params, &outcome))
+ }
+ defer helper.CleanUpProcessGroup(resizeCmd)
+
+ w.Header().Del("Content-Length")
+ outcome.bytesWritten, err = serveImage(imageReader, w, resizeCmd)
+
+ // We failed serving image data; this is a hard failure.
+ if err != nil {
+ outcome.error(err)
+ return
+ }
+
+ // This means we served the original image because rescaling failed; this is a soft failure
+ if resizeCmd == nil {
+ outcome.ok(statusServedOriginal)
+ return
+ }
+
+ widthLabelVal := strconv.Itoa(int(params.Width))
+ imageResizeDurations.WithLabelValues(params.ContentType, widthLabelVal).Observe(time.Since(start).Seconds())
+
+ outcome.ok(statusSuccess)
+}
+
+// Streams image data from the given reader to the given writer and returns the number of bytes written.
+func serveImage(r io.Reader, w io.Writer, resizeCmd *exec.Cmd) (int64, error) {
+ bytesWritten, err := io.Copy(w, r)
+ if err != nil {
+ return bytesWritten, err
+ }
+
+ if resizeCmd != nil {
+ // If a scaler process had been forked, wait for the command to finish.
+ if err = resizeCmd.Wait(); err != nil {
+ // err will be an ExitError; this is not useful beyond knowing the exit code since anything
+ // interesting has been written to stderr, so we turn that into an error we can return.
+ stdErr := resizeCmd.Stderr.(*strings.Builder)
+ return bytesWritten, fmt.Errorf("%s", stdErr.String())
+ }
+ }
+
+ return bytesWritten, nil
+}
+
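+// unpackParameters decodes the resize parameters sent by Rails in the response header and
+// validates that the required Location and ContentType fields are present.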
+func (r *Resizer) unpackParameters(paramsData string) (*resizeParams, error) {
+ var params resizeParams
+ if err := r.Unpack(&params, paramsData); err != nil {
+ return nil, err
+ }
+
+ if params.Location == "" {
+ return nil, fmt.Errorf("'Location' not set")
+ }
+
+ if params.ContentType == "" {
+ return nil, fmt.Errorf("'ContentType' must be set")
+ }
+
+ return &params, nil
+}
+
+// Attempts to rescale the given image data, or in case of errors, falls back to the original image.
+func (r *Resizer) tryResizeImage(req *http.Request, f *imageFile, params *resizeParams, cfg config.ImageResizerConfig) (io.Reader, *exec.Cmd, error) {
+ if f.contentLength > int64(cfg.MaxFilesize) {
+ return f.reader, nil, fmt.Errorf("%d bytes exceeds maximum file size of %d bytes", f.contentLength, cfg.MaxFilesize)
+ }
+
+ if f.contentLength < maxMagicLen {
+ return f.reader, nil, fmt.Errorf("file is too small to resize: %d bytes", f.contentLength)
+ }
+
+ if !r.numScalerProcs.tryIncrement(int32(cfg.MaxScalerProcs)) {
+ return f.reader, nil, fmt.Errorf("too many running scaler processes (%d / %d)", r.numScalerProcs.n, cfg.MaxScalerProcs)
+ }
+
+ ctx := req.Context()
+ go func() {
+ <-ctx.Done()
+ r.numScalerProcs.decrement()
+ }()
+
+ // A buffered reader is required so we can Peek at the first bytes of the image file to detect its
+ // format without advancing the reader (the scaler binary needs to read the stream from the very start).
+ // We request `8` as the minimum buffer size, the length of the PNG magic byte sequence (JPEG needs only 2).
+ // In fact, `NewReaderSize` will immediately override it with `16`, its internal `minReadBufferSize` -
+ // here we are just being explicit about the buffer size required for our code to operate correctly.
+ // Having a reader with such a tiny buffer will not hurt performance during further operations,
+ // because Go's `bufio.Read` avoids double copying: https://golang.org/src/bufio/bufio.go?s=1768:1804#L212
+ buffered := bufio.NewReaderSize(f.reader, maxMagicLen)
+
+ headerBytes, err := buffered.Peek(maxMagicLen)
+ if err != nil {
+ return buffered, nil, fmt.Errorf("peek stream: %v", err)
+ }
+
+ // Check magic bytes to identify file type.
+ if string(headerBytes) != pngMagic && string(headerBytes[0:2]) != jpegMagic {
+ return buffered, nil, fmt.Errorf("unrecognized file signature: %v", headerBytes)
+ }
+
+ resizeCmd, resizedImageReader, err := startResizeImageCommand(ctx, buffered, params)
+ if err != nil {
+ return buffered, nil, fmt.Errorf("fork into scaler process: %w", err)
+ }
+ return resizedImageReader, resizeCmd, nil
+}
+
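+// startResizeImageCommand starts the gitlab-resize-image binary in its own process group, feeding it
+// the image on stdin and the target width through the GL_RESIZE_IMAGE_WIDTH environment variable;
+// callers read the resized image from the returned stdout pipe.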
+func startResizeImageCommand(ctx context.Context, imageReader io.Reader, params *resizeParams) (*exec.Cmd, io.ReadCloser, error) {
+ cmd := exec.CommandContext(ctx, "gitlab-resize-image")
+ cmd.Stdin = imageReader
+ cmd.Stderr = &strings.Builder{}
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ cmd.Env = []string{
+ "GL_RESIZE_IMAGE_WIDTH=" + strconv.Itoa(int(params.Width)),
+ }
+ cmd.Env = envInjector(ctx, cmd.Env)
+
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if err := cmd.Start(); err != nil {
+ return nil, nil, err
+ }
+
+ return cmd, stdout, nil
+}
+
+func isURL(location string) bool {
+ return strings.HasPrefix(location, "http://") || strings.HasPrefix(location, "https://")
+}
+
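+// openSourceImage opens the image either via HTTP(S) (e.g. from object storage) or from the
+// local file system, depending on the location given.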
+func openSourceImage(location string) (*imageFile, error) {
+ if isURL(location) {
+ return openFromURL(location)
+ }
+
+ return openFromFile(location)
+}
+
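+// openFromURL fetches the image over HTTP(S) and records its Content-Length and Last-Modified
+// headers so we can enforce the size limit and serve conditional requests.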
+func openFromURL(location string) (*imageFile, error) {
+ res, err := httpClient.Get(location)
+ if err != nil {
+ return nil, err
+ }
+
+ switch res.StatusCode {
+ case http.StatusOK, http.StatusNotModified:
+ // Extract headers for conditional GETs from response.
+ lastModified, err := http.ParseTime(res.Header.Get("Last-Modified"))
+ if err != nil {
+ // This is unlikely to happen, coming from an object storage provider.
+ lastModified = time.Now().UTC()
+ }
+ return &imageFile{res.Body, res.ContentLength, lastModified}, nil
+ default:
+ res.Body.Close()
+ return nil, fmt.Errorf("stream data from %q: %d %s", location, res.StatusCode, res.Status)
+ }
+}
+
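+// openFromFile opens a local image file and uses its stat information for the size and
+// last modification time.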
+func openFromFile(location string) (*imageFile, error) {
+ file, err := os.Open(location)
+ if err != nil {
+ return nil, err
+ }
+
+ fi, err := file.Stat()
+ if err != nil {
+ file.Close()
+ return nil, err
+ }
+
+ return &imageFile{file, fi.Size(), fi.ModTime()}, nil
+}
+
+// Only allow more scaling requests if we haven't yet reached the maximum
+// allowed number of concurrent scaler processes
+func (c *processCounter) tryIncrement(maxScalerProcs int32) bool {
+ if p := atomic.AddInt32(&c.n, 1); p > maxScalerProcs {
+ c.decrement()
+ imageResizeConcurrencyLimitExceeds.Inc()
+
+ return false
+ }
+
+ imageResizeProcesses.Set(float64(c.n))
+ return true
+}
+
+func (c *processCounter) decrement() {
+ atomic.AddInt32(&c.n, -1)
+ imageResizeProcesses.Set(float64(c.n))
+}
+
+func (o *resizeOutcome) ok(status resizeStatus) {
+ o.status = status
+ o.err = nil
+}
+
+func (o *resizeOutcome) error(err error) {
+ o.status = statusRequestFailure
+ o.err = err
+}
+
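+// logFields assembles the structured log fields describing a resize request; params may be nil
+// if unpacking them failed.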
+func logFields(startTime time.Time, params *resizeParams, outcome *resizeOutcome) *log.Fields {
+ var targetWidth, contentType string
+ if params != nil {
+ targetWidth = fmt.Sprint(params.Width)
+ contentType = fmt.Sprint(params.ContentType)
+ }
+ return &log.Fields{
+ "subsystem": logSystem,
+ "written_bytes": outcome.bytesWritten,
+ "duration_s": time.Since(startTime).Seconds(),
+ logSystem + ".status": outcome.status,
+ logSystem + ".target_width": targetWidth,
+ logSystem + ".content_type": contentType,
+ logSystem + ".original_filesize": outcome.originalFileSize,
+ }
+}
+
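+// handleOutcome emits the final log entry for the request; hard failures are turned into a
+// 500 response unless image data has already been written to the client.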
+func handleOutcome(w http.ResponseWriter, req *http.Request, startTime time.Time, params *resizeParams, outcome *resizeOutcome) {
+ logger := log.ContextLogger(req.Context())
+ fields := *logFields(startTime, params, outcome)
+
+ switch outcome.status {
+ case statusRequestFailure:
+ if outcome.bytesWritten <= 0 {
+ helper.Fail500WithFields(w, req, outcome.err, fields)
+ } else {
+ helper.LogErrorWithFields(req, outcome.err, fields)
+ }
+ default:
+ logger.WithFields(fields).WithFields(
+ log.Fields{
+ "method": req.Method,
+ "uri": mask.URL(req.RequestURI),
+ },
+ ).Print(outcome.status)
+ }
+}