Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2020-09-03 18:18:56 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2020-09-03 18:18:56 +0300
commit0db5846e56b27f909a8890e5618d321919f3318c (patch)
treef36ca52adc6ca262561824e13f96f5b41440527f
parent68a400adf0238cb0a168c91dfff553f543fa2fb6 (diff)
parentbf3b318566aa7fc52f31f2be90037a618178a981 (diff)
Merge branch 'zj-info-ref-fix-backport-13.1' into '13-1-stable'
Backport 13.1: Fix hanging info/refs cache when error occurs See merge request gitlab-org/gitaly!2517
-rw-r--r--changelogs/unreleased/po-fix-inforef-stuck.yml5
-rw-r--r--internal/service/smarthttp/cache.go19
-rw-r--r--internal/service/smarthttp/inforefs_test.go44
3 files changed, 63 insertions, 5 deletions
diff --git a/changelogs/unreleased/po-fix-inforef-stuck.yml b/changelogs/unreleased/po-fix-inforef-stuck.yml
new file mode 100644
index 000000000..4aeb54d15
--- /dev/null
+++ b/changelogs/unreleased/po-fix-inforef-stuck.yml
@@ -0,0 +1,5 @@
+---
+title: Fix hanging info refs cache when error occurs
+merge_request: 2497
+author:
+type: fixed
diff --git a/internal/service/smarthttp/cache.go b/internal/service/smarthttp/cache.go
index bff4c96a8..bb953f78c 100644
--- a/internal/service/smarthttp/cache.go
+++ b/internal/service/smarthttp/cache.go
@@ -3,8 +3,10 @@ package smarthttp
import (
"context"
"io"
+ "io/ioutil"
"sync"
+ "github.com/golang/protobuf/proto"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
@@ -14,8 +16,15 @@ import (
"google.golang.org/grpc/status"
)
+// streamer abstracts away the cache concrete type so that it can be override
+// in tests
+type streamer interface {
+ GetStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (_ io.ReadCloser, err error)
+ PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error
+}
+
var (
- infoRefCache = cache.NewStreamDB(cache.LeaseKeyer{})
+ infoRefCache streamer = cache.NewStreamDB(cache.LeaseKeyer{})
// prometheus counters
cacheAttemptTotal = prometheus.NewCounter(
@@ -84,6 +93,14 @@ func tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest, w io.Writer, mi
tr := io.TeeReader(pr, w)
if err := infoRefCache.PutStream(ctx, in.Repository, in, tr); err != nil {
logger.Errorf("unable to store InfoRefsUploadPack response in cache: %q", err)
+
+ // discard remaining bytes if caching stream
+ // failed so that tee reader is not blocked
+ _, err = io.Copy(ioutil.Discard, tr)
+ if err != nil {
+ logger.WithError(err).
+ Error("unable to discard remaining InfoRefsUploadPack cache stream")
+ }
}
}()
diff --git a/internal/service/smarthttp/inforefs_test.go b/internal/service/smarthttp/inforefs_test.go
index 05bbc8649..f69fb7d76 100644
--- a/internal/service/smarthttp/inforefs_test.go
+++ b/internal/service/smarthttp/inforefs_test.go
@@ -3,6 +3,7 @@ package smarthttp
import (
"bytes"
"context"
+ "errors"
"fmt"
"io"
"io/ioutil"
@@ -10,7 +11,9 @@ import (
"path/filepath"
"strings"
"testing"
+ "time"
+ "github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/cache"
"gitlab.com/gitlab-org/gitaly/internal/config"
@@ -283,6 +286,18 @@ func assertGitRefAdvertisement(t *testing.T, rpc, responseBody string, firstLine
}
}
+type mockStreamer struct {
+ streamer
+ putStream func(context.Context, *gitalypb.Repository, proto.Message, io.Reader) error
+}
+
+func (ms mockStreamer) PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error {
+ if ms.putStream != nil {
+ return ms.putStream(ctx, repo, req, src)
+ }
+ return ms.streamer.PutStream(ctx, repo, req, src)
+}
+
func TestCacheInfoRefsUploadPack(t *testing.T) {
clearCache(t)
@@ -297,6 +312,9 @@ func TestCacheInfoRefsUploadPack(t *testing.T) {
ctx := context.Background()
assertNormalResponse := func() {
+ ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+
response, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, rpcRequest)
require.NoError(t, err)
@@ -327,10 +345,13 @@ func TestCacheInfoRefsUploadPack(t *testing.T) {
replacedContents[0], replacedContents[3], replacedContents[1:3],
)
- // invalidate cache for repository
- ender, err := cache.LeaseKeyer{}.StartLease(rpcRequest.Repository)
- require.NoError(t, err)
- require.NoError(t, ender.EndLease(setInfoRefsUploadPackMethod(context.Background())))
+ invalidateCacheForRepo := func() {
+ ender, err := cache.LeaseKeyer{}.StartLease(rpcRequest.Repository)
+ require.NoError(t, err)
+ require.NoError(t, ender.EndLease(setInfoRefsUploadPackMethod(ctx)))
+ }
+
+ invalidateCacheForRepo()
// replaced cache response is no longer valid
assertNormalResponse()
@@ -348,6 +369,21 @@ func TestCacheInfoRefsUploadPack(t *testing.T) {
_, err = makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, invalidReq)
testhelper.RequireGrpcError(t, err, codes.Internal)
testhelper.AssertPathNotExists(t, pathToCachedResponse(t, ctx, invalidReq))
+
+ // if an error occurs while putting stream, it should not interrupt
+ // request from being served
+ happened := false
+ defer func(old streamer) { infoRefCache = old }(infoRefCache)
+ infoRefCache = mockStreamer{
+ streamer: infoRefCache,
+ putStream: func(context.Context, *gitalypb.Repository, proto.Message, io.Reader) error {
+ happened = true
+ return errors.New("oh nos!")
+ },
+ }
+ invalidateCacheForRepo()
+ assertNormalResponse()
+ require.True(t, happened)
}
func createInvalidRepo(t testing.TB, repo *gitalypb.Repository) func() {