Diffstat (limited to 'workhorse/internal')

 workhorse/internal/config/config.go                        |   15
 workhorse/internal/dependencyproxy/dependencyproxy.go      |   73
 workhorse/internal/dependencyproxy/dependencyproxy_test.go |  153
 workhorse/internal/gitaly/gitaly.go                        |   35
 workhorse/internal/gitaly/gitaly_test.go                   |    9
 workhorse/internal/gitaly/namespace.go                     |    8
 workhorse/internal/goredis/goredis.go                      |  186
 workhorse/internal/goredis/goredis_test.go                 |  107
 workhorse/internal/goredis/keywatcher.go                   |  236
 workhorse/internal/goredis/keywatcher_test.go              |  301
 workhorse/internal/redis/keywatcher.go                     |   24
 workhorse/internal/redis/redis.go                          |   12
 12 files changed, 1081 insertions(+), 78 deletions(-)
diff --git a/workhorse/internal/config/config.go b/workhorse/internal/config/config.go
index 687986974a3..3b928d42fe1 100644
--- a/workhorse/internal/config/config.go
+++ b/workhorse/internal/config/config.go
@@ -83,13 +83,14 @@ type GoogleCredentials struct {
}
type RedisConfig struct {
- URL TomlURL
- Sentinel []TomlURL
- SentinelMaster string
- Password string
- DB *int
- MaxIdle *int
- MaxActive *int
+ URL TomlURL
+ Sentinel []TomlURL
+ SentinelMaster string
+ SentinelPassword string
+ Password string
+ DB *int
+ MaxIdle *int
+ MaxActive *int
}
type ImageResizerConfig struct {
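
(Aside, not part of the patch: the new SentinelPassword field rides through the existing TOML loader like the other Redis settings. A minimal sketch of reading it back via config.LoadConfig — the [redis] values below are illustrative, and the key names are assumed to match the struct fields the same way the existing settings do.)

package main

import (
	"fmt"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
)

func main() {
	// Illustrative config: Sentinel nodes guarded by their own password,
	// separate from the Redis server password.
	toml := `
[redis]
URL = "redis://localhost:6379"
Sentinel = ["tcp://sentinel1:26379", "tcp://sentinel2:26379"]
SentinelMaster = "mymaster"
SentinelPassword = "sentinel-secret"
Password = "redis-secret"
`
	cfg, err := config.LoadConfig(toml)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Redis.SentinelMaster, cfg.Redis.SentinelPassword)
}
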
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy.go b/workhorse/internal/dependencyproxy/dependencyproxy.go
index e170b001806..dbea3c29aec 100644
--- a/workhorse/internal/dependencyproxy/dependencyproxy.go
+++ b/workhorse/internal/dependencyproxy/dependencyproxy.go
@@ -23,8 +23,15 @@ type Injector struct {
}
type entryParams struct {
- Url string
- Header http.Header
+ Url string
+ Headers http.Header
+ UploadConfig uploadConfig
+}
+
+type uploadConfig struct {
+ Headers http.Header
+ Method string
+ Url string
}
type nullResponseWriter struct {
@@ -55,7 +62,13 @@ func (p *Injector) SetUploadHandler(uploadHandler http.Handler) {
}
func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData string) {
- dependencyResponse, err := p.fetchUrl(r.Context(), sendData)
+ params, err := p.unpackParams(sendData)
+ if err != nil {
+ fail.Request(w, r, err)
+ return
+ }
+
+ dependencyResponse, err := p.fetchUrl(r.Context(), params)
if err != nil {
fail.Request(w, r, err)
return
@@ -70,11 +83,10 @@ func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData strin
w.Header().Set("Content-Length", dependencyResponse.Header.Get("Content-Length"))
teeReader := io.TeeReader(dependencyResponse.Body, w)
- saveFileRequest, err := http.NewRequestWithContext(r.Context(), "POST", r.URL.String()+"/upload", teeReader)
+ saveFileRequest, err := p.newUploadRequest(r.Context(), params, r, teeReader)
if err != nil {
fail.Request(w, r, fmt.Errorf("dependency proxy: failed to create request: %w", err))
}
- saveFileRequest.Header = r.Header.Clone()
// forward headers from dependencyResponse to rails and client
for key, values := range dependencyResponse.Header {
@@ -100,17 +112,56 @@ func (p *Injector) Inject(w http.ResponseWriter, r *http.Request, sendData strin
}
}
-func (p *Injector) fetchUrl(ctx context.Context, sendData string) (*http.Response, error) {
+func (p *Injector) fetchUrl(ctx context.Context, params *entryParams) (*http.Response, error) {
+ r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil)
+ if err != nil {
+ return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %w", err)
+ }
+ r.Header = params.Headers
+
+ return httpClient.Do(r)
+}
+
+func (p *Injector) newUploadRequest(ctx context.Context, params *entryParams, originalRequest *http.Request, body io.Reader) (*http.Request, error) {
+ method := p.uploadMethodFrom(params)
+ uploadUrl := p.uploadUrlFrom(params, originalRequest)
+ request, err := http.NewRequestWithContext(ctx, method, uploadUrl, body)
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header = originalRequest.Header.Clone()
+
+ for key, values := range params.UploadConfig.Headers {
+ request.Header.Del(key)
+ for _, value := range values {
+ request.Header.Add(key, value)
+ }
+ }
+
+ return request, nil
+}
+
+func (p *Injector) unpackParams(sendData string) (*entryParams, error) {
var params entryParams
if err := p.Unpack(&params, sendData); err != nil {
return nil, fmt.Errorf("dependency proxy: unpack sendData: %v", err)
}
- r, err := http.NewRequestWithContext(ctx, "GET", params.Url, nil)
- if err != nil {
- return nil, fmt.Errorf("dependency proxy: failed to fetch dependency: %v", err)
+ return &params, nil
+}
+
+func (p *Injector) uploadMethodFrom(params *entryParams) string {
+ if params.UploadConfig.Method != "" {
+ return params.UploadConfig.Method
}
- r.Header = params.Header
+ return http.MethodPost
+}
- return httpClient.Do(r)
+func (p *Injector) uploadUrlFrom(params *entryParams, originalRequest *http.Request) string {
+ if params.UploadConfig.Url != "" {
+ return params.UploadConfig.Url
+ }
+
+ return originalRequest.URL.String() + "/upload"
}
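
(Aside, not part of the patch: the refactored injector still receives entryParams as the JSON sendData blob; UploadConfig is optional, and empty fields fall back to POST and originalRequest.URL + "/upload" via uploadMethodFrom/uploadUrlFrom. A hedged sketch of the payload shape — the URLs, token, and header values are made up:)

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Field names mirror entryParams/uploadConfig above; every UploadConfig
	// field is optional and only overrides the default when non-empty.
	payload := map[string]interface{}{
		"Url": "https://registry.example.com/remote/file",
		"Headers": map[string][]string{
			"Authorization": {"Bearer made-up-token"},
		},
		"UploadConfig": map[string]interface{}{
			"Method":  "PUT", // default: POST
			"Url":     "https://gitlab.example.com/target/upload",
			"Headers": map[string][]string{"Private-Token": {"123456789"}},
		},
	}
	raw, err := json.Marshal(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))
}
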
diff --git a/workhorse/internal/dependencyproxy/dependencyproxy_test.go b/workhorse/internal/dependencyproxy/dependencyproxy_test.go
index d893ddc500f..bee74ce0a9e 100644
--- a/workhorse/internal/dependencyproxy/dependencyproxy_test.go
+++ b/workhorse/internal/dependencyproxy/dependencyproxy_test.go
@@ -2,6 +2,7 @@ package dependencyproxy
import (
"encoding/base64"
+ "encoding/json"
"fmt"
"io"
"net/http"
@@ -149,6 +150,158 @@ func TestSuccessfullRequest(t *testing.T) {
require.Equal(t, dockerContentDigest, response.Header().Get("Docker-Content-Digest"))
}
+func TestValidUploadConfiguration(t *testing.T) {
+ content := []byte("content")
+ contentLength := strconv.Itoa(len(content))
+ contentType := "text/plain"
+ testHeader := "test-received-url"
+ originResourceServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set(testHeader, r.URL.Path)
+ w.Header().Set("Content-Length", contentLength)
+ w.Header().Set("Content-Type", contentType)
+ w.Write(content)
+ }))
+
+ testCases := []struct {
+ desc string
+ uploadConfig *uploadConfig
+ expectedConfig uploadConfig
+ }{
+ {
+ desc: "with the default values",
+ expectedConfig: uploadConfig{
+ Method: http.MethodPost,
+ Url: "/target/upload",
+ },
+ }, {
+ desc: "with overriden method",
+ uploadConfig: &uploadConfig{
+ Method: http.MethodPut,
+ },
+ expectedConfig: uploadConfig{
+ Method: http.MethodPut,
+ Url: "/target/upload",
+ },
+ }, {
+ desc: "with overriden url",
+ uploadConfig: &uploadConfig{
+ Url: "http://test.org/overriden/upload",
+ },
+ expectedConfig: uploadConfig{
+ Method: http.MethodPost,
+ Url: "http://test.org/overriden/upload",
+ },
+ }, {
+ desc: "with overriden headers",
+ uploadConfig: &uploadConfig{
+ Headers: map[string][]string{"Private-Token": {"123456789"}},
+ },
+ expectedConfig: uploadConfig{
+ Headers: map[string][]string{"Private-Token": {"123456789"}},
+ Method: http.MethodPost,
+ Url: "/target/upload",
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ uploadHandler := &fakeUploadHandler{
+ handler: func(w http.ResponseWriter, r *http.Request) {
+ require.Equal(t, tc.expectedConfig.Url, r.URL.String())
+ require.Equal(t, tc.expectedConfig.Method, r.Method)
+
+ if tc.expectedConfig.Headers != nil {
+ for k, v := range tc.expectedConfig.Headers {
+ require.Equal(t, v, r.Header[k])
+ }
+ }
+
+ w.WriteHeader(200)
+ },
+ }
+
+ injector := NewInjector()
+ injector.SetUploadHandler(uploadHandler)
+
+ sendData := map[string]interface{}{
+ "Token": "token",
+ "Url": originResourceServer.URL + `/remote/file`,
+ }
+
+ if tc.uploadConfig != nil {
+ sendData["UploadConfig"] = tc.uploadConfig
+ }
+
+ sendDataJsonString, err := json.Marshal(sendData)
+ require.NoError(t, err)
+
+ response := makeRequest(injector, string(sendDataJsonString))
+
+ // checking the response
+ require.Equal(t, 200, response.Code)
+ require.Equal(t, string(content), response.Body.String())
+ // checking remote file request
+ require.Equal(t, "/remote/file", response.Header().Get(testHeader))
+ })
+ }
+}
+
+func TestInvalidUploadConfiguration(t *testing.T) {
+ baseSendData := map[string]interface{}{
+ "Token": "token",
+ "Url": "http://remote.dev/remote/file",
+ }
+ testCases := []struct {
+ desc string
+ sendData map[string]interface{}
+ }{
+ {
+ desc: "with an invalid overriden method",
+ sendData: mergeMap(baseSendData, map[string]interface{}{
+ "UploadConfig": map[string]string{
+ "Method": "TEAPOT",
+ },
+ }),
+ }, {
+ desc: "with an invalid url",
+ sendData: mergeMap(baseSendData, map[string]interface{}{
+ "UploadConfig": map[string]string{
+ "Url": "invalid_url",
+ },
+ }),
+ }, {
+ desc: "with an invalid headers",
+ sendData: mergeMap(baseSendData, map[string]interface{}{
+ "UploadConfig": map[string]interface{}{
+ "Headers": map[string]string{
+ "Private-Token": "not_an_array",
+ },
+ },
+ }),
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ sendDataJsonString, err := json.Marshal(tc.sendData)
+ require.NoError(t, err)
+
+ response := makeRequest(NewInjector(), string(sendDataJsonString))
+
+ require.Equal(t, 500, response.Code)
+ require.Equal(t, "Internal Server Error\n", response.Body.String())
+ })
+ }
+}
+
+func mergeMap(from map[string]interface{}, into map[string]interface{}) map[string]interface{} {
+ for k, v := range from {
+ into[k] = v
+ }
+ return into
+}
+
func TestIncorrectSendData(t *testing.T) {
response := makeRequest(NewInjector(), "")
diff --git a/workhorse/internal/gitaly/gitaly.go b/workhorse/internal/gitaly/gitaly.go
index d9dbbdbb605..e4fbad17017 100644
--- a/workhorse/internal/gitaly/gitaly.go
+++ b/workhorse/internal/gitaly/gitaly.go
@@ -7,7 +7,6 @@ import (
"github.com/golang/protobuf/jsonpb" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
"github.com/golang/protobuf/proto" //lint:ignore SA1019 https://gitlab.com/gitlab-org/gitlab/-/issues/324868
- grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -114,16 +113,6 @@ func NewRepositoryClient(ctx context.Context, server api.GitalyServer) (context.
return withOutgoingMetadata(ctx, server), &RepositoryClient{grpcClient}, nil
}
-// NewNamespaceClient is only used by the Gitaly integration tests at present
-func NewNamespaceClient(ctx context.Context, server api.GitalyServer) (context.Context, *NamespaceClient, error) {
- conn, err := getOrCreateConnection(server)
- if err != nil {
- return nil, nil, err
- }
- grpcClient := gitalypb.NewNamespaceServiceClient(conn)
- return withOutgoingMetadata(ctx, server), &NamespaceClient{grpcClient}, nil
-}
-
func NewDiffClient(ctx context.Context, server api.GitalyServer) (context.Context, *DiffClient, error) {
conn, err := getOrCreateConnection(server)
if err != nil {
@@ -173,23 +162,19 @@ func CloseConnections() {
func newConnection(server api.GitalyServer) (*grpc.ClientConn, error) {
connOpts := append(gitalyclient.DefaultDialOpts,
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(server.Token)),
- grpc.WithStreamInterceptor(
- grpc_middleware.ChainStreamClient(
- grpctracing.StreamClientTracingInterceptor(),
- grpc_prometheus.StreamClientInterceptor,
- grpccorrelation.StreamClientCorrelationInterceptor(
- grpccorrelation.WithClientName("gitlab-workhorse"),
- ),
+ grpc.WithChainStreamInterceptor(
+ grpctracing.StreamClientTracingInterceptor(),
+ grpc_prometheus.StreamClientInterceptor,
+ grpccorrelation.StreamClientCorrelationInterceptor(
+ grpccorrelation.WithClientName("gitlab-workhorse"),
),
),
- grpc.WithUnaryInterceptor(
- grpc_middleware.ChainUnaryClient(
- grpctracing.UnaryClientTracingInterceptor(),
- grpc_prometheus.UnaryClientInterceptor,
- grpccorrelation.UnaryClientCorrelationInterceptor(
- grpccorrelation.WithClientName("gitlab-workhorse"),
- ),
+ grpc.WithChainUnaryInterceptor(
+ grpctracing.UnaryClientTracingInterceptor(),
+ grpc_prometheus.UnaryClientInterceptor,
+ grpccorrelation.UnaryClientCorrelationInterceptor(
+ grpccorrelation.WithClientName("gitlab-workhorse"),
),
),
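
(Aside, not part of the patch: the go-grpc-middleware import can be dropped because grpc-go chains interceptors natively via grpc.WithChainUnaryInterceptor and grpc.WithChainStreamInterceptor. A minimal sketch of the replacement pattern, with a toy interceptor standing in for the tracing/Prometheus/correlation ones:)

package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// logUnary is a toy client interceptor used only to show the chaining.
func logUnary(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	log.Printf("calling %s", method)
	return invoker(ctx, method, req, reply, cc, opts...)
}

func main() {
	// WithChainUnaryInterceptor composes interceptors in order, replacing
	// grpc_middleware.ChainUnaryClient (likewise for the stream variant).
	conn, err := grpc.Dial("localhost:9999", // dial is lazy; address is illustrative
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(logUnary /*, more interceptors */),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
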
diff --git a/workhorse/internal/gitaly/gitaly_test.go b/workhorse/internal/gitaly/gitaly_test.go
index 0ea5da20da3..04d3a0a79aa 100644
--- a/workhorse/internal/gitaly/gitaly_test.go
+++ b/workhorse/internal/gitaly/gitaly_test.go
@@ -46,15 +46,6 @@ func TestNewRepositoryClient(t *testing.T) {
testOutgoingMetadata(t, ctx)
}
-func TestNewNamespaceClient(t *testing.T) {
- ctx, _, err := NewNamespaceClient(
- context.Background(),
- serverFixture(),
- )
- require.NoError(t, err)
- testOutgoingMetadata(t, ctx)
-}
-
func TestNewDiffClient(t *testing.T) {
ctx, _, err := NewDiffClient(
context.Background(),
diff --git a/workhorse/internal/gitaly/namespace.go b/workhorse/internal/gitaly/namespace.go
deleted file mode 100644
index a9bc2d07a7e..00000000000
--- a/workhorse/internal/gitaly/namespace.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package gitaly
-
-import "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
-
-// NamespaceClient encapsulates NamespaceService calls
-type NamespaceClient struct {
- gitalypb.NamespaceServiceClient
-}
diff --git a/workhorse/internal/goredis/goredis.go b/workhorse/internal/goredis/goredis.go
new file mode 100644
index 00000000000..13a9d4cc34f
--- /dev/null
+++ b/workhorse/internal/goredis/goredis.go
@@ -0,0 +1,186 @@
+package goredis
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "time"
+
+ redis "github.com/redis/go-redis/v9"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ _ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+ internalredis "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
+)
+
+var (
+ rdb *redis.Client
+ // found in https://github.com/redis/go-redis/blob/c7399b6a17d7d3e2a57654528af91349f2468529/sentinel.go#L626
+ errSentinelMasterAddr error = errors.New("redis: all sentinels specified in configuration are unreachable")
+)
+
+const (
+ // Max Idle Connections in the pool.
+ defaultMaxIdle = 1
+ // Max Active Connections in the pool.
+ defaultMaxActive = 1
+ // Timeout for Read operations on the pool. 1 second is technically overkill;
+ // it's just for sanity.
+ defaultReadTimeout = 1 * time.Second
+ // Timeout for Write operations on the pool. 1 second is technically overkill;
+ // it's just for sanity.
+ defaultWriteTimeout = 1 * time.Second
+ // Timeout before killing idle connections in the pool. 3 minutes seemed good.
+ // If you _actually_ hit this timeout often, consider turning off Redis
+ // support, since it's not necessary at that point...
+ defaultIdleTimeout = 3 * time.Minute
+)
+
+// createDialer references https://github.com/redis/go-redis/blob/b1103e3d436b6fe98813ecbbe1f99dc8d59b06c9/options.go#L214
+// it intercepts the error and tracks it via a Prometheus counter
+func createDialer(sentinels []string) func(ctx context.Context, network, addr string) (net.Conn, error) {
+ return func(ctx context.Context, network, addr string) (net.Conn, error) {
+ var isSentinel bool
+ for _, sentinelAddr := range sentinels {
+ if sentinelAddr == addr {
+ isSentinel = true
+ break
+ }
+ }
+
+ dialTimeout := 5 * time.Second // go-redis default
+ destination := "redis"
+ if isSentinel {
+ // This timeout is recommended for Sentinel support according to the guidelines.
+ // https://redis.io/topics/sentinel-clients#redis-service-discovery-via-sentinel
+ // For every address it should try to connect to the Sentinel,
+ // using a short timeout (in the order of a few hundreds of milliseconds).
+ destination = "sentinel"
+ dialTimeout = 500 * time.Millisecond
+ }
+
+ netDialer := &net.Dialer{
+ Timeout: dialTimeout,
+ KeepAlive: 5 * time.Minute,
+ }
+
+ conn, err := netDialer.DialContext(ctx, network, addr)
+ if err != nil {
+ internalredis.ErrorCounter.WithLabelValues("dial", destination).Inc()
+ } else {
+ if !isSentinel {
+ internalredis.TotalConnections.Inc()
+ }
+ }
+
+ return conn, err
+ }
+}
+
+// implements the redis.Hook interface for instrumentation
+type sentinelInstrumentationHook struct{}
+
+func (s sentinelInstrumentationHook) DialHook(next redis.DialHook) redis.DialHook {
+ return func(ctx context.Context, network, addr string) (net.Conn, error) {
+ conn, err := next(ctx, network, addr)
+ if err != nil && err.Error() == errSentinelMasterAddr.Error() {
+ // check for non-dialer error
+ internalredis.ErrorCounter.WithLabelValues("master", "sentinel").Inc()
+ }
+ return conn, err
+ }
+}
+
+func (s sentinelInstrumentationHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
+ return func(ctx context.Context, cmd redis.Cmder) error {
+ return next(ctx, cmd)
+ }
+}
+
+func (s sentinelInstrumentationHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
+ return func(ctx context.Context, cmds []redis.Cmder) error {
+ return next(ctx, cmds)
+ }
+}
+
+func GetRedisClient() *redis.Client {
+ return rdb
+}
+
+// Configure sets up the Redis connection from the given configuration.
+func Configure(cfg *config.RedisConfig) error {
+ if cfg == nil {
+ return nil
+ }
+
+ var err error
+
+ if len(cfg.Sentinel) > 0 {
+ rdb = configureSentinel(cfg)
+ } else {
+ rdb, err = configureRedis(cfg)
+ }
+
+ return err
+}
+
+func configureRedis(cfg *config.RedisConfig) (*redis.Client, error) {
+ if cfg.URL.Scheme == "tcp" {
+ cfg.URL.Scheme = "redis"
+ }
+
+ opt, err := redis.ParseURL(cfg.URL.String())
+ if err != nil {
+ return nil, err
+ }
+
+ opt.DB = getOrDefault(cfg.DB, 0)
+ opt.Password = cfg.Password
+
+ opt.PoolSize = getOrDefault(cfg.MaxActive, defaultMaxActive)
+ opt.MaxIdleConns = getOrDefault(cfg.MaxIdle, defaultMaxIdle)
+ opt.ConnMaxIdleTime = defaultIdleTimeout
+ opt.ReadTimeout = defaultReadTimeout
+ opt.WriteTimeout = defaultWriteTimeout
+
+ opt.Dialer = createDialer([]string{})
+
+ return redis.NewClient(opt), nil
+}
+
+func configureSentinel(cfg *config.RedisConfig) *redis.Client {
+ sentinels := make([]string, len(cfg.Sentinel))
+ for i := range cfg.Sentinel {
+ sentinelDetails := cfg.Sentinel[i]
+ sentinels[i] = fmt.Sprintf("%s:%s", sentinelDetails.Hostname(), sentinelDetails.Port())
+ }
+
+ client := redis.NewFailoverClient(&redis.FailoverOptions{
+ MasterName: cfg.SentinelMaster,
+ SentinelAddrs: sentinels,
+ Password: cfg.Password,
+ SentinelPassword: cfg.SentinelPassword,
+ DB: getOrDefault(cfg.DB, 0),
+
+ PoolSize: getOrDefault(cfg.MaxActive, defaultMaxActive),
+ MaxIdleConns: getOrDefault(cfg.MaxIdle, defaultMaxIdle),
+ ConnMaxIdleTime: defaultIdleTimeout,
+
+ ReadTimeout: defaultReadTimeout,
+ WriteTimeout: defaultWriteTimeout,
+
+ Dialer: createDialer(sentinels),
+ })
+
+ client.AddHook(sentinelInstrumentationHook{})
+
+ return client
+}
+
+func getOrDefault(ptr *int, val int) int {
+ if ptr != nil {
+ return *ptr
+ }
+ return val
+}
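
(Aside, not part of the patch: typical wiring for the new package — Configure builds either a plain client or a Sentinel failover client from the same RedisConfig, and callers fetch the singleton with GetRedisClient. The address is illustrative; go-redis dials lazily, so Ping forces the first connection:)

package main

import (
	"context"
	"log"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
	"gitlab.com/gitlab-org/gitlab/workhorse/internal/goredis"
	"gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
)

func main() {
	// Hypothetical single-node setup; with cfg.Sentinel populated,
	// Configure would build a FailoverClient instead.
	u := helper.URLMustParse("redis://localhost:6379")
	cfg := &config.RedisConfig{URL: config.TomlURL{URL: *u}}

	if err := goredis.Configure(cfg); err != nil {
		log.Fatal(err)
	}

	// Connections are established lazily, so Ping forces the first dial.
	rdb := goredis.GetRedisClient()
	if err := rdb.Ping(context.Background()).Err(); err != nil {
		log.Fatal(err)
	}
}
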
diff --git a/workhorse/internal/goredis/goredis_test.go b/workhorse/internal/goredis/goredis_test.go
new file mode 100644
index 00000000000..6b281229ea4
--- /dev/null
+++ b/workhorse/internal/goredis/goredis_test.go
@@ -0,0 +1,107 @@
+package goredis
+
+import (
+ "context"
+ "net"
+ "sync/atomic"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/helper"
+)
+
+func mockRedisServer(t *testing.T, connectReceived *atomic.Value) string {
+ ln, err := net.Listen("tcp", "127.0.0.1:0")
+
+ require.Nil(t, err)
+
+ go func() {
+ defer ln.Close()
+ conn, err := ln.Accept()
+ require.Nil(t, err)
+ connectReceived.Store(true)
+ conn.Write([]byte("OK\n"))
+ }()
+
+ return ln.Addr().String()
+}
+
+func TestConfigureNoConfig(t *testing.T) {
+ rdb = nil
+ Configure(nil)
+ require.Nil(t, rdb, "rdb client should be nil")
+}
+
+func TestConfigureValidConfig(t *testing.T) {
+ testCases := []struct {
+ scheme string
+ }{
+ {
+ scheme: "redis",
+ },
+ {
+ scheme: "tcp",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.scheme, func(t *testing.T) {
+ connectReceived := atomic.Value{}
+ a := mockRedisServer(t, &connectReceived)
+
+ parsedURL := helper.URLMustParse(tc.scheme + "://" + a)
+ cfg := &config.RedisConfig{URL: config.TomlURL{URL: *parsedURL}}
+
+ Configure(cfg)
+
+ require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil")
+
+ // go-redis initialises connections lazily
+ rdb.Ping(context.Background())
+ require.True(t, connectReceived.Load().(bool))
+
+ rdb = nil
+ })
+ }
+}
+
+func TestConnectToSentinel(t *testing.T) {
+ testCases := []struct {
+ scheme string
+ }{
+ {
+ scheme: "redis",
+ },
+ {
+ scheme: "tcp",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.scheme, func(t *testing.T) {
+ connectReceived := atomic.Value{}
+ a := mockRedisServer(t, &connectReceived)
+
+ addrs := []string{tc.scheme + "://" + a}
+ var sentinelUrls []config.TomlURL
+
+ for _, a := range addrs {
+ parsedURL := helper.URLMustParse(a)
+ sentinelUrls = append(sentinelUrls, config.TomlURL{URL: *parsedURL})
+ }
+
+ cfg := &config.RedisConfig{Sentinel: sentinelUrls}
+ Configure(cfg)
+
+ require.NotNil(t, GetRedisClient().Conn(), "Pool should not be nil")
+
+ // go-redis initialises connections lazily
+ rdb.Ping(context.Background())
+ require.True(t, connectReceived.Load().(bool))
+
+ rdb = nil
+ })
+ }
+}
diff --git a/workhorse/internal/goredis/keywatcher.go b/workhorse/internal/goredis/keywatcher.go
new file mode 100644
index 00000000000..741bfb17652
--- /dev/null
+++ b/workhorse/internal/goredis/keywatcher.go
@@ -0,0 +1,236 @@
+package goredis
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/jpillora/backoff"
+ "github.com/redis/go-redis/v9"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/log"
+ internalredis "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
+)
+
+type KeyWatcher struct {
+ mu sync.Mutex
+ subscribers map[string][]chan string
+ shutdown chan struct{}
+ reconnectBackoff backoff.Backoff
+ redisConn *redis.Client
+ conn *redis.PubSub
+}
+
+func NewKeyWatcher() *KeyWatcher {
+ return &KeyWatcher{
+ shutdown: make(chan struct{}),
+ reconnectBackoff: backoff.Backoff{
+ Min: 100 * time.Millisecond,
+ Max: 60 * time.Second,
+ Factor: 2,
+ Jitter: true,
+ },
+ }
+}
+
+const channelPrefix = "workhorse:notifications:"
+
+func countAction(action string) { internalredis.TotalActions.WithLabelValues(action).Add(1) }
+
+func (kw *KeyWatcher) receivePubSubStream(ctx context.Context, pubsub *redis.PubSub) error {
+ kw.mu.Lock()
+ // We must share kw.conn with the goroutines that call SUBSCRIBE and
+ // UNSUBSCRIBE because Redis pubsub subscriptions are tied to the
+ // connection.
+ kw.conn = pubsub
+ kw.mu.Unlock()
+
+ defer func() {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ kw.conn.Close()
+ kw.conn = nil
+
+ // Reset kw.subscribers because it is tied to Redis server side state of
+ // kw.conn and we just closed that connection.
+ for _, chans := range kw.subscribers {
+ for _, ch := range chans {
+ close(ch)
+ internalredis.KeyWatchers.Dec()
+ }
+ }
+ kw.subscribers = nil
+ }()
+
+ for {
+ msg, err := kw.conn.Receive(ctx)
+ if err != nil {
+ log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", err)).Error()
+ return nil
+ }
+
+ switch msg := msg.(type) {
+ case *redis.Subscription:
+ internalredis.RedisSubscriptions.Set(float64(msg.Count))
+ case *redis.Pong:
+ // Ignore.
+ case *redis.Message:
+ internalredis.TotalMessages.Inc()
+ internalredis.ReceivedBytes.Add(float64(len(msg.Payload)))
+ if strings.HasPrefix(msg.Channel, channelPrefix) {
+ kw.notifySubscribers(msg.Channel[len(channelPrefix):], string(msg.Payload))
+ }
+ default:
+ log.WithError(fmt.Errorf("keywatcher: unknown: %T", msg)).Error()
+ return nil
+ }
+ }
+}
+
+func (kw *KeyWatcher) Process(client *redis.Client) {
+ log.Info("keywatcher: starting process loop")
+
+ ctx := context.Background() // lint:allow context.Background
+ kw.mu.Lock()
+ kw.redisConn = client
+ kw.mu.Unlock()
+
+ for {
+ pubsub := client.Subscribe(ctx, []string{}...)
+ if err := pubsub.Ping(ctx); err != nil {
+ log.WithError(fmt.Errorf("keywatcher: %v", err)).Error()
+ time.Sleep(kw.reconnectBackoff.Duration())
+ continue
+ }
+
+ kw.reconnectBackoff.Reset()
+
+ if err := kw.receivePubSubStream(ctx, pubsub); err != nil {
+ log.WithError(fmt.Errorf("keywatcher: receivePubSubStream: %v", err)).Error()
+ }
+ }
+}
+
+func (kw *KeyWatcher) Shutdown() {
+ log.Info("keywatcher: shutting down")
+
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ select {
+ case <-kw.shutdown:
+ // already closed
+ default:
+ close(kw.shutdown)
+ }
+}
+
+func (kw *KeyWatcher) notifySubscribers(key, value string) {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ chanList, ok := kw.subscribers[key]
+ if !ok {
+ countAction("drop-message")
+ return
+ }
+
+ countAction("deliver-message")
+ for _, c := range chanList {
+ select {
+ case c <- value:
+ default:
+ }
+ }
+}
+
+func (kw *KeyWatcher) addSubscription(ctx context.Context, key string, notify chan string) error {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ if kw.conn == nil {
+ // This can happen because CI long polling is disabled in this Workhorse
+ // process. It can also be that we are waiting for the pubsub connection
+ // to be established. Either way it is OK to fail fast.
+ return errors.New("no redis connection")
+ }
+
+ if len(kw.subscribers[key]) == 0 {
+ countAction("create-subscription")
+ if err := kw.conn.Subscribe(ctx, channelPrefix+key); err != nil {
+ return err
+ }
+ }
+
+ if kw.subscribers == nil {
+ kw.subscribers = make(map[string][]chan string)
+ }
+ kw.subscribers[key] = append(kw.subscribers[key], notify)
+ internalredis.KeyWatchers.Inc()
+
+ return nil
+}
+
+func (kw *KeyWatcher) delSubscription(ctx context.Context, key string, notify chan string) {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+
+ chans, ok := kw.subscribers[key]
+ if !ok {
+ // This can happen if the pubsub connection dropped while we were
+ // waiting.
+ return
+ }
+
+ for i, c := range chans {
+ if notify == c {
+ kw.subscribers[key] = append(chans[:i], chans[i+1:]...)
+ internalredis.KeyWatchers.Dec()
+ break
+ }
+ }
+ if len(kw.subscribers[key]) == 0 {
+ delete(kw.subscribers, key)
+ countAction("delete-subscription")
+ if kw.conn != nil {
+ kw.conn.Unsubscribe(ctx, channelPrefix+key)
+ }
+ }
+}
+
+func (kw *KeyWatcher) WatchKey(ctx context.Context, key, value string, timeout time.Duration) (internalredis.WatchKeyStatus, error) {
+ notify := make(chan string, 1)
+ if err := kw.addSubscription(ctx, key, notify); err != nil {
+ return internalredis.WatchKeyStatusNoChange, err
+ }
+ defer kw.delSubscription(ctx, key, notify)
+
+ currentValue, err := kw.redisConn.Get(ctx, key).Result()
+ if errors.Is(err, redis.Nil) {
+ currentValue = ""
+ } else if err != nil {
+ return internalredis.WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET: %v", err)
+ }
+ if currentValue != value {
+ return internalredis.WatchKeyStatusAlreadyChanged, nil
+ }
+
+ select {
+ case <-kw.shutdown:
+ log.WithFields(log.Fields{"key": key}).Info("stopping watch due to shutdown")
+ return internalredis.WatchKeyStatusNoChange, nil
+ case currentValue := <-notify:
+ if currentValue == "" {
+ return internalredis.WatchKeyStatusNoChange, fmt.Errorf("keywatcher: redis GET failed")
+ }
+ if currentValue == value {
+ return internalredis.WatchKeyStatusNoChange, nil
+ }
+ return internalredis.WatchKeyStatusSeenChange, nil
+ case <-time.After(timeout):
+ return internalredis.WatchKeyStatusTimeout, nil
+ }
+}
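
(Aside, not part of the patch: how the pieces fit — Process owns the pubsub receive loop and reconnects with backoff, while WatchKey blocks until the key's value diverges from the expected one, the timeout fires, or Shutdown is called. A minimal usage sketch, assuming goredis.Configure already ran; the key and values are illustrative:)

package main

import (
	"context"
	"log"
	"time"

	"gitlab.com/gitlab-org/gitlab/workhorse/internal/goredis"
	internalredis "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
)

func main() {
	// Assumes goredis.Configure ran during startup (see goredis.go above).
	kw := goredis.NewKeyWatcher()
	go kw.Process(goredis.GetRedisClient()) // pubsub loop; reconnects with backoff
	defer kw.Shutdown()

	// WatchKey fails fast with "no redis connection" until the pubsub
	// connection is up, so real callers tolerate or retry that error.
	status, err := kw.WatchKey(context.Background(),
		"runner:build_queue:10", "pending", 30*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	if status == internalredis.WatchKeyStatusSeenChange {
		log.Print("key changed while we were watching")
	}
}
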
diff --git a/workhorse/internal/goredis/keywatcher_test.go b/workhorse/internal/goredis/keywatcher_test.go
new file mode 100644
index 00000000000..b64262dc9c8
--- /dev/null
+++ b/workhorse/internal/goredis/keywatcher_test.go
@@ -0,0 +1,301 @@
+package goredis
+
+import (
+ "context"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/config"
+ "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
+)
+
+var ctx = context.Background()
+
+const (
+ runnerKey = "runner:build_queue:10"
+)
+
+func initRdb() {
+ buf, _ := os.ReadFile("../../config.toml")
+ cfg, _ := config.LoadConfig(string(buf))
+ Configure(cfg.Redis)
+}
+
+func (kw *KeyWatcher) countSubscribers(key string) int {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ return len(kw.subscribers[key])
+}
+
+// Forces a run of the `Process` receive loop against a dedicated PubSub connection.
+func (kw *KeyWatcher) processMessages(t *testing.T, numWatchers int, value string, ready chan<- struct{}, wg *sync.WaitGroup) {
+ kw.mu.Lock()
+ kw.redisConn = rdb
+ psc := kw.redisConn.Subscribe(ctx, []string{}...)
+ kw.mu.Unlock()
+
+ errC := make(chan error)
+ go func() { errC <- kw.receivePubSubStream(ctx, psc) }()
+
+ require.Eventually(t, func() bool {
+ kw.mu.Lock()
+ defer kw.mu.Unlock()
+ return kw.conn != nil
+ }, time.Second, time.Millisecond)
+ close(ready)
+
+ require.Eventually(t, func() bool {
+ return kw.countSubscribers(runnerKey) == numWatchers
+ }, time.Second, time.Millisecond)
+
+ // send message after listeners are ready
+ kw.redisConn.Publish(ctx, channelPrefix+runnerKey, value)
+
+ // close subscription after all workers are done
+ wg.Wait()
+ kw.mu.Lock()
+ kw.conn.Close()
+ kw.mu.Unlock()
+
+ require.NoError(t, <-errC)
+}
+
+type keyChangeTestCase struct {
+ desc string
+ returnValue string
+ isKeyMissing bool
+ watchValue string
+ processedValue string
+ expectedStatus redis.WatchKeyStatus
+ timeout time.Duration
+}
+
+func TestKeyChangesInstantReturn(t *testing.T) {
+ initRdb()
+
+ testCases := []keyChangeTestCase{
+ // WatchKeyStatusAlreadyChanged
+ {
+ desc: "sees change with key existing and changed",
+ returnValue: "somethingelse",
+ watchValue: "something",
+ expectedStatus: redis.WatchKeyStatusAlreadyChanged,
+ timeout: time.Second,
+ },
+ {
+ desc: "sees change with key non-existing",
+ isKeyMissing: true,
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: redis.WatchKeyStatusAlreadyChanged,
+ timeout: time.Second,
+ },
+ // WatchKeyStatusTimeout
+ {
+ desc: "sees timeout with key existing and unchanged",
+ returnValue: "something",
+ watchValue: "something",
+ expectedStatus: redis.WatchKeyStatusTimeout,
+ timeout: time.Millisecond,
+ },
+ {
+ desc: "sees timeout with key non-existing and unchanged",
+ isKeyMissing: true,
+ watchValue: "",
+ expectedStatus: redis.WatchKeyStatusTimeout,
+ timeout: time.Millisecond,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+
+ // setup
+ if !tc.isKeyMissing {
+ rdb.Set(ctx, runnerKey, tc.returnValue, 0)
+ }
+
+ defer func() {
+ rdb.FlushDB(ctx)
+ }()
+
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+ kw.redisConn = rdb
+ kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
+
+ val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, tc.timeout)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ })
+ }
+}
+
+func TestKeyChangesWhenWatching(t *testing.T) {
+ initRdb()
+
+ testCases := []keyChangeTestCase{
+ // WatchKeyStatusSeenChange
+ {
+ desc: "sees change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: redis.WatchKeyStatusSeenChange,
+ },
+ {
+ desc: "sees change with key non-existing, when watching empty value",
+ isKeyMissing: true,
+ watchValue: "",
+ processedValue: "something",
+ expectedStatus: redis.WatchKeyStatusSeenChange,
+ },
+ // WatchKeyStatusNoChange
+ {
+ desc: "sees no change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "something",
+ expectedStatus: redis.WatchKeyStatusNoChange,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ if !tc.isKeyMissing {
+ rdb.Set(ctx, runnerKey, tc.returnValue, 0)
+ }
+
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+ defer func() {
+ rdb.FlushDB(ctx)
+ }()
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ ready := make(chan struct{})
+
+ go func() {
+ defer wg.Done()
+ <-ready
+ val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ }()
+
+ kw.processMessages(t, 1, tc.processedValue, ready, wg)
+ })
+ }
+}
+
+func TestKeyChangesParallel(t *testing.T) {
+ initRdb()
+
+ testCases := []keyChangeTestCase{
+ {
+ desc: "massively parallel, sees change with key existing",
+ returnValue: "something",
+ watchValue: "something",
+ processedValue: "somethingelse",
+ expectedStatus: redis.WatchKeyStatusSeenChange,
+ },
+ {
+ desc: "massively parallel, sees change with key existing, watching missing keys",
+ isKeyMissing: true,
+ watchValue: "",
+ processedValue: "somethingelse",
+ expectedStatus: redis.WatchKeyStatusSeenChange,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ runTimes := 100
+
+ if !tc.isKeyMissing {
+ rdb.Set(ctx, runnerKey, tc.returnValue, 0)
+ }
+
+ defer func() {
+ rdb.FlushDB(ctx)
+ }()
+
+ wg := &sync.WaitGroup{}
+ wg.Add(runTimes)
+ ready := make(chan struct{})
+
+ kw := NewKeyWatcher()
+ defer kw.Shutdown()
+
+ for i := 0; i < runTimes; i++ {
+ go func() {
+ defer wg.Done()
+ <-ready
+ val, err := kw.WatchKey(ctx, runnerKey, tc.watchValue, time.Second)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, tc.expectedStatus, val, "Expected value")
+ }()
+ }
+
+ kw.processMessages(t, runTimes, tc.processedValue, ready, wg)
+ })
+ }
+}
+
+func TestShutdown(t *testing.T) {
+ initRdb()
+
+ kw := NewKeyWatcher()
+ kw.redisConn = rdb
+ kw.conn = kw.redisConn.Subscribe(ctx, []string{}...)
+ defer kw.Shutdown()
+
+ rdb.Set(ctx, runnerKey, "something", 0)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(2)
+
+ go func() {
+ defer wg.Done()
+ val, err := kw.WatchKey(ctx, runnerKey, "something", 10*time.Second)
+
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, redis.WatchKeyStatusNoChange, val, "Expected value not to change")
+ }()
+
+ go func() {
+ defer wg.Done()
+ require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 1 }, 10*time.Second, time.Millisecond)
+
+ kw.Shutdown()
+ }()
+
+ wg.Wait()
+
+ require.Eventually(t, func() bool { return kw.countSubscribers(runnerKey) == 0 }, 10*time.Second, time.Millisecond)
+
+ // Adding a key after the shutdown should result in an immediate response
+ var val redis.WatchKeyStatus
+ var err error
+ done := make(chan struct{})
+ go func() {
+ val, err = kw.WatchKey(ctx, runnerKey, "something", 10*time.Second)
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ require.NoError(t, err, "Expected no error")
+ require.Equal(t, redis.WatchKeyStatusNoChange, val, "Expected value not to change")
+ case <-time.After(100 * time.Millisecond):
+ t.Fatal("timeout waiting for WatchKey")
+ }
+}
diff --git a/workhorse/internal/redis/keywatcher.go b/workhorse/internal/redis/keywatcher.go
index 2fd0753c3c9..8f1772a9195 100644
--- a/workhorse/internal/redis/keywatcher.go
+++ b/workhorse/internal/redis/keywatcher.go
@@ -37,32 +37,32 @@ func NewKeyWatcher() *KeyWatcher {
}
var (
- keyWatchers = promauto.NewGauge(
+ KeyWatchers = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_keywatcher_keywatchers",
Help: "The number of keys that is being watched by gitlab-workhorse",
},
)
- redisSubscriptions = promauto.NewGauge(
+ RedisSubscriptions = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "gitlab_workhorse_keywatcher_redis_subscriptions",
Help: "Current number of keywatcher Redis pubsub subscriptions",
},
)
- totalMessages = promauto.NewCounter(
+ TotalMessages = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_keywatcher_total_messages",
Help: "How many messages gitlab-workhorse has received in total on pubsub.",
},
)
- totalActions = promauto.NewCounterVec(
+ TotalActions = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_keywatcher_actions_total",
Help: "Counts of various keywatcher actions",
},
[]string{"action"},
)
- receivedBytes = promauto.NewCounter(
+ ReceivedBytes = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_keywatcher_received_bytes_total",
Help: "How many bytes of messages gitlab-workhorse has received in total on pubsub.",
@@ -72,7 +72,7 @@ var (
const channelPrefix = "workhorse:notifications:"
-func countAction(action string) { totalActions.WithLabelValues(action).Add(1) }
+func countAction(action string) { TotalActions.WithLabelValues(action).Add(1) }
func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
kw.mu.Lock()
@@ -93,7 +93,7 @@ func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
for _, chans := range kw.subscribers {
for _, ch := range chans {
close(ch)
- keyWatchers.Dec()
+ KeyWatchers.Dec()
}
}
kw.subscribers = nil
@@ -102,13 +102,13 @@ func (kw *KeyWatcher) receivePubSubStream(conn redis.Conn) error {
for {
switch v := kw.conn.Receive().(type) {
case redis.Message:
- totalMessages.Inc()
- receivedBytes.Add(float64(len(v.Data)))
+ TotalMessages.Inc()
+ ReceivedBytes.Add(float64(len(v.Data)))
if strings.HasPrefix(v.Channel, channelPrefix) {
kw.notifySubscribers(v.Channel[len(channelPrefix):], string(v.Data))
}
case redis.Subscription:
- redisSubscriptions.Set(float64(v.Count))
+ RedisSubscriptions.Set(float64(v.Count))
case error:
log.WithError(fmt.Errorf("keywatcher: pubsub receive: %v", v)).Error()
// Intermittent error, return nil so that it doesn't wait before reconnect
@@ -205,7 +205,7 @@ func (kw *KeyWatcher) addSubscription(key string, notify chan string) error {
kw.subscribers = make(map[string][]chan string)
}
kw.subscribers[key] = append(kw.subscribers[key], notify)
- keyWatchers.Inc()
+ KeyWatchers.Inc()
return nil
}
@@ -224,7 +224,7 @@ func (kw *KeyWatcher) delSubscription(key string, notify chan string) {
for i, c := range chans {
if notify == c {
kw.subscribers[key] = append(chans[:i], chans[i+1:]...)
- keyWatchers.Dec()
+ KeyWatchers.Dec()
break
}
}
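
(Aside, not part of the patch: these metrics are exported so the new goredis package can record into the same promauto collectors instead of registering duplicates — registration still happens exactly once, at internal/redis package init. For example:)

package main

import (
	internalredis "gitlab.com/gitlab-org/gitlab/workhorse/internal/redis"
)

func main() {
	// Any workhorse package may now bump the shared keywatcher series.
	internalredis.TotalActions.WithLabelValues("create-subscription").Inc()
	internalredis.KeyWatchers.Inc()
	internalredis.KeyWatchers.Dec()
}
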
diff --git a/workhorse/internal/redis/redis.go b/workhorse/internal/redis/redis.go
index 03118cfcef6..c79e1e56b3a 100644
--- a/workhorse/internal/redis/redis.go
+++ b/workhorse/internal/redis/redis.go
@@ -45,14 +45,14 @@ const (
)
var (
- totalConnections = promauto.NewCounter(
+ TotalConnections = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gitlab_workhorse_redis_total_connections",
Help: "How many connections gitlab-workhorse has opened in total. Can be used to track Redis connection rate for this process",
},
)
- errorCounter = promauto.NewCounterVec(
+ ErrorCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "gitlab_workhorse_redis_errors",
Help: "Counts different types of Redis errors encountered by workhorse, by type and destination (redis, sentinel)",
@@ -100,7 +100,7 @@ func sentinelConn(master string, urls []config.TomlURL) *sentinel.Sentinel {
}
if err != nil {
- errorCounter.WithLabelValues("dial", "sentinel").Inc()
+ ErrorCounter.WithLabelValues("dial", "sentinel").Inc()
return nil, err
}
return c, nil
@@ -159,7 +159,7 @@ func sentinelDialer(dopts []redis.DialOption) redisDialerFunc {
return func() (redis.Conn, error) {
address, err := sntnl.MasterAddr()
if err != nil {
- errorCounter.WithLabelValues("master", "sentinel").Inc()
+ ErrorCounter.WithLabelValues("master", "sentinel").Inc()
return nil, err
}
dopts = append(dopts, redis.DialNetDial(keepAliveDialer))
@@ -214,9 +214,9 @@ func countDialer(dialer redisDialerFunc) redisDialerFunc {
return func() (redis.Conn, error) {
c, err := dialer()
if err != nil {
- errorCounter.WithLabelValues("dial", "redis").Inc()
+ ErrorCounter.WithLabelValues("dial", "redis").Inc()
} else {
- totalConnections.Inc()
+ TotalConnections.Inc()
}
return c, err
}