diff options
author | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-09-03 18:18:56 +0300 |
---|---|---|
committer | Zeger-Jan van de Weg <git@zjvandeweg.nl> | 2020-09-03 18:18:56 +0300 |
commit | 0db5846e56b27f909a8890e5618d321919f3318c (patch) | |
tree | f36ca52adc6ca262561824e13f96f5b41440527f | |
parent | 68a400adf0238cb0a168c91dfff553f543fa2fb6 (diff) | |
parent | bf3b318566aa7fc52f31f2be90037a618178a981 (diff) |
Merge branch 'zj-info-ref-fix-backport-13.1' into '13-1-stable'
Backport 13.1: Fix hanging info/refs cache when error occurs
See merge request gitlab-org/gitaly!2517
-rw-r--r-- | changelogs/unreleased/po-fix-inforef-stuck.yml | 5 | ||||
-rw-r--r-- | internal/service/smarthttp/cache.go | 19 | ||||
-rw-r--r-- | internal/service/smarthttp/inforefs_test.go | 44 |
3 files changed, 63 insertions, 5 deletions
diff --git a/changelogs/unreleased/po-fix-inforef-stuck.yml b/changelogs/unreleased/po-fix-inforef-stuck.yml new file mode 100644 index 000000000..4aeb54d15 --- /dev/null +++ b/changelogs/unreleased/po-fix-inforef-stuck.yml @@ -0,0 +1,5 @@ +--- +title: Fix hanging info refs cache when error occurs +merge_request: 2497 +author: +type: fixed diff --git a/internal/service/smarthttp/cache.go b/internal/service/smarthttp/cache.go index bff4c96a8..bb953f78c 100644 --- a/internal/service/smarthttp/cache.go +++ b/internal/service/smarthttp/cache.go @@ -3,8 +3,10 @@ package smarthttp import ( "context" "io" + "io/ioutil" "sync" + "github.com/golang/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -14,8 +16,15 @@ import ( "google.golang.org/grpc/status" ) +// streamer abstracts away the cache concrete type so that it can be override +// in tests +type streamer interface { + GetStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message) (_ io.ReadCloser, err error) + PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error +} + var ( - infoRefCache = cache.NewStreamDB(cache.LeaseKeyer{}) + infoRefCache streamer = cache.NewStreamDB(cache.LeaseKeyer{}) // prometheus counters cacheAttemptTotal = prometheus.NewCounter( @@ -84,6 +93,14 @@ func tryCache(ctx context.Context, in *gitalypb.InfoRefsRequest, w io.Writer, mi tr := io.TeeReader(pr, w) if err := infoRefCache.PutStream(ctx, in.Repository, in, tr); err != nil { logger.Errorf("unable to store InfoRefsUploadPack response in cache: %q", err) + + // discard remaining bytes if caching stream + // failed so that tee reader is not blocked + _, err = io.Copy(ioutil.Discard, tr) + if err != nil { + logger.WithError(err). + Error("unable to discard remaining InfoRefsUploadPack cache stream") + } } }() diff --git a/internal/service/smarthttp/inforefs_test.go b/internal/service/smarthttp/inforefs_test.go index 05bbc8649..f69fb7d76 100644 --- a/internal/service/smarthttp/inforefs_test.go +++ b/internal/service/smarthttp/inforefs_test.go @@ -3,6 +3,7 @@ package smarthttp import ( "bytes" "context" + "errors" "fmt" "io" "io/ioutil" @@ -10,7 +11,9 @@ import ( "path/filepath" "strings" "testing" + "time" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/cache" "gitlab.com/gitlab-org/gitaly/internal/config" @@ -283,6 +286,18 @@ func assertGitRefAdvertisement(t *testing.T, rpc, responseBody string, firstLine } } +type mockStreamer struct { + streamer + putStream func(context.Context, *gitalypb.Repository, proto.Message, io.Reader) error +} + +func (ms mockStreamer) PutStream(ctx context.Context, repo *gitalypb.Repository, req proto.Message, src io.Reader) error { + if ms.putStream != nil { + return ms.putStream(ctx, repo, req, src) + } + return ms.streamer.PutStream(ctx, repo, req, src) +} + func TestCacheInfoRefsUploadPack(t *testing.T) { clearCache(t) @@ -297,6 +312,9 @@ func TestCacheInfoRefsUploadPack(t *testing.T) { ctx := context.Background() assertNormalResponse := func() { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + response, err := makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, rpcRequest) require.NoError(t, err) @@ -327,10 +345,13 @@ func TestCacheInfoRefsUploadPack(t *testing.T) { replacedContents[0], replacedContents[3], replacedContents[1:3], ) - // invalidate cache for repository - ender, err := cache.LeaseKeyer{}.StartLease(rpcRequest.Repository) - require.NoError(t, err) - require.NoError(t, ender.EndLease(setInfoRefsUploadPackMethod(context.Background()))) + invalidateCacheForRepo := func() { + ender, err := cache.LeaseKeyer{}.StartLease(rpcRequest.Repository) + require.NoError(t, err) + require.NoError(t, ender.EndLease(setInfoRefsUploadPackMethod(ctx))) + } + + invalidateCacheForRepo() // replaced cache response is no longer valid assertNormalResponse() @@ -348,6 +369,21 @@ func TestCacheInfoRefsUploadPack(t *testing.T) { _, err = makeInfoRefsUploadPackRequest(ctx, t, serverSocketPath, invalidReq) testhelper.RequireGrpcError(t, err, codes.Internal) testhelper.AssertPathNotExists(t, pathToCachedResponse(t, ctx, invalidReq)) + + // if an error occurs while putting stream, it should not interrupt + // request from being served + happened := false + defer func(old streamer) { infoRefCache = old }(infoRefCache) + infoRefCache = mockStreamer{ + streamer: infoRefCache, + putStream: func(context.Context, *gitalypb.Repository, proto.Message, io.Reader) error { + happened = true + return errors.New("oh nos!") + }, + } + invalidateCacheForRepo() + assertNormalResponse() + require.True(t, happened) } func createInvalidRepo(t testing.TB, repo *gitalypb.Repository) func() { |