diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-02-21 20:36:36 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-02-21 20:36:36 +0300 |
commit | 1c364e717ce8fdc4a6603f7369756785c146c898 (patch) | |
tree | 26ee5d27d5658c74f0c2097cc1e271a140315b04 | |
parent | 69984aee4bae5d28182946aba78737e0acbc635c (diff) | |
parent | 8e5cb64195e732fd95f98d49971b6e9910b86ce2 (diff) |
Merge branch 'pks-ssh-receive-pack-with-sidechannel-for-internal-fetches' into 'master'
git: Convert internal fetches to use sidechannel
Closes #4042
See merge request gitlab-org/gitaly!4358
-rw-r--r-- | internal/git/command_options.go | 21 | ||||
-rw-r--r-- | internal/git/command_options_test.go | 62 | ||||
-rw-r--r-- | internal/git/localrepo/remote.go | 25 | ||||
-rw-r--r-- | internal/git/localrepo/remote_extra_test.go | 7 | ||||
-rw-r--r-- | internal/gitaly/service/conflicts/resolve_conflicts_test.go | 13 | ||||
-rw-r--r-- | internal/gitaly/service/operations/rebase_test.go | 6 | ||||
-rw-r--r-- | internal/gitaly/service/operations/revert_test.go | 7 | ||||
-rw-r--r-- | internal/gitaly/service/repository/fetch_test.go | 7 | ||||
-rw-r--r-- | internal/gitaly/service/repository/replicate_test.go | 26 | ||||
-rw-r--r-- | internal/metadata/featureflag/ff_fetch_internal_with_sidechannel.go | 5 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 20 |
11 files changed, 166 insertions, 33 deletions
diff --git a/internal/git/command_options.go b/internal/git/command_options.go index ee63da89f..6b9f8dbb7 100644 --- a/internal/git/command_options.go +++ b/internal/git/command_options.go @@ -17,6 +17,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) const ( @@ -236,6 +237,22 @@ func WithGlobalOption(opts ...GlobalOption) CmdOpt { // WithInternalFetch returns an option which sets up git-fetch(1) to fetch from another internal // Gitaly node. func WithInternalFetch(req *gitalypb.SSHUploadPackRequest) CmdOpt { + return withInternalFetch(req, false) +} + +// WithInternalFetchWithSidechannel returns an option which sets up git-fetch(1) to fetch from +// another internal Gitaly node. In contrast to WithInternalFetch, this will call +// SSHUploadPackWithSidechannel instead of SSHUploadPack. +func WithInternalFetchWithSidechannel(req *gitalypb.SSHUploadPackWithSidechannelRequest) CmdOpt { + return withInternalFetch(req, true) +} + +type repoScopedRequest interface { + proto.Message + GetRepository() *gitalypb.Repository +} + +func withInternalFetch(req repoScopedRequest, withSidechannel bool) func(ctx context.Context, cfg config.Cfg, _ CommandFactory, c *cmdCfg) error { return func(ctx context.Context, cfg config.Cfg, _ CommandFactory, c *cmdCfg) error { payload, err := protojson.Marshal(req) if err != nil { @@ -273,6 +290,10 @@ func WithInternalFetch(req *gitalypb.SSHUploadPackRequest) CmdOpt { fmt.Sprintf("%s=%s", x509.SSLCertFile, os.Getenv(x509.SSLCertFile)), ) + if withSidechannel { + c.env = append(c.env, "GITALY_USE_SIDECHANNEL=1") + } + return nil } } diff --git a/internal/git/command_options_test.go b/internal/git/command_options_test.go index 693052211..9a8dfa1d8 100644 --- a/internal/git/command_options_test.go +++ b/internal/git/command_options_test.go @@ -405,23 +405,63 @@ func TestWithInternalFetch(t *testing.T) { ctx = metadata.NewIncomingContext(ctx, md) ctx = correlation.ContextWithCorrelation(ctx, "correlation-id-1") - req := gitalypb.SSHUploadPackRequest{ + uploadPackRequest := gitalypb.SSHUploadPackRequest{ Repository: &gitalypb.Repository{ StorageName: "default", }, } + uploadPackRequestMarshalled, err := protojson.Marshal(&uploadPackRequest) + require.NoError(t, err) - expectedPayload, err := protojson.Marshal(&req) + uploadPackRequestWithSidechannel := gitalypb.SSHUploadPackWithSidechannelRequest{ + Repository: &gitalypb.Repository{ + StorageName: "default", + }, + } + uploadPackRequestWithSidechannelMarshalled, err := protojson.Marshal(&uploadPackRequestWithSidechannel) require.NoError(t, err) - var commandCfg cmdCfg - option := WithInternalFetch(&req) - require.NoError(t, option(ctx, cfg, gitCmdFactory, &commandCfg)) + for _, tc := range []struct { + desc string + createOption func() CmdOpt + expectedSidechannel bool + expectedPayload []byte + }{ + { + desc: "without sidechannel", + createOption: func() CmdOpt { + return WithInternalFetch(&uploadPackRequest) + }, + expectedSidechannel: false, + expectedPayload: uploadPackRequestMarshalled, + }, + { + desc: "with sidechannel", + createOption: func() CmdOpt { + return WithInternalFetchWithSidechannel(&uploadPackRequestWithSidechannel) + }, + expectedSidechannel: true, + expectedPayload: uploadPackRequestWithSidechannelMarshalled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var commandCfg cmdCfg + + option := tc.createOption() + require.NoError(t, option(ctx, cfg, gitCmdFactory, &commandCfg)) - require.Subset(t, commandCfg.env, []string{ - fmt.Sprintf("GIT_SSH_COMMAND=%s upload-pack", filepath.Join(cfg.BinDir, "gitaly-ssh")), - fmt.Sprintf("GITALY_PAYLOAD=%s", expectedPayload), - "CORRELATION_ID=correlation-id-1", - "GIT_SSH_VARIANT=simple", - }) + require.Subset(t, commandCfg.env, []string{ + fmt.Sprintf("GIT_SSH_COMMAND=%s upload-pack", filepath.Join(cfg.BinDir, "gitaly-ssh")), + fmt.Sprintf("GITALY_PAYLOAD=%s", tc.expectedPayload), + "CORRELATION_ID=correlation-id-1", + "GIT_SSH_VARIANT=simple", + }) + + if tc.expectedSidechannel { + require.Contains(t, commandCfg.env, "GITALY_USE_SIDECHANNEL=1") + } else { + require.NotContains(t, commandCfg.env, "GITALY_USE_SIDECHANNEL=1") + } + }) + } } diff --git a/internal/git/localrepo/remote.go b/internal/git/localrepo/remote.go index 1e0ff9b3e..823372a3f 100644 --- a/internal/git/localrepo/remote.go +++ b/internal/git/localrepo/remote.go @@ -9,6 +9,7 @@ import ( "strings" "gitlab.com/gitlab-org/gitaly/v14/internal/git" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -126,11 +127,6 @@ func (repo *Repo) FetchInternal( } commandOptions := []git.CmdOpt{ - git.WithInternalFetch(&gitalypb.SSHUploadPackRequest{ - Repository: remoteRepo, - GitConfigOptions: []string{"uploadpack.allowAnySHA1InWant=true"}, - GitProtocol: git.ProtocolV2, - }), git.WithEnv(opts.Env...), git.WithStderr(opts.Stderr), // We've observed performance issues when fetching into big repositories part of an @@ -142,6 +138,25 @@ func (repo *Repo) FetchInternal( // matter in the connectivity check either. git.WithConfig(git.ConfigPair{Key: "core.alternateRefsCommand", Value: "exit 0 #"}), } + + if featureflag.FetchInternalWithSidechannel.IsEnabled(ctx) { + commandOptions = append(commandOptions, git.WithInternalFetchWithSidechannel( + &gitalypb.SSHUploadPackWithSidechannelRequest{ + Repository: remoteRepo, + GitConfigOptions: []string{"uploadpack.allowAnySHA1InWant=true"}, + GitProtocol: git.ProtocolV2, + }, + )) + } else { + commandOptions = append(commandOptions, git.WithInternalFetch( + &gitalypb.SSHUploadPackRequest{ + Repository: remoteRepo, + GitConfigOptions: []string{"uploadpack.allowAnySHA1InWant=true"}, + GitProtocol: git.ProtocolV2, + }, + )) + } + if opts.DisableTransactions { commandOptions = append(commandOptions, git.WithDisabledHooks()) } else { diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go index 57d2ac6c6..834fcd261 100644 --- a/internal/git/localrepo/remote_extra_test.go +++ b/internal/git/localrepo/remote_extra_test.go @@ -2,6 +2,7 @@ package localrepo_test import ( "bytes" + "context" "path/filepath" "testing" @@ -13,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/hook" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/ssh" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -21,8 +23,11 @@ import ( ) func TestRepo_FetchInternal(t *testing.T) { - ctx := testhelper.Context(t) + t.Parallel() + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testRepoFetchInternal) +} +func testRepoFetchInternal(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) gitCmdFactory, readGitProtocol := gittest.NewProtocolDetectingCommandFactory(ctx, t, cfg) diff --git a/internal/gitaly/service/conflicts/resolve_conflicts_test.go b/internal/gitaly/service/conflicts/resolve_conflicts_test.go index 04a409aa0..6c29067ac 100644 --- a/internal/gitaly/service/conflicts/resolve_conflicts_test.go +++ b/internal/gitaly/service/conflicts/resolve_conflicts_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -197,7 +198,12 @@ func TestSuccessfulResolveConflictsRequestHelper(t *testing.T) { } func TestResolveConflictsWithRemoteRepo(t *testing.T) { - ctx := testhelper.Context(t) + t.Parallel() + + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testResolveConflictsWithRemoteRepo) +} + +func testResolveConflictsWithRemoteRepo(t *testing.T, ctx context.Context) { hookManager := hook.NewMockManager(t, hook.NopPreReceive, hook.NopPostReceive, hook.NopUpdate, hook.NopReferenceTransaction) cfg, sourceRepo, sourceRepoPath, client := SetupConflictsService(ctx, t, true, hookManager) @@ -811,7 +817,10 @@ func TestFailedResolveConflictsRequestDueToValidation(t *testing.T) { } func TestResolveConflictsQuarantine(t *testing.T) { - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testResolveConflictsQuarantine) +} + +func testResolveConflictsQuarantine(t *testing.T, ctx context.Context) { cfg, sourceRepoProto, sourceRepoPath, client := SetupConflictsService(ctx, t, true, nil) testcfg.BuildGitalySSH(t, cfg) diff --git a/internal/gitaly/service/operations/rebase_test.go b/internal/gitaly/service/operations/rebase_test.go index 3574bda5b..901db8214 100644 --- a/internal/gitaly/service/operations/rebase_test.go +++ b/internal/gitaly/service/operations/rebase_test.go @@ -1,6 +1,7 @@ package operations import ( + "context" "fmt" "io" "path/filepath" @@ -15,6 +16,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" @@ -654,8 +656,10 @@ func TestRebaseRequestWithDeletedFile(t *testing.T) { func TestRebaseOntoRemoteBranch(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testRebaseOntoRemoteBranch) +} +func testRebaseOntoRemoteBranch(t *testing.T, ctx context.Context) { ctx, cfg, repoProto, repoPath, client := setupOperationsService(t, ctx) repo := localrepo.NewTestRepo(t, cfg, repoProto) diff --git a/internal/gitaly/service/operations/revert_test.go b/internal/gitaly/service/operations/revert_test.go index 0305d6534..27a6cda0d 100644 --- a/internal/gitaly/service/operations/revert_test.go +++ b/internal/gitaly/service/operations/revert_test.go @@ -1,6 +1,7 @@ package operations import ( + "context" "fmt" "path/filepath" "testing" @@ -10,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc/codes" @@ -272,8 +274,11 @@ func TestServer_UserRevert_stableID(t *testing.T) { func TestServer_UserRevert_successfulIntoEmptyRepo(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testServerUserRevertSuccessfulIntoEmptyRepo) +} + +func testServerUserRevertSuccessfulIntoEmptyRepo(t *testing.T, ctx context.Context) { ctx, cfg, startRepoProto, _, client := setupOperationsService(t, ctx) startRepo := localrepo.NewTestRepo(t, cfg, startRepoProto) diff --git a/internal/gitaly/service/repository/fetch_test.go b/internal/gitaly/service/repository/fetch_test.go index 360ced75d..30c7b14af 100644 --- a/internal/gitaly/service/repository/fetch_test.go +++ b/internal/gitaly/service/repository/fetch_test.go @@ -1,12 +1,14 @@ package repository import ( + "context" "testing" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -16,7 +18,10 @@ import ( func TestFetchSourceBranchSourceRepositorySuccess(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testFetchSourceBranchSourceRepositorySuccess) +} + +func testFetchSourceBranchSourceRepositorySuccess(t *testing.T, ctx context.Context) { cfg, sourceRepo, sourcePath, client := setupRepositoryService(ctx, t) md := testcfg.GitalyServersMetadataFromCfg(t, cfg) diff --git a/internal/gitaly/service/repository/replicate_test.go b/internal/gitaly/service/repository/replicate_test.go index 5aa8e5216..a9b4eba44 100644 --- a/internal/gitaly/service/repository/replicate_test.go +++ b/internal/gitaly/service/repository/replicate_test.go @@ -27,6 +27,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/helper" "gitlab.com/gitlab-org/gitaly/v14/internal/helper/text" "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" @@ -39,8 +40,10 @@ import ( func TestReplicateRepository(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testReplicateRepository) +} +func testReplicateRepository(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -112,8 +115,10 @@ func TestReplicateRepository(t *testing.T) { func TestReplicateRepositoryTransactional(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testReplicateRepositoryTransactional) +} +func testReplicateRepositoryTransactional(t *testing.T, ctx context.Context) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "replica")) cfg := cfgBuilder.Build(t) @@ -264,8 +269,10 @@ func TestReplicateRepositoryInvalidArguments(t *testing.T) { func TestReplicateRepository_BadRepository(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testReplicateRepositoryBadRepository) +} +func testReplicateRepositoryBadRepository(t *testing.T, ctx context.Context) { for _, tc := range []struct { desc string invalidSource bool @@ -347,8 +354,10 @@ func TestReplicateRepository_BadRepository(t *testing.T) { func TestReplicateRepository_FailedFetchInternalRemote(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testReplicateRepositoryFailedFetchInternalRemote) +} +func testReplicateRepositoryFailedFetchInternalRemote(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t, testcfg.WithStorages("default", "replica")) testcfg.BuildGitalyHooks(t, cfg) testcfg.BuildGitalySSH(t, cfg) @@ -428,9 +437,10 @@ func listenGitalySSHCalls(t *testing.T, conf config.Cfg) func() gitalySSHParams func TestFetchInternalRemote_successful(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testFetchInternalRemoteSuccessful) +} - ctx := testhelper.Context(t) - +func testFetchInternalRemoteSuccessful(t *testing.T, ctx context.Context) { remoteCfg, remoteRepo, remoteRepoPath := testcfg.BuildWithRepo(t) testcfg.BuildGitalyHooks(t, remoteCfg) gittest.WriteCommit(t, remoteCfg, remoteRepoPath, gittest.WithBranch("master")) @@ -516,10 +526,12 @@ func TestFetchInternalRemote_successful(t *testing.T) { func TestFetchInternalRemote_failure(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testFetchInternalRemoteFailure) +} +func testFetchInternalRemoteFailure(t *testing.T, ctx context.Context) { cfg, repoProto, _ := testcfg.BuildWithRepo(t) repo := localrepo.NewTestRepo(t, cfg, repoProto) - ctx := testhelper.Context(t) ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) connsPool := client.NewPool() diff --git a/internal/metadata/featureflag/ff_fetch_internal_with_sidechannel.go b/internal/metadata/featureflag/ff_fetch_internal_with_sidechannel.go new file mode 100644 index 000000000..726aeb0ea --- /dev/null +++ b/internal/metadata/featureflag/ff_fetch_internal_with_sidechannel.go @@ -0,0 +1,5 @@ +package featureflag + +// FetchInternalWithSidechannel enables the use of SSHUploadPackWithSidechannel for internal +// fetches. +var FetchInternalWithSidechannel = NewFeatureFlag("fetch_internal_with_sidechannel", false) diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index f49b9edd9..7cd48ff3a 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -26,6 +26,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" @@ -51,6 +52,10 @@ func TestMain(m *testing.M) { func TestReplMgr_ProcessBacklog(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testReplMgrProcessBacklog) +} + +func testReplMgrProcessBacklog(t *testing.T, ctx context.Context) { primaryCfg, testRepoProto, testRepoPath := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) testRepo := localrepo.NewTestRepo(t, primaryCfg, testRepoProto) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -94,7 +99,6 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { require.NoError(t, err) poolCtx := testhelper.Context(t) - require.NoError(t, pool.Create(poolCtx, testRepo)) require.NoError(t, pool.Link(poolCtx, testRepo)) @@ -102,7 +106,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) { poolRepository := pool.ToProto().GetRepository() targetObjectPoolRepo := proto.Clone(poolRepository).(*gitalypb.Repository) targetObjectPoolRepo.StorageName = backupCfg.Storages[0].Name - ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx, cancel := context.WithCancel(ctx) injectedCtx := metadata.NewOutgoingContext(ctx, testcfg.GitalyServersMetadataFromCfg(t, primaryCfg)) @@ -677,6 +681,10 @@ func getChecksumFunc(ctx context.Context, client gitalypb.RepositoryServiceClien func TestProcessBacklog_FailedJobs(t *testing.T) { t.Parallel() + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testProcessBacklogFailedJobs) +} + +func testProcessBacklogFailedJobs(t *testing.T, ctx context.Context) { primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("default")) primaryAddr := testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) @@ -706,7 +714,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { }, }, } - ctx, cancel := context.WithCancel(testhelper.Context(t)) + ctx, cancel := context.WithCancel(ctx) queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(testdb.New(t))) @@ -779,7 +787,11 @@ func TestProcessBacklog_FailedJobs(t *testing.T) { func TestProcessBacklog_Success(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(testhelper.Context(t)) + testhelper.NewFeatureSets(featureflag.FetchInternalWithSidechannel).Run(t, testProcessBacklogSuccess) +} + +func testProcessBacklogSuccess(t *testing.T, ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) primaryCfg, testRepo, _ := testcfg.BuildWithRepo(t, testcfg.WithStorages("primary")) primaryCfg.SocketPath = testserver.RunGitalyServer(t, primaryCfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) |