Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'workhorse/internal')
-rw-r--r--workhorse/internal/git/pktline.go59
-rw-r--r--workhorse/internal/git/pktline_test.go39
-rw-r--r--workhorse/internal/headers/content_headers.go10
-rw-r--r--workhorse/internal/lsif_transformer/parser/code_hover.go28
-rw-r--r--workhorse/internal/lsif_transformer/parser/code_hover_test.go48
-rw-r--r--workhorse/internal/redis/keywatcher.go230
-rw-r--r--workhorse/internal/redis/keywatcher_test.go103
-rw-r--r--workhorse/internal/senddata/contentprocessor/contentprocessor_test.go14
-rw-r--r--workhorse/internal/upload/destination/objectstore/uploader.go4
-rw-r--r--workhorse/internal/upstream/metrics_test.go2
-rw-r--r--workhorse/internal/upstream/routes.go46
-rw-r--r--workhorse/internal/upstream/upstream.go9
-rw-r--r--workhorse/internal/upstream/upstream_test.go4
13 files changed, 341 insertions, 255 deletions
diff --git a/workhorse/internal/git/pktline.go b/workhorse/internal/git/pktline.go
deleted file mode 100644
index e970f60182d..00000000000
--- a/workhorse/internal/git/pktline.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package git
-
-import (
- "bufio"
- "bytes"
- "fmt"
- "io"
- "strconv"
-)
-
-func scanDeepen(body io.Reader) bool {
- scanner := bufio.NewScanner(body)
- scanner.Split(pktLineSplitter)
- for scanner.Scan() {
- if bytes.HasPrefix(scanner.Bytes(), []byte("deepen")) && scanner.Err() == nil {
- return true
- }
- }
-
- return false
-}
-
-func pktLineSplitter(data []byte, atEOF bool) (advance int, token []byte, err error) {
- if len(data) < 4 {
- if atEOF && len(data) > 0 {
- return 0, nil, fmt.Errorf("pktLineSplitter: incomplete length prefix on %q", data)
- }
- return 0, nil, nil // want more data
- }
-
- if bytes.HasPrefix(data, []byte("0000")) {
- // special case: "0000" terminator packet: return empty token
- return 4, data[:0], nil
- }
-
- // We have at least 4 bytes available so we can decode the 4-hex digit
- // length prefix of the packet line.
- pktLength64, err := strconv.ParseInt(string(data[:4]), 16, 0)
- if err != nil {
- return 0, nil, fmt.Errorf("pktLineSplitter: decode length: %v", err)
- }
-
- // Cast is safe because we requested an int-size number from strconv.ParseInt
- pktLength := int(pktLength64)
-
- if pktLength < 0 {
- return 0, nil, fmt.Errorf("pktLineSplitter: invalid length: %d", pktLength)
- }
-
- if len(data) < pktLength {
- if atEOF {
- return 0, nil, fmt.Errorf("pktLineSplitter: less than %d bytes in input %q", pktLength, data)
- }
- return 0, nil, nil // want more data
- }
-
- // return "pkt" token without length prefix
- return pktLength, data[4:pktLength], nil
-}
diff --git a/workhorse/internal/git/pktline_test.go b/workhorse/internal/git/pktline_test.go
deleted file mode 100644
index d4be8634538..00000000000
--- a/workhorse/internal/git/pktline_test.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package git
-
-import (
- "bytes"
- "testing"
-)
-
-func TestSuccessfulScanDeepen(t *testing.T) {
- examples := []struct {
- input string
- output bool
- }{
- {"000dsomething000cdeepen 10000", true},
- {"000dsomething0000000cdeepen 1", true},
- {"000dsomething0000", false},
- }
-
- for _, example := range examples {
- hasDeepen := scanDeepen(bytes.NewReader([]byte(example.input)))
-
- if hasDeepen != example.output {
- t.Fatalf("scanDeepen %q: expected %v, got %v", example.input, example.output, hasDeepen)
- }
- }
-}
-
-func TestFailedScanDeepen(t *testing.T) {
- examples := []string{
- "invalid data",
- "deepen",
- "000cdeepen",
- }
-
- for _, example := range examples {
- if scanDeepen(bytes.NewReader([]byte(example))) {
- t.Fatalf("scanDeepen %q: expected result to be false, got true", example)
- }
- }
-}
diff --git a/workhorse/internal/headers/content_headers.go b/workhorse/internal/headers/content_headers.go
index 8cca3d97e82..854cc8abddd 100644
--- a/workhorse/internal/headers/content_headers.go
+++ b/workhorse/internal/headers/content_headers.go
@@ -43,6 +43,16 @@ const (
func SafeContentHeaders(data []byte, contentDisposition string) (string, string) {
contentType := safeContentType(data)
contentDisposition = safeContentDisposition(contentType, contentDisposition)
+
+ // Set attachments to application/octet-stream since browsers can do
+ // a better job distinguishing certain types (for example: ZIP files
+ // vs. Microsoft .docx files). However, browsers may safely render SVGs even
+ // when Content-Disposition is an attachment but only if the SVG
+ // Content-Type is set. Note that scripts in an SVG file will only be executed
+ // if the file is downloaded separately with an inline Content-Disposition.
+ if attachmentRegex.MatchString(contentDisposition) && !isType(contentType, svgMimeTypeRegex) {
+ contentType = "application/octet-stream"
+ }
return contentType, contentDisposition
}
diff --git a/workhorse/internal/lsif_transformer/parser/code_hover.go b/workhorse/internal/lsif_transformer/parser/code_hover.go
index 25550cce29e..ab3ab291432 100644
--- a/workhorse/internal/lsif_transformer/parser/code_hover.go
+++ b/workhorse/internal/lsif_transformer/parser/code_hover.go
@@ -28,6 +28,16 @@ type truncatableString struct {
Truncated bool
}
+// supportedLexerLanguages is used for a fast lookup to ensure the language
+// is supported by the lexer library.
+var supportedLexerLanguages = map[string]struct{}{}
+
+func init() {
+ for _, name := range lexers.Names(true) {
+ supportedLexerLanguages[name] = struct{}{}
+ }
+}
+
func (ts *truncatableString) UnmarshalText(b []byte) error {
s := 0
for i := 0; s < len(b); i++ {
@@ -93,6 +103,24 @@ func newCodeHover(content json.RawMessage) (*codeHover, error) {
}
func (c *codeHover) setTokens() {
+ // fastpath: bail early if no language specified
+ if c.Language == "" {
+ return
+ }
+
+ // fastpath: lexer.Get() will first match against indexed languages by
+ // name and alias, and then fallback to a very slow filepath match. We
+ // avoid this slow path by first checking against languages we know to
+ // be within the index, and bailing if not found.
+ //
+ // Not case-folding immediately is done intentionally. These two lookups
+ // mirror the behaviour of lexer.Get().
+ if _, ok := supportedLexerLanguages[c.Language]; !ok {
+ if _, ok := supportedLexerLanguages[strings.ToLower(c.Language)]; !ok {
+ return
+ }
+ }
+
lexer := lexers.Get(c.Language)
if lexer == nil {
return
diff --git a/workhorse/internal/lsif_transformer/parser/code_hover_test.go b/workhorse/internal/lsif_transformer/parser/code_hover_test.go
index c09636b2f76..7dc9e126ae7 100644
--- a/workhorse/internal/lsif_transformer/parser/code_hover_test.go
+++ b/workhorse/internal/lsif_transformer/parser/code_hover_test.go
@@ -56,6 +56,14 @@ func TestHighlight(t *testing.T) {
},
},
{
+ name: "ruby by file extension",
+ language: "rb",
+ value: `print hello`,
+ want: [][]token{
+ {{Value: "print hello"}},
+ },
+ },
+ {
name: "unknown/malicious language is passed",
language: "<lang> alert(1); </lang>",
value: `def a;\nend`,
@@ -116,3 +124,43 @@ func TestTruncatingMultiByteChars(t *testing.T) {
symbolSize := 3
require.Equal(t, value[0:maxValueSize*symbolSize], c.TruncatedValue.Value)
}
+
+func BenchmarkHighlight(b *testing.B) {
+ type entry struct {
+ Language string `json:"language"`
+ Value string `json:"value"`
+ }
+
+ tests := []entry{
+ {
+ Language: "go",
+ Value: "func main()",
+ },
+ {
+ Language: "ruby",
+ Value: "def read(line)",
+ },
+ {
+ Language: "",
+ Value: "<html><head>foobar</head></html>",
+ },
+ {
+ Language: "zzz",
+ Value: "def read(line)",
+ },
+ }
+
+ for _, tc := range tests {
+ b.Run("lang:"+tc.Language, func(b *testing.B) {
+ raw, err := json.Marshal(tc)
+ require.NoError(b, err)
+
+ b.ResetTimer()
+
+ for n := 0; n < b.N; n++ {
+ _, err := newCodeHovers(raw)
+ require.NoError(b, err)
+ }
+ })
+ }
+}
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 82cb082f5f0..cdf6ccd7e83 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -15,61 +15,99 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
)
-var (
- keyWatcher = make(map[string][]chan string)
- keyWatcherMutex sync.Mutex
- shutdown = make(chan struct{})
- redisReconnectTimeout = backoff.Backoff{
- //These are the defaults
- Min: 100 * time.Millisecond,
- Max: 60 * time.Second,
- Factor: 2,
- Jitter: true,
+type KeyWatcher struct {
+ mu sync.Mutex
+ subscribers map[string][]chan string
+ shutdown chan struct{}
+ reconnectBackoff backoff.Backoff
+ conn *redis.PubSubConn
+}
+
+func NewKeyWatcher() *KeyWatcher {
+ return &KeyWatcher{
+ shutdown: make(chan struct{}),
+ reconnectBackoff: backoff.Backoff{
+ Min: 100 * time.Millisecond,
+ Max: 60 * time.Second,
+ Factor: 2,
+ Jitter: true,
+ },
}
+}
+
+var (
keyWatchers = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_keywatcher_keywatchers",
Help: "The number of keys that is being watched by gitlab-workhorse",
},
)
+ redisSubscriptions = promauto.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "gitlab_workhorse_keywatcher_redis_subscriptions",
+ Help: "Current number of keywatcher Redis pubsub subscriptions",
+ },
+ )
totalMessages = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_keywatcher_total_messages",
Help: "How many messages gitlab-workhorse has received in total on pubsub.",
},
)
+ totalActions = promauto.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_keywatcher_actions_total",
+ Help: "Counts of various keywatcher actions",
+ },
+ []string{"action"},
+ )
+ receivedBytes = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "gitlab_workhorse_keywatcher_received_bytes_total",
+ Help: "How many bytes of messages gitlab-workhorse has received in total on pubsub.",
+ },
+ )
)
-const (
- keySubChannel = "workhorse:notifications"
-)
+const channelPrefix = "workhorse:notifications:"
-// KeyChan holds a key and a channel
-type KeyChan struct {
- Key string
- Chan chan string
-}
+func countAction(action string) { totalActions.WithLabelValues(action).Add(1) }
-func processInner(conn redis.Conn) error {
- defer conn.Close()
- psc := redis.PubSubConn{Conn: conn}
- if err := psc.Subscribe(keySubChannel); err != nil {
- return err
- }
- defer psc.Unsubscribe(keySubChannel)
+func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
+ kw.mu.Lock()
+ // We must share kw.conn with the goroutines that call SUBSCRIBE and
+ // UNSUBSCRIBE because Redis pubsub subscriptions are tied to the
+ // connection.
+ kw.conn = &redis.PubSubConn{Conn: conn}
+ kw.mu.Unlock()
+
+ defer func() {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ kw.conn.Close()
+ kw.conn = nil
+
+ // Reset kw.subscribers because it is tied to Redis server side state of
+ // kw.conn and we just closed that connection.
+ for _, chans := range kw.subscribers {
+ for _, ch := range chans {
+ close(ch)
+ keyWatchers.Dec()
+ }
+ }
+ kw.subscribers = nil
+ }()
for {
- switch v := psc.Receive().(type) {
+ switch v := kw.conn.Receive().(type) {
case redis.Message:
totalMessages.Inc()
- dataStr := string(v.Data)
- msg := strings.SplitN(dataStr, "=", 2)
- if len(msg) != 2 {
- log.WithError(fmt.Errorf("keywatcher: invalid notification: %q", dataStr)).Error()
- continue
+ receivedBytes.Add(float64(len(v.Data)))
+ if strings.HasPrefix(v.Channel, channelPrefix) {
+ kw.notifySubscribers(v.Channel[len(channelPrefix):], string(v.Data))
}
- key, value := msg[0], msg[1]
- notifyChanWatchers(key, value)
+ case redis.Subscription:
+ redisSubscriptions.Set(float64(v.Count))
case error:
log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error()
// Intermittent error, return nil so that it doesn't wait before reconnect
@@ -94,72 +132,106 @@ func dialPubSub(dialer redisDialerFunc) (redis.Conn, error) {
return conn, nil
}
-// Process redis subscriptions
-//
-// NOTE: There Can Only Be One!
-func Process() {
+func (kw *KeyWatcher) Process() {
log.Info("keywatcher: starting process loop")
for {
conn, err := dialPubSub(workerDialFunc)
if err != nil {
log.WithError(fmt.Errorf("keywatcher: %v", err)).Error()
- time.Sleep(redisReconnectTimeout.Duration())
+ time.Sleep(kw.reconnectBackoff.Duration())
continue
}
- redisReconnectTimeout.Reset()
+ kw.reconnectBackoff.Reset()
- if err = processInner(conn); err != nil {
- log.WithError(fmt.Errorf("keywatcher: process loop: %v", err)).Error()
+ if err = kw.receivePubSubStream(conn); err != nil {
+ log.WithError(fmt.Errorf("keywatcher: receivePubSubStream: %v", err)).Error()
}
}
}
-func Shutdown() {
+func (kw *KeyWatcher) Shutdown() {
log.Info("keywatcher: shutting down")
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
select {
- case <-shutdown:
+ case <-kw.shutdown:
// already closed
default:
- close(shutdown)
+ close(kw.shutdown)
}
}
-func notifyChanWatchers(key, value string) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- if chanList, ok := keyWatcher[key]; ok {
- for _, c := range chanList {
- c <- value
- keyWatchers.Dec()
+func (kw *KeyWatcher) notifySubscribers(key, value string) {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ chanList, ok := kw.subscribers[key]
+ if !ok {
+ countAction("drop-message")
+ return
+ }
+
+ countAction("deliver-message")
+ for _, c := range chanList {
+ select {
+ case c <- value:
+ default:
}
- delete(keyWatcher, key)
}
}
-func addKeyChan(kc *KeyChan) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- keyWatcher[kc.Key] = append(keyWatcher[kc.Key], kc.Chan)
+func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ if kw.conn == nil {
+ // This can happen because CI long polling is disabled in this Workhorse
+ // process. It can also be that we are waiting for the pubsub connection
+ // to be established. Either way it is OK to fail fast.
+ return errors.New("no redis connection")
+ }
+
+ if len(kw.subscribers[key]) == 0 {
+ countAction("create-subscription")
+ if err := kw.conn.Subscribe(channelPrefix + key); err != nil {
+ return err
+ }
+ }
+
+ if kw.subscribers == nil {
+ kw.subscribers = make(map[string][]chan string)
+ }
+ kw.subscribers[key] = append(kw.subscribers[key], notify)
keyWatchers.Inc()
+
+ return nil
}
-func delKeyChan(kc *KeyChan) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- if chans, ok := keyWatcher[kc.Key]; ok {
- for i, c := range chans {
- if kc.Chan == c {
- keyWatcher[kc.Key] = append(chans[:i], chans[i+1:]...)
- keyWatchers.Dec()
- break
- }
+func (kw *KeyWatcher) delSubscription(key string, notify chan string) {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ chans, ok := kw.subscribers[key]
+ if !ok {
+ // This can happen if the pubsub connection dropped while we were
+ // waiting.
+ return
+ }
+
+ for i, c := range chans {
+ if notify == c {
+ kw.subscribers[key] = append(chans[:i], chans[i+1:]...)
+ keyWatchers.Dec()
+ break
}
- if len(keyWatcher[kc.Key]) == 0 {
- delete(keyWatcher, kc.Key)
+ }
+ if len(kw.subscribers[key]) == 0 {
+ delete(kw.subscribers, key)
+ countAction("delete-subscription")
+ if kw.conn != nil {
+ kw.conn.Unsubscribe(channelPrefix + key)
}
}
}
@@ -179,15 +251,12 @@ const (
WatchKeyStatusNoChange
)
-// WatchKey waits for a key to be updated or expired
-func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) {
- kw := &KeyChan{
- Key: key,
- Chan: make(chan string, 1),
+func (kw *KeyWatcher) WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error) {
+ notify := make(chan string, 1)
+ if err := kw.addSubscription(key, notify); err != nil {
+ return WatchKeyStatusNoChange, err
}
-
- addKeyChan(kw)
- defer delKeyChan(kw)
+ defer kw.delSubscription(key, notify)
currentValue, err := GetString(key)
if errors.Is(err, redis.ErrNil) {
@@ -200,10 +269,10 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
}
select {
- case <-shutdown:
+ case <-kw.shutdown:
log.WithFields(log.Fields{"key": key}).Info("stopping watch due to shutdown")
return WatchKeyStatusNoChange, nil
- case currentValue := <-kw.Chan:
+ case currentValue := <-notify:
if currentValue == "" {
return WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed")
}
@@ -211,7 +280,6 @@ func WatchKey(key, value string, timeout time.Duration) (WatchKeyStatus, error)
return WatchKeyStatusNoChange, nil
}
return WatchKeyStatusSeenChange, nil
-
case <-time.After(timeout):
return WatchKeyStatusTimeout, nil
}
diff --git a/workhorse/internal/redis/keywatcher_test.go b/workhorse/internal/redis/keywatcher_test.go
index 29041226b14..bae49d81bb1 100644
--- a/workhorse/internal/redis/keywatcher_test.go
+++ b/workhorse/internal/redis/keywatcher_test.go
@@ -1,7 +1,6 @@
package redis
import (
- "errors"
"sync"
"testing"
"time"
@@ -38,33 +37,38 @@ func createUnsubscribeMessage(key string) []interface{} {
}
}
-func countWatchers(key string) int {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- return len(keyWatcher[key])
-}
-
-func deleteWatchers(key string) {
- keyWatcherMutex.Lock()
- defer keyWatcherMutex.Unlock()
- delete(keyWatcher, key)
+func (kw *KeyWatcher) countSubscribers(key string) int {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ return len(kw.subscribers[key])
}
// Forces a run of the `Process` loop against a mock PubSubConn.
-func processMessages(numWatchers int, value string) {
+func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}) {
psc := redigomock.NewConn()
+ psc.ReceiveWait = true
- // Setup the initial subscription message
- psc.Command("SUBSCRIBE", keySubChannel).Expect(createSubscribeMessage(keySubChannel))
- psc.Command("UNSUBSCRIBE", keySubChannel).Expect(createUnsubscribeMessage(keySubChannel))
- psc.AddSubscriptionMessage(createSubscriptionMessage(keySubChannel, runnerKey+"="+value))
+ channel := channelPrefix + runnerKey
+ psc.Command("SUBSCRIBE", channel).Expect(createSubscribeMessage(channel))
+ psc.Command("UNSUBSCRIBE", channel).Expect(createUnsubscribeMessage(channel))
+ psc.AddSubscriptionMessage(createSubscriptionMessage(channel, value))
- // Wait for all the `WatchKey` calls to be registered
- for countWatchers(runnerKey) != numWatchers {
- time.Sleep(time.Millisecond)
- }
+ errC := make(chan error)
+ go func() { errC <- kw.receivePubSubStream(psc) }()
+
+ require.Eventually(t, func() bool {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ return kw.conn != nil
+ }, time.Second, time.Millisecond)
+ close(ready)
+
+ require.Eventually(t, func() bool {
+ return kw.countSubscribers(runnerKey) == numWatchers
+ }, time.Second, time.Millisecond)
+ close(psc.ReceiveNow)
- processInner(psc)
+ require.NoError(t, <-errC)
}
type keyChangeTestCase struct {
@@ -77,18 +81,6 @@ type keyChangeTestCase struct {
timeout time.Duration
}
-func TestKeyChangesBubblesUpError(t *testing.T) {
- conn, td := setupMockPool()
- defer td()
-
- conn.Command("GET", runnerKey).ExpectError(errors.New("test error"))
-
- _, err := WatchKey(runnerKey, "something", time.Second)
- require.Error(t, err, "Expected error")
-
- deleteWatchers(runnerKey)
-}
-
func TestKeyChangesInstantReturn(t *testing.T) {
testCases := []keyChangeTestCase{
// WatchKeyStatusAlreadyChanged
@@ -135,12 +127,14 @@ func TestKeyChangesInstantReturn(t *testing.T) {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
- val, err := WatchKey(runnerKey, tc.watchValue, tc.timeout)
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+ kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
+
+ val, err := kw.WatchKey(runnerKey, tc.watchValue, tc.timeout)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
-
- deleteWatchers(runnerKey)
})
}
}
@@ -183,18 +177,23 @@ func TestKeyChangesWhenWatching(t *testing.T) {
conn.Command("GET", runnerKey).Expect(tc.returnValue)
}
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+
wg := &sync.WaitGroup{}
wg.Add(1)
+ ready := make(chan struct{})
go func() {
defer wg.Done()
- val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
+ <-ready
+ val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
- processMessages(1, tc.processedValue)
+ kw.processMessages(t, 1, tc.processedValue, ready)
wg.Wait()
})
}
@@ -237,18 +236,23 @@ func TestKeyChangesParallel(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(runTimes)
+ ready := make(chan struct{})
+
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
for i := 0; i < runTimes; i++ {
go func() {
defer wg.Done()
- val, err := WatchKey(runnerKey, tc.watchValue, time.Second)
+ <-ready
+ val, err := kw.WatchKey(runnerKey, tc.watchValue, time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, tc.expectedStatus, val, "Expected value")
}()
}
- processMessages(runTimes, tc.processedValue)
+ kw.processMessages(t, runTimes, tc.processedValue, ready)
wg.Wait()
})
}
@@ -257,7 +261,10 @@ func TestKeyChangesParallel(t *testing.T) {
func TestShutdown(t *testing.T) {
conn, td := setupMockPool()
defer td()
- defer func() { shutdown = make(chan struct{}) }()
+
+ kw := NewKeyWatcher()
+ kw.conn = &redis.PubSubConn{Conn: redigomock.NewConn()}
+ defer kw.Shutdown()
conn.Command("GET", runnerKey).Expect("something")
@@ -265,30 +272,30 @@ func TestShutdown(t *testing.T) {
wg.Add(2)
go func() {
- val, err := WatchKey(runnerKey, "something", 10*time.Second)
+ defer wg.Done()
+ val, err := kw.WatchKey(runnerKey, "something", 10*time.Second)
require.NoError(t, err, "Expected no error")
require.Equal(t, WatchKeyStatusNoChange, val, "Expected value not to change")
- wg.Done()
}()
go func() {
- require.Eventually(t, func() bool { return countWatchers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
+ defer wg.Done()
+ require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
- Shutdown()
- wg.Done()
+ kw.Shutdown()
}()
wg.Wait()
- require.Eventually(t, func() bool { return countWatchers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
+ require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
// Adding a key after the shutdown should result in an immediate response
var val WatchKeyStatus
var err error
done := make(chan struct{})
go func() {
- val, err = WatchKey(runnerKey, "something", 10*time.Second)
+ val, err = kw.WatchKey(runnerKey, "something", 10*time.Second)
close(done)
}()
diff --git a/workhorse/internal/senddata/contentprocessor/contentprocessor_test.go b/workhorse/internal/senddata/contentprocessor/contentprocessor_test.go
index ce7f7921589..b04263de6b9 100644
--- a/workhorse/internal/senddata/contentprocessor/contentprocessor_test.go
+++ b/workhorse/internal/senddata/contentprocessor/contentprocessor_test.go
@@ -86,7 +86,7 @@ func TestSetProperContentTypeAndDisposition(t *testing.T) {
},
{
desc: "Application type",
- contentType: "application/pdf",
+ contentType: "application/octet-stream",
contentDisposition: "attachment",
body: testhelper.LoadFile(t, "testdata/file.pdf"),
},
@@ -110,7 +110,7 @@ func TestSetProperContentTypeAndDisposition(t *testing.T) {
},
{
desc: "Audio type",
- contentType: "audio/mpeg",
+ contentType: "application/octet-stream",
contentDisposition: "attachment",
body: testhelper.LoadFile(t, "testdata/audio.mp3"),
},
@@ -152,16 +152,22 @@ func TestSetProperContentTypeAndDisposition(t *testing.T) {
},
{
desc: "Sketch file",
- contentType: "application/zip",
+ contentType: "application/octet-stream",
contentDisposition: "attachment",
body: testhelper.LoadFile(t, "testdata/file.sketch"),
},
{
desc: "PDF file with non-ASCII characters in filename",
- contentType: "application/pdf",
+ contentType: "application/octet-stream",
contentDisposition: `attachment; filename="file-ä.pdf"; filename*=UTF-8''file-%c3.pdf`,
body: testhelper.LoadFile(t, "testdata/file-ä.pdf"),
},
+ {
+ desc: "Microsoft Word file",
+ contentType: "application/octet-stream",
+ contentDisposition: `attachment`,
+ body: testhelper.LoadFile(t, "testdata/file.docx"),
+ },
}
for _, tc := range testCases {
diff --git a/workhorse/internal/upload/destination/objectstore/uploader.go b/workhorse/internal/upload/destination/objectstore/uploader.go
index aedfbe55ead..43e573872ee 100644
--- a/workhorse/internal/upload/destination/objectstore/uploader.go
+++ b/workhorse/internal/upload/destination/objectstore/uploader.go
@@ -80,13 +80,13 @@ func (u *uploader) Consume(outerCtx context.Context, reader io.Reader, deadline
cr := &countReader{r: reader}
if err := u.strategy.Upload(uploadCtx, cr); err != nil {
- return cr.n, err
+ return 0, err
}
if u.checkETag {
if err := compareMD5(hexString(hasher), u.strategy.ETag()); err != nil {
log.ContextLogger(uploadCtx).WithError(err).Error("error comparing MD5 checksum")
- return cr.n, err
+ return 0, err
}
}
diff --git a/workhorse/internal/upstream/metrics_test.go b/workhorse/internal/upstream/metrics_test.go
index 29a9e09777c..dff849ac214 100644
--- a/workhorse/internal/upstream/metrics_test.go
+++ b/workhorse/internal/upstream/metrics_test.go
@@ -26,7 +26,7 @@ func TestInstrumentGeoProxyRoute(t *testing.T) {
handleRouteWithMatchers(u, local),
handleRouteWithMatchers(u, main),
}
- })
+ }, nil)
ts := httptest.NewServer(u)
defer ts.Close()
diff --git a/workhorse/internal/upstream/routes.go b/workhorse/internal/upstream/routes.go
index c889f87ed96..40cd012a890 100644
--- a/workhorse/internal/upstream/routes.go
+++ b/workhorse/internal/upstream/routes.go
@@ -21,7 +21,6 @@ import (
"gitlab.com/gitlab-org/gitlab/workhorse/internal/imageresizer"
proxypkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/queueing"
- "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/secret"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/senddata"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/sendfile"
@@ -51,7 +50,7 @@ const (
gitProjectPattern = `^/.+\.git/`
geoGitProjectPattern = `^/[^-].+\.git/` // Prevent matching routes like /-/push_from_secondary
projectPattern = `^/([^/]+/){1,}[^/]+/`
- apiProjectPattern = apiPattern + `v4/projects/[^/]+/` // API: Projects can be encoded via group%2Fsubgroup%2Fproject
+ apiProjectPattern = apiPattern + `v4/projects/[^/]+` // API: Projects can be encoded via group%2Fsubgroup%2Fproject
apiTopicPattern = apiPattern + `v4/topics`
snippetUploadPattern = `^/uploads/personal_snippet`
userUploadPattern = `^/uploads/user`
@@ -223,7 +222,7 @@ func configureRoutes(u *upstream) {
tempfileMultipartProxy := upload.FixedPreAuthMultipart(api, proxy, preparer)
ciAPIProxyQueue := queueing.QueueRequests("ci_api_job_requests", tempfileMultipartProxy, u.APILimit, u.APIQueueLimit, u.APIQueueTimeout)
- ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, redis.WatchKey, u.APICILongPollingDuration)
+ ciAPILongPolling := builds.RegisterHandler(ciAPIProxyQueue, u.watchKeyHandler, u.APICILongPollingDuration)
dependencyProxyInjector.SetUploadHandler(requestBodyUploader)
@@ -269,37 +268,40 @@ func configureRoutes(u *upstream) {
// https://gitlab.com/gitlab-org/gitlab/-/merge_requests/56731.
// Maven Artifact Repository
- u.route("PUT", apiProjectPattern+`packages/maven/`, requestBodyUploader),
+ u.route("PUT", apiProjectPattern+`/packages/maven/`, requestBodyUploader),
// Conan Artifact Repository
u.route("PUT", apiPattern+`v4/packages/conan/`, requestBodyUploader),
- u.route("PUT", apiProjectPattern+`packages/conan/`, requestBodyUploader),
+ u.route("PUT", apiProjectPattern+`/packages/conan/`, requestBodyUploader),
// Generic Packages Repository
- u.route("PUT", apiProjectPattern+`packages/generic/`, requestBodyUploader),
+ u.route("PUT", apiProjectPattern+`/packages/generic/`, requestBodyUploader),
// NuGet Artifact Repository
- u.route("PUT", apiProjectPattern+`packages/nuget/`, mimeMultipartUploader),
+ u.route("PUT", apiProjectPattern+`/packages/nuget/`, mimeMultipartUploader),
// PyPI Artifact Repository
- u.route("POST", apiProjectPattern+`packages/pypi`, mimeMultipartUploader),
+ u.route("POST", apiProjectPattern+`/packages/pypi`, mimeMultipartUploader),
// Debian Artifact Repository
- u.route("PUT", apiProjectPattern+`packages/debian/`, requestBodyUploader),
+ u.route("PUT", apiProjectPattern+`/packages/debian/`, requestBodyUploader),
+
+ // RPM Artifact Repository
+ u.route("POST", apiProjectPattern+`/packages/rpm/`, requestBodyUploader),
// Gem Artifact Repository
- u.route("POST", apiProjectPattern+`packages/rubygems/`, requestBodyUploader),
+ u.route("POST", apiProjectPattern+`/packages/rubygems/`, requestBodyUploader),
// Terraform Module Package Repository
- u.route("PUT", apiProjectPattern+`packages/terraform/modules/`, requestBodyUploader),
+ u.route("PUT", apiProjectPattern+`/packages/terraform/modules/`, requestBodyUploader),
// Helm Artifact Repository
- u.route("POST", apiProjectPattern+`packages/helm/api/[^/]+/charts\z`, mimeMultipartUploader),
+ u.route("POST", apiProjectPattern+`/packages/helm/api/[^/]+/charts\z`, mimeMultipartUploader),
// We are porting API to disk acceleration
// we need to declare each routes until we have fixed all the routes on the rails codebase.
// Overall status can be seen at https://gitlab.com/groups/gitlab-org/-/epics/1802#current-status
- u.route("POST", apiProjectPattern+`wikis/attachments\z`, tempfileMultipartProxy),
+ u.route("POST", apiProjectPattern+`/wikis/attachments\z`, tempfileMultipartProxy),
u.route("POST", apiPattern+`graphql\z`, tempfileMultipartProxy),
u.route("POST", apiTopicPattern, tempfileMultipartProxy),
u.route("PUT", apiTopicPattern, tempfileMultipartProxy),
@@ -312,16 +314,28 @@ func configureRoutes(u *upstream) {
u.route("POST", importPattern+`gitlab_group`, mimeMultipartUploader),
// Issuable Metric image upload
- u.route("POST", apiProjectPattern+`issues/[0-9]+/metric_images\z`, mimeMultipartUploader),
+ u.route("POST", apiProjectPattern+`/issues/[0-9]+/metric_images\z`, mimeMultipartUploader),
// Alert Metric image upload
- u.route("POST", apiProjectPattern+`alert_management_alerts/[0-9]+/metric_images\z`, mimeMultipartUploader),
+ u.route("POST", apiProjectPattern+`/alert_management_alerts/[0-9]+/metric_images\z`, mimeMultipartUploader),
// Requirements Import via UI upload acceleration
u.route("POST", projectPattern+`requirements_management/requirements/import_csv`, mimeMultipartUploader),
// Uploads via API
- u.route("POST", apiProjectPattern+`uploads\z`, mimeMultipartUploader),
+ u.route("POST", apiProjectPattern+`/uploads\z`, mimeMultipartUploader),
+
+ // Project Avatar
+ u.route("POST", apiPattern+`v4/projects\z`, tempfileMultipartProxy),
+ u.route("PUT", apiProjectPattern+`\z`, tempfileMultipartProxy),
+
+ // Group Avatar
+ u.route("POST", apiPattern+`v4/groups\z`, tempfileMultipartProxy),
+ u.route("PUT", apiPattern+`v4/groups/[^/]+\z`, tempfileMultipartProxy),
+
+ // User Avatar
+ u.route("POST", apiPattern+`v4/users\z`, tempfileMultipartProxy),
+ u.route("PUT", apiPattern+`v4/users/[0-9]+\z`, tempfileMultipartProxy),
// Explicitly proxy API requests
u.route("", apiPattern, proxy),
diff --git a/workhorse/internal/upstream/upstream.go b/workhorse/internal/upstream/upstream.go
index 43b470b568f..248f190e316 100644
--- a/workhorse/internal/upstream/upstream.go
+++ b/workhorse/internal/upstream/upstream.go
@@ -21,6 +21,7 @@ import (
"gitlab.com/gitlab-org/labkit/correlation"
apipkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/api"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/builds"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
proxypkg "gitlab.com/gitlab-org/gitlab/workhorse/internal/proxy"
@@ -55,19 +56,21 @@ type upstream struct {
accessLogger *logrus.Logger
enableGeoProxyFeature bool
mu sync.RWMutex
+ watchKeyHandler builds.WatchKeyHandler
}
-func NewUpstream(cfg config.Config, accessLogger *logrus.Logger) http.Handler {
- return newUpstream(cfg, accessLogger, configureRoutes)
+func NewUpstream(cfg config.Config, accessLogger *logrus.Logger, watchKeyHandler builds.WatchKeyHandler) http.Handler {
+ return newUpstream(cfg, accessLogger, configureRoutes, watchKeyHandler)
}
-func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback func(*upstream)) http.Handler {
+func newUpstream(cfg config.Config, accessLogger *logrus.Logger, routesCallback func(*upstream), watchKeyHandler builds.WatchKeyHandler) http.Handler {
up := upstream{
Config: cfg,
accessLogger: accessLogger,
// Kind of a feature flag. See https://gitlab.com/groups/gitlab-org/-/epics/5914#note_564974130
enableGeoProxyFeature: os.Getenv("GEO_SECONDARY_PROXY") != "0",
geoProxyBackend: &url.URL{},
+ watchKeyHandler: watchKeyHandler,
}
if up.geoProxyPollSleep == nil {
up.geoProxyPollSleep = time.Sleep
diff --git a/workhorse/internal/upstream/upstream_test.go b/workhorse/internal/upstream/upstream_test.go
index 21fa7b81fdb..7ab3e67116f 100644
--- a/workhorse/internal/upstream/upstream_test.go
+++ b/workhorse/internal/upstream/upstream_test.go
@@ -58,7 +58,7 @@ func TestRouting(t *testing.T) {
handle(u, quxbaz),
handle(u, main),
}
- })
+ }, nil)
ts := httptest.NewServer(u)
defer ts.Close()
@@ -415,7 +415,7 @@ func startWorkhorseServer(railsServerURL string, enableGeoProxyFeature bool) (*h
configureRoutes(u)
}
cfg := newUpstreamConfig(railsServerURL)
- upstreamHandler := newUpstream(*cfg, logrus.StandardLogger(), myConfigureRoutes)
+ upstreamHandler := newUpstream(*cfg, logrus.StandardLogger(), myConfigureRoutes, nil)
ws := httptest.NewServer(upstreamHandler)
waitForNextApiPoll := func() {}