Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2021-10-06 19:28:00 +0300
committerJacob Vosmaer <jacob@gitlab.com>2021-10-06 20:27:46 +0300
commit9afed4db259197170992383d7710445dfca4f098 (patch)
treef3f52be532668de0fd1ea098d305c60bd8d29670
parent5027043f38fc5eac159135647e6d4fb03495cbb4 (diff)
Praefect: proxy sidechannels
This commit adds backchannel support to the main gRPC listener of Praefect. And if clients make gRPC calls with sidechannels, Praefect will now proxy these to the Gitaly backend. Changelog: added
-rw-r--r--cmd/praefect/main.go8
-rw-r--r--cmd/praefect/subcmd_track_repository_test.go2
-rw-r--r--internal/praefect/auth_test.go4
-rw-r--r--internal/praefect/coordinator_pg_test.go1
-rw-r--r--internal/praefect/coordinator_test.go14
-rw-r--r--internal/praefect/helper_test.go3
-rw-r--r--internal/praefect/info_service_test.go2
-rw-r--r--internal/praefect/node.go4
-rw-r--r--internal/praefect/node_test.go2
-rw-r--r--internal/praefect/nodes/manager.go14
-rw-r--r--internal/praefect/nodes/manager_test.go10
-rw-r--r--internal/praefect/nodes/sql_elector_test.go1
-rw-r--r--internal/praefect/replicator_test.go10
-rw-r--r--internal/praefect/repocleaner/repository_test.go8
-rw-r--r--internal/praefect/repository_exists_test.go1
-rw-r--r--internal/praefect/server.go21
-rw-r--r--internal/praefect/server_factory.go10
-rw-r--r--internal/praefect/server_factory_test.go84
-rw-r--r--internal/praefect/server_test.go20
19 files changed, 166 insertions, 53 deletions
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 65328d49e..8b56cab28 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -87,6 +87,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/repocleaner"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/version"
"gitlab.com/gitlab-org/labkit/monitoring"
"gitlab.com/gitlab-org/labkit/tracing"
@@ -277,7 +278,8 @@ func run(cfgs []starter.Config, conf config.Config) error {
}
transactionManager := transactions.NewManager(conf)
- clientHandshaker := backchannel.NewClientHandshaker(logger, praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager)))
+ sidechannelRegistry := sidechannel.NewRegistry()
+ clientHandshaker := backchannel.NewClientHandshaker(logger, praefect.NewBackchannelServerFactory(logger, transaction.NewServer(transactionManager), sidechannelRegistry))
assignmentStore := praefect.NewDisabledAssignmentStore(conf.StorageNames())
var (
nodeManager nodes.Manager
@@ -287,7 +289,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
primaryGetter praefect.PrimaryGetter
)
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
- nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker)
+ nodeSet, err = praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
if err != nil {
return fmt.Errorf("dial nodes: %w", err)
}
@@ -326,7 +328,7 @@ func run(cfgs []starter.Config, conf config.Config) error {
"Deprecated election stategy in use, migrate to repository specific primary nodes following https://docs.gitlab.com/ee/administration/gitaly/praefect.html#migrate-to-repository-specific-primary-gitaly-nodes. The other election strategies are scheduled for removal in GitLab 14.0.")
}
- nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker)
+ nodeMgr, err := nodes.NewManager(logger, conf, db, csg, nodeLatencyHistogram, protoregistry.GitalyProtoPreregistered, errTracker, clientHandshaker, sidechannelRegistry)
if err != nil {
return err
}
diff --git a/cmd/praefect/subcmd_track_repository_test.go b/cmd/praefect/subcmd_track_repository_test.go
index 9c6c5e2d0..bdf29ce68 100644
--- a/cmd/praefect/subcmd_track_repository_test.go
+++ b/cmd/praefect/subcmd_track_repository_test.go
@@ -151,7 +151,7 @@ func TestAddRepository_Exec(t *testing.T) {
addCmdConf.Failover = tc.failoverConfig
t.Run("ok", func(t *testing.T) {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), addCmdConf, db.DB, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
diff --git a/internal/praefect/auth_test.go b/internal/praefect/auth_test.go
index 8bd4da73e..bafa7bfda 100644
--- a/internal/praefect/auth_test.go
+++ b/internal/praefect/auth_test.go
@@ -151,7 +151,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
logEntry := testhelper.DiscardTestEntry(t)
queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
txMgr := transactions.NewManager(conf)
@@ -161,7 +161,7 @@ func runServer(t *testing.T, token string, required bool) (*grpc.Server, string,
coordinator := NewCoordinator(queue, nil, NewNodeManagerRouter(nodeMgr, nil), txMgr, conf, registry)
- srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil, nil, nil)
+ srv := NewGRPCServer(conf, logEntry, registry, coordinator.StreamDirector, nodeMgr, txMgr, queue, nil, nil, nil, nil, nil)
serverSocketPath := testhelper.GetTemporaryGitalySocketFileName(t)
diff --git a/internal/praefect/coordinator_pg_test.go b/internal/praefect/coordinator_pg_test.go
index 2b5de0b10..14c281229 100644
--- a/internal/praefect/coordinator_pg_test.go
+++ b/internal/praefect/coordinator_pg_test.go
@@ -221,6 +221,7 @@ func TestStreamDirectorMutator_Transaction(t *testing.T) {
protoregistry.GitalyProtoPreregistered,
nil,
nil,
+ nil,
)
require.NoError(t, err)
defer nodeSet.Close()
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 9df1dcf76..adf4c6262 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -169,7 +169,7 @@ func TestStreamDirectorMutator(t *testing.T) {
txMgr := transactions.NewManager(conf)
- nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -308,7 +308,7 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
RelativePath: "/path/to/hashed/storage",
}
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -439,7 +439,7 @@ func TestStreamDirectorAccessor(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
rs := datastore.MockRepositoryStore{}
- nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -550,7 +550,7 @@ func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
},
}
- nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Minute)
@@ -872,7 +872,7 @@ func TestStreamDirector_repo_creation(t *testing.T) {
healthySecondaryNode.Address = "unix://" + gitalySocket1
unhealthySecondaryNode.Address = "unix://" + gitalySocket2
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -1074,7 +1074,7 @@ func TestAbsentCorrelationID(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -1208,7 +1208,7 @@ func TestStreamDirectorStorageScope(t *testing.T) {
rs := datastore.MockRepositoryStore{}
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
coordinator := NewCoordinator(
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 70d320eaa..96e12c32b 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -126,7 +126,7 @@ func defaultTxMgr(conf config.Config) *transactions.Manager {
}
func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
return nodeMgr
@@ -195,6 +195,7 @@ func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, op
opt.withAssignmentStore,
opt.withConnections,
opt.withPrimaryGetter,
+ nil,
)
listener, port := listenAvailPort(t)
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index e289040ad..7d1710ffd 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -60,7 +60,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- nodeManager, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeManager, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeManager.Start(0, time.Hour)
cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
diff --git a/internal/praefect/node.go b/internal/praefect/node.go
index 0002ff440..13539b691 100644
--- a/internal/praefect/node.go
+++ b/internal/praefect/node.go
@@ -9,6 +9,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)
@@ -96,12 +97,13 @@ func DialNodes(
registry *protoregistry.Registry,
errorTracker tracker.ErrorTracker,
handshaker client.Handshaker,
+ sidechannelRegistry *sidechannel.Registry,
) (NodeSet, error) {
set := make(NodeSet, len(virtualStorages))
for _, virtualStorage := range virtualStorages {
set[virtualStorage.Name] = make(map[string]Node, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
- conn, err := nodes.Dial(ctx, node, registry, errorTracker, handshaker)
+ conn, err := nodes.Dial(ctx, node, registry, errorTracker, handshaker, sidechannelRegistry)
if err != nil {
return nil, fmt.Errorf("dial %q/%q: %w", virtualStorage.Name, node.Storage, err)
}
diff --git a/internal/praefect/node_test.go b/internal/praefect/node_test.go
index 8c6647530..9ee58274c 100644
--- a/internal/praefect/node_test.go
+++ b/internal/praefect/node_test.go
@@ -74,7 +74,7 @@ func TestDialNodes(t *testing.T) {
Storage: "invalid",
Address: "unix:non-existent-socket",
}),
- }}, nil, nil, nil,
+ }}, nil, nil, nil, nil,
)
require.NoError(t, err)
defer nodeSet.Close()
diff --git a/internal/praefect/nodes/manager.go b/internal/praefect/nodes/manager.go
index 61d4a52dc..2bfd9fb54 100644
--- a/internal/praefect/nodes/manager.go
+++ b/internal/praefect/nodes/manager.go
@@ -22,6 +22,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes/tracker"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
prommetrics "gitlab.com/gitlab-org/gitaly/v14/internal/prometheus/metrics"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"google.golang.org/grpc"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -117,20 +118,26 @@ var ErrPrimaryNotHealthy = errors.New("primary gitaly is not healthy")
const dialTimeout = 10 * time.Second
// Dial dials a node with the necessary interceptors configured.
-func Dial(ctx context.Context, node *config.Node, registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, handshaker client.Handshaker) (*grpc.ClientConn, error) {
+func Dial(ctx context.Context, node *config.Node, registry *protoregistry.Registry, errorTracker tracker.ErrorTracker, handshaker client.Handshaker, sidechannelRegistry *sidechannel.Registry) (*grpc.ClientConn, error) {
streamInterceptors := []grpc.StreamClientInterceptor{
grpcprometheus.StreamClientInterceptor,
+ sidechannel.NewStreamProxy(sidechannelRegistry),
}
if errorTracker != nil {
streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage))
}
+ unaryInterceptors := []grpc.UnaryClientInterceptor{
+ grpcprometheus.UnaryClientInterceptor,
+ sidechannel.NewUnaryProxy(sidechannelRegistry),
+ }
+
dialOpts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
grpc.WithChainStreamInterceptor(streamInterceptors...),
- grpc.WithChainUnaryInterceptor(grpcprometheus.UnaryClientInterceptor),
+ grpc.WithChainUnaryInterceptor(unaryInterceptors...),
}
return client.Dial(ctx, node.Address, dialOpts, handshaker)
@@ -146,6 +153,7 @@ func NewManager(
registry *protoregistry.Registry,
errorTracker tracker.ErrorTracker,
handshaker client.Handshaker,
+ sidechannelRegistry *sidechannel.Registry,
) (*Mgr, error) {
if !c.Failover.Enabled {
errorTracker = nil
@@ -161,7 +169,7 @@ func NewManager(
ns := make([]*nodeStatus, 0, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
- conn, err := Dial(ctx, node, registry, errorTracker, handshaker)
+ conn, err := Dial(ctx, node, registry, errorTracker, handshaker, sidechannelRegistry)
if err != nil {
return nil, err
}
diff --git a/internal/praefect/nodes/manager_test.go b/internal/praefect/nodes/manager_test.go
index 107e27638..51e064ff5 100644
--- a/internal/praefect/nodes/manager_test.go
+++ b/internal/praefect/nodes/manager_test.go
@@ -118,7 +118,7 @@ func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
Failover: config.Failover{Enabled: false, ElectionStrategy: config.ElectionStrategySQL},
VirtualStorages: []*config.VirtualStorage{virtualStorage},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nm.Start(time.Millisecond, time.Millisecond)
@@ -166,7 +166,7 @@ func TestDialWithUnhealthyNode(t *testing.T) {
testhelper.NewHealthServerWithListener(t, primaryLn)
- mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
mgr.Start(1*time.Millisecond, 1*time.Millisecond)
@@ -214,10 +214,10 @@ func TestNodeManager(t *testing.T) {
}
mockHistogram := promtest.NewMockHistogramVec()
- nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
- nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil)
+ nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
// monitoring period set to 1 hour as we execute health checks by hands in this test
@@ -332,7 +332,7 @@ func TestMgr_GetSyncedNode(t *testing.T) {
},
}
- nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
for i := range healthSrvs {
diff --git a/internal/praefect/nodes/sql_elector_test.go b/internal/praefect/nodes/sql_elector_test.go
index b41ab5e31..f88369aa9 100644
--- a/internal/praefect/nodes/sql_elector_test.go
+++ b/internal/praefect/nodes/sql_elector_test.go
@@ -480,6 +480,7 @@ func TestConnectionMultiplexing(t *testing.T) {
protoregistry.GitalyProtoPreregistered,
nil,
backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer() }),
+ nil,
)
require.NoError(t, err)
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 043838b49..bf6abf37d 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -124,7 +124,7 @@ func TestReplMgr_ProcessBacklog(t *testing.T) {
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond)
@@ -328,7 +328,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
})
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -347,7 +347,7 @@ func TestReplicator_PropagateReplicationJob(t *testing.T) {
replmgr := NewReplMgr(logEntry, conf.StorageNames(), queue, rs, nodeMgr, NodeSetFromNodeManager(nodeMgr))
- prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil)
+ prf := NewGRPCServer(conf, logEntry, protoregistry.GitalyProtoPreregistered, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, nil, nil, nil, nil)
listener, port := listenAvailPort(t)
ctx, cancel := testhelper.Context()
@@ -707,7 +707,7 @@ func TestProcessBacklog_FailedJobs(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -857,7 +857,7 @@ func TestProcessBacklog_Success(t *testing.T) {
logEntry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(logEntry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
diff --git a/internal/praefect/repocleaner/repository_test.go b/internal/praefect/repocleaner/repository_test.go
index 32e7d7ad5..c7c04ce5a 100644
--- a/internal/praefect/repocleaner/repository_test.go
+++ b/internal/praefect/repocleaner/repository_test.go
@@ -120,8 +120,8 @@ func TestRunner_Run(t *testing.T) {
logger.SetLevel(logrus.DebugLevel)
entry := logger.WithContext(ctx)
- clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
- nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -242,8 +242,8 @@ func TestRunner_Run_noAvailableStorages(t *testing.T) {
logger := testhelper.NewTestLogger(t)
entry := logger.WithContext(ctx)
- clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil)))
- nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker)
+ clientHandshaker := backchannel.NewClientHandshaker(entry, praefect.NewBackchannelServerFactory(entry, transaction.NewServer(nil), nil))
+ nodeSet, err := praefect.DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, nil)
require.NoError(t, err)
defer nodeSet.Close()
diff --git a/internal/praefect/repository_exists_test.go b/internal/praefect/repository_exists_test.go
index 45e315299..cf66210f2 100644
--- a/internal/praefect/repository_exists_test.go
+++ b/internal/praefect/repository_exists_test.go
@@ -100,6 +100,7 @@ func TestRepositoryExistsHandler(t *testing.T) {
nil,
nil,
nil,
+ nil,
)
defer srv.Stop()
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 7e9570a2e..cb74b14d5 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -13,6 +13,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/server/auth"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/fieldextractors"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
"gitlab.com/gitlab-org/gitaly/v14/internal/log"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/cancelhandler"
"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
@@ -29,10 +30,13 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/server"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
"google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive"
@@ -40,14 +44,17 @@ import (
// NewBackchannelServerFactory returns a ServerFactory that serves the RefTransactionServer on the backchannel
// connection.
-func NewBackchannelServerFactory(logger *logrus.Entry, svc gitalypb.RefTransactionServer) backchannel.ServerFactory {
+func NewBackchannelServerFactory(logger *logrus.Entry, refSvc gitalypb.RefTransactionServer, registry *sidechannel.Registry) backchannel.ServerFactory {
return func() backchannel.Server {
+ lm := listenmux.New(insecure.NewCredentials())
+ lm.Register(sidechannel.NewServerHandshaker(registry))
srv := grpc.NewServer(
grpc.UnaryInterceptor(grpcmw.ChainUnaryServer(
commonUnaryServerInterceptors(logger)...,
)),
+ grpc.Creds(lm),
)
- gitalypb.RegisterRefTransactionServer(srv, svc)
+ gitalypb.RegisterRefTransactionServer(srv, refSvc)
grpcprometheus.Register(srv)
return srv
}
@@ -87,6 +94,7 @@ func NewGRPCServer(
assignmentStore AssignmentStore,
conns Connections,
primaryGetter PrimaryGetter,
+ creds credentials.TransportCredentials,
grpcOpts ...grpc.ServerOption,
) *grpc.Server {
streamInterceptors := []grpc.StreamServerInterceptor{
@@ -122,6 +130,15 @@ func NewGRPCServer(
}),
}...)
+ // Accept backchannel connections so that we can proxy sidechannels
+ // from clients (e.g. Workhorse) to a backend Gitaly server.
+ if creds == nil {
+ creds = insecure.NewCredentials()
+ }
+ lm := listenmux.New(creds)
+ lm.Register(backchannel.NewServerHandshaker(logger, backchannel.NewRegistry(), nil))
+ grpcOpts = append(grpcOpts, grpc.Creds(lm))
+
warnDupeAddrs(logger, conf)
srv := grpc.NewServer(grpcOpts...)
diff --git a/internal/praefect/server_factory.go b/internal/praefect/server_factory.go
index e27661b0e..3b37d0559 100644
--- a/internal/praefect/server_factory.go
+++ b/internal/praefect/server_factory.go
@@ -102,7 +102,7 @@ func (s *ServerFactory) Create(secure bool) (*grpc.Server, error) {
defer s.mtx.Unlock()
if !secure {
- s.insecure = append(s.insecure, s.createGRPC())
+ s.insecure = append(s.insecure, s.createGRPC(nil))
return s.insecure[len(s.insecure)-1], nil
}
@@ -111,15 +111,15 @@ func (s *ServerFactory) Create(secure bool) (*grpc.Server, error) {
return nil, fmt.Errorf("load certificate key pair: %w", err)
}
- s.secure = append(s.secure, s.createGRPC(grpc.Creds(credentials.NewTLS(&tls.Config{
+ s.secure = append(s.secure, s.createGRPC(credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
- }))))
+ })))
return s.secure[len(s.secure)-1], nil
}
-func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
+func (s *ServerFactory) createGRPC(creds credentials.TransportCredentials) *grpc.Server {
return NewGRPCServer(
s.conf,
s.logger,
@@ -132,7 +132,7 @@ func (s *ServerFactory) createGRPC(grpcOpts ...grpc.ServerOption) *grpc.Server {
s.assignmentStore,
s.conns,
s.primaryGetter,
- grpcOpts...,
+ creds,
)
}
diff --git a/internal/praefect/server_factory_test.go b/internal/praefect/server_factory_test.go
index 53bbc825b..a6aea62a8 100644
--- a/internal/praefect/server_factory_test.go
+++ b/internal/praefect/server_factory_test.go
@@ -1,9 +1,12 @@
package praefect
import (
+ "bytes"
"context"
"crypto/tls"
"crypto/x509"
+ "fmt"
+ "io"
"net"
"os"
"testing"
@@ -11,17 +14,21 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/bootstrap/starter"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
gconfig "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/setup"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper/text"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/listenmux"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/service/transaction"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/sidechannel"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
@@ -29,6 +36,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/credentials/insecure"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
@@ -66,10 +74,12 @@ func TestServerFactory(t *testing.T) {
queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
rs := datastore.MockRepositoryStore{}
- nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, nil)
+ txMgr := transactions.NewManager(conf)
+ sidechannelRegistry := sidechannel.NewRegistry()
+ clientHandshaker := backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(logger, transaction.NewServer(txMgr), sidechannelRegistry))
+ nodeMgr, err := nodes.NewManager(logger, conf, nil, rs, &promtest.MockHistogramVec{}, protoregistry.GitalyProtoPreregistered, nil, clientHandshaker, sidechannelRegistry)
require.NoError(t, err)
nodeMgr.Start(0, time.Second)
- txMgr := transactions.NewManager(conf)
registry := protoregistry.GitalyProtoPreregistered
coordinator := NewCoordinator(
@@ -103,6 +113,66 @@ func TestServerFactory(t *testing.T) {
require.Equal(t, revision, resp.Commit.Id)
}
+ checkSidechannelGitaly := func(ctx context.Context, t *testing.T, addr string, creds credentials.TransportCredentials) {
+ t.Helper()
+
+ // Client has its own sidechannel registry, don't reuse the one we plugged into Praefect.
+ registry := sidechannel.NewRegistry()
+
+ factory := func() backchannel.Server {
+ lm := listenmux.New(insecure.NewCredentials())
+ lm.Register(sidechannel.NewServerHandshaker(registry))
+ return grpc.NewServer(grpc.Creds(lm))
+ }
+
+ clientHandshaker := backchannel.NewClientHandshaker(logger, factory)
+ dialOpt := grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(creds))
+
+ cc, err := grpc.Dial(addr, dialOpt)
+ require.NoError(t, err)
+ defer func() { require.NoError(t, cc.Close()) }()
+
+ var pack []byte
+ ctx, waiter := sidechannel.RegisterSidechannel(ctx, registry, func(conn *sidechannel.ClientConn) error {
+ // 1e292f8fedd741b75372e19097c76d327140c312 is refs/heads/master of the test repo
+ const message = `003cwant 1e292f8fedd741b75372e19097c76d327140c312 ofs-delta
+00000009done
+`
+
+ if _, err := io.WriteString(conn, message); err != nil {
+ return err
+ }
+ if err := conn.CloseWrite(); err != nil {
+ return err
+ }
+
+ buf := make([]byte, 8)
+ if _, err := io.ReadFull(conn, buf); err != nil {
+ return fmt.Errorf("read nak: %w", err)
+ }
+ if string(buf) != "0008NAK\n" {
+ return fmt.Errorf("unexpected response: %q", buf)
+ }
+
+ var err error
+ pack, err = io.ReadAll(conn)
+ if err != nil {
+ return err
+ }
+
+ return nil
+ })
+ defer waiter.Close()
+
+ _, err = gitalypb.NewSmartHTTPServiceClient(cc).PostUploadPackWithSidechannel(ctx,
+ &gitalypb.PostUploadPackWithSidechannelRequest{Repository: repo},
+ )
+ require.NoError(t, err)
+ require.NoError(t, waiter.Close())
+
+ gittest.ExecStream(t, cfg, bytes.NewReader(pack), "-C", repoPath, "index-pack", "--stdin", "--fix-thin")
+ }
+
t.Run("insecure", func(t *testing.T) {
praefectServerFactory := NewServerFactory(conf, logger, coordinator.StreamDirector, nodeMgr, txMgr, queue, rs, datastore.AssignmentStore{}, registry, nil, nil)
defer praefectServerFactory.Stop()
@@ -116,6 +186,8 @@ func TestServerFactory(t *testing.T) {
praefectAddr, err := starter.ComposeEndpoint(listener.Addr().Network(), listener.Addr().String())
require.NoError(t, err)
+ creds := insecure.NewCredentials()
+
cc, err := client.Dial(praefectAddr, nil)
require.NoError(t, err)
defer func() { require.NoError(t, cc.Close()) }()
@@ -130,6 +202,10 @@ func TestServerFactory(t *testing.T) {
t.Run("proxies RPCs onto gitaly server", func(t *testing.T) {
checkProxyingOntoGitaly(ctx, t, cc)
})
+
+ t.Run("proxies sidechannel RPCs onto gitaly server", func(t *testing.T) {
+ checkSidechannelGitaly(ctx, t, listener.Addr().String(), creds)
+ })
})
t.Run("secure", func(t *testing.T) {
@@ -168,6 +244,10 @@ func TestServerFactory(t *testing.T) {
t.Run("proxies RPCs onto gitaly server", func(t *testing.T) {
checkProxyingOntoGitaly(ctx, t, cc)
})
+
+ t.Run("proxies sidechannel RPCs onto gitaly server", func(t *testing.T) {
+ checkSidechannelGitaly(ctx, t, listener.Addr().String(), creds)
+ })
})
t.Run("stops all listening servers", func(t *testing.T) {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index e824cb600..163c13d4f 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -103,8 +103,8 @@ func TestNewBackchannelServerFactory(t *testing.T) {
Name: "default",
Nodes: []*config.Node{{Storage: "gitaly-1", Address: "tcp://" + ln.Addr().String()}},
}}, nil, nil, backchannel.NewClientHandshaker(logger, NewBackchannelServerFactory(
- testhelper.DiscardTestEntry(t), transaction.NewServer(mgr),
- )))
+ testhelper.DiscardTestEntry(t), transaction.NewServer(mgr), nil,
+ )), nil)
require.NoError(t, err)
defer nodeSet.Close()
@@ -158,7 +158,7 @@ func TestGitalyServerInfo(t *testing.T) {
},
}
- nodeSet, err := DialNodes(ctx, conf.VirtualStorages, nil, nil, nil)
+ nodeSet, err := DialNodes(ctx, conf.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)
t.Cleanup(nodeSet.Close)
@@ -224,7 +224,7 @@ func TestGitalyServerInfo(t *testing.T) {
},
}
- nodeSet, err := DialNodes(ctx, conf.VirtualStorages, nil, nil, nil)
+ nodeSet, err := DialNodes(ctx, conf.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)
t.Cleanup(nodeSet.Close)
@@ -263,7 +263,7 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- nodes, err := DialNodes(ctx, conf.VirtualStorages, nil, nil, nil)
+ nodes, err := DialNodes(ctx, conf.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)
defer nodes.Close()
@@ -297,7 +297,7 @@ func TestDiskStatistics(t *testing.T) {
ctx, cancel := testhelper.Context()
defer cancel()
- nodes, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil)
+ nodes, err := DialNodes(ctx, praefectCfg.VirtualStorages, nil, nil, nil, nil)
require.NoError(t, err)
defer nodes.Close()
@@ -541,8 +541,8 @@ func TestRemoveRepository(t *testing.T) {
repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered,
nil, backchannel.NewClientHandshaker(
testhelper.DiscardTestEntry(t),
- NewBackchannelServerFactory(testhelper.DiscardTestEntry(t), transaction.NewServer(txMgr)),
- ),
+ NewBackchannelServerFactory(testhelper.DiscardTestEntry(t), transaction.NewServer(txMgr), nil),
+ ), nil,
)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -809,7 +809,7 @@ func TestProxyWrites(t *testing.T) {
queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))
entry := testhelper.DiscardTestEntry(t)
- nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
require.NoError(t, err)
nodeMgr.Start(0, time.Hour)
@@ -976,7 +976,7 @@ func TestErrorThreshold(t *testing.T) {
require.NoError(t, err)
rs := datastore.MockRepositoryStore{}
- nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), registry, errorTracker, nil)
+ nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), registry, errorTracker, nil, nil)
require.NoError(t, err)
coordinator := NewCoordinator(