Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavlo Strokov <pstrokov@gitlab.com>2022-07-29 16:33:43 +0300
committerPavlo Strokov <pstrokov@gitlab.com>2022-08-08 14:58:04 +0300
commit8d4ab55cd459d43cd2c536d5db7d78813a5fe51e (patch)
treeeccf67c0043877f9b638acfbc2a08cebc945f4cc
parentf6459afe33922aa6e73e0334541ebd844e450742 (diff)
praefect: Make runPraefectServer exportable
To test ReadinessCheck RPC we need to run praefect service. The runPraefectServer function does what we need, but it is not exportable. This change moves and renames runPraefectServer and related code, so it can be re-used in other packages for testing.
-rw-r--r--internal/praefect/coordinator_test.go18
-rw-r--r--internal/praefect/helper_test.go247
-rw-r--r--internal/praefect/info_service_test.go12
-rw-r--r--internal/praefect/server_test.go60
-rw-r--r--internal/praefect/testserver.go280
-rw-r--r--internal/praefect/transaction_test.go6
-rw-r--r--internal/praefect/verifier_test.go8
7 files changed, 332 insertions, 299 deletions
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 19486f40c..37e495ad5 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -568,7 +568,7 @@ func TestStreamDirector_maintenanceRPCs(t *testing.T) {
t, testcfg.WithStorages(secondaryStorage)),
)
- cc, _, cleanup := runPraefectServer(t, ctx, config.Config{
+ cc, _, cleanup := RunPraefectServer(t, ctx, config.Config{
VirtualStorages: []*config.VirtualStorage{
{
Name: "default",
@@ -584,7 +584,7 @@ func TestStreamDirector_maintenanceRPCs(t *testing.T) {
},
},
},
- }, buildOptions{})
+ }, BuildOptions{})
defer cleanup()
repository := &gitalypb.Repository{
@@ -1569,10 +1569,10 @@ func TestCoordinatorEnqueueFailure(t *testing.T) {
require.NoError(t, err)
ctx := testhelper.Context(t)
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withAnnotations: r,
- withQueue: queueInterceptor,
- withBackends: withMockBackends(t, map[string]mock.SimpleServiceServer{
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithAnnotations: r,
+ WithQueue: queueInterceptor,
+ WithBackends: WithMockBackends(t, map[string]mock.SimpleServiceServer{
conf.VirtualStorages[0].Nodes[0].Storage: ms,
conf.VirtualStorages[0].Nodes[1].Storage: ms,
}),
@@ -1946,10 +1946,10 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
})
}
- praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{
+ praefectConn, _, cleanup := RunPraefectServer(t, ctx, praefectConfig, BuildOptions{
// Set up a mock manager which sets up primary/secondaries and pretends that all nodes are
// healthy. We need fixed roles and unhealthy nodes will not take part in transactions.
- withNodeMgr: &nodes.MockManager{
+ WithNodeMgr: &nodes.MockManager{
Storage: testhelper.DefaultStorageName,
GetShardFunc: func(shardName string) (nodes.Shard, error) {
require.Equal(t, testhelper.DefaultStorageName, shardName)
@@ -1964,7 +1964,7 @@ func TestCoordinator_grpcErrorHandling(t *testing.T) {
},
// Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent
// nodes will take part in transactions.
- withRepoStore: datastore.MockRepositoryStore{
+ WithRepoStore: datastore.MockRepositoryStore{
GetReplicaPathFunc: func(ctx context.Context, repositoryID int64) (string, error) {
return repoProto.GetRelativePath(), nil
},
diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go
index 8b654b97a..a8d98da5c 100644
--- a/internal/praefect/helper_test.go
+++ b/internal/praefect/helper_test.go
@@ -5,30 +5,9 @@ package praefect
import (
"context"
"fmt"
- "net"
- "testing"
- "time"
- "github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/v15/client"
- gitalycfgauth "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth"
- "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/server/auth"
- "gitlab.com/gitlab-org/gitaly/v15/internal/log"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
- "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest"
- "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
- correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
- "google.golang.org/grpc"
- "google.golang.org/grpc/health"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// generates a praefect configuration with the specified number of backend
@@ -56,14 +35,6 @@ func testConfig(backends int) config.Config {
return cfg
}
-type noopBackoffFactory struct{}
-
-func (noopBackoffFactory) Create() (Backoff, BackoffReset) {
- return func() time.Duration {
- return 0
- }, func() {}
-}
-
type nullNodeMgr struct{}
func (nullNodeMgr) GetShard(ctx context.Context, virtualStorageName string) (nodes.Shard, error) {
@@ -81,221 +52,3 @@ func (nullNodeMgr) HealthyNodes() map[string][]string {
func (nullNodeMgr) Nodes() map[string][]nodes.Node {
return nil
}
-
-type buildOptions struct {
- withQueue datastore.ReplicationEventQueue
- withTxMgr *transactions.Manager
- withBackends func([]*config.VirtualStorage) []testhelper.Cleanup
- withAnnotations *protoregistry.Registry
- withLogger *logrus.Entry
- withNodeMgr nodes.Manager
- withRepoStore datastore.RepositoryStore
- withAssignmentStore AssignmentStore
- withConnections Connections
- withPrimaryGetter PrimaryGetter
- withRouter Router
-}
-
-func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup {
- return func(virtualStorages []*config.VirtualStorage) []testhelper.Cleanup {
- var cleanups []testhelper.Cleanup
-
- for _, vs := range virtualStorages {
- require.Equal(t, len(backends), len(vs.Nodes),
- "mock server count doesn't match config nodes")
-
- for i, node := range vs.Nodes {
- backend, ok := backends[node.Storage]
- require.True(t, ok, "missing backend server for node %s", node.Storage)
-
- backendAddr, cleanup := newMockDownstream(t, node.Token, backend)
- cleanups = append(cleanups, cleanup)
-
- node.Address = backendAddr
- vs.Nodes[i] = node
- }
- }
-
- return cleanups
- }
-}
-
-func defaultQueue(t testing.TB) datastore.ReplicationEventQueue {
- return datastore.NewPostgresReplicationEventQueue(testdb.New(t))
-}
-
-func defaultTxMgr(conf config.Config) *transactions.Manager {
- return transactions.NewManager(conf)
-}
-
-func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
- nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
- require.NoError(t, err)
- nodeMgr.Start(0, time.Hour)
- t.Cleanup(nodeMgr.Stop)
- return nodeMgr
-}
-
-func defaultRepoStore(conf config.Config) datastore.RepositoryStore {
- return datastore.MockRepositoryStore{}
-}
-
-func runPraefectServer(t testing.TB, ctx context.Context, conf config.Config, opt buildOptions) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
- var cleanups []testhelper.Cleanup
-
- if opt.withQueue == nil {
- opt.withQueue = defaultQueue(t)
- }
- if opt.withRepoStore == nil {
- opt.withRepoStore = defaultRepoStore(conf)
- }
- if opt.withTxMgr == nil {
- opt.withTxMgr = defaultTxMgr(conf)
- }
- if opt.withBackends != nil {
- cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...)
- }
- if opt.withAnnotations == nil {
- opt.withAnnotations = protoregistry.GitalyProtoPreregistered
- }
- if opt.withLogger == nil {
- opt.withLogger = log.Default()
- }
- if opt.withNodeMgr == nil {
- opt.withNodeMgr = defaultNodeMgr(t, conf, opt.withRepoStore)
- }
- if opt.withAssignmentStore == nil {
- opt.withAssignmentStore = NewDisabledAssignmentStore(conf.StorageNames())
- }
- if opt.withRouter == nil {
- opt.withRouter = NewNodeManagerRouter(opt.withNodeMgr, opt.withRepoStore)
- }
-
- coordinator := NewCoordinator(
- opt.withQueue,
- opt.withRepoStore,
- opt.withRouter,
- opt.withTxMgr,
- conf,
- opt.withAnnotations,
- )
-
- // TODO: run a replmgr for EVERY virtual storage
- replmgr := NewReplMgr(
- opt.withLogger,
- conf.StorageNames(),
- opt.withQueue,
- opt.withRepoStore,
- opt.withNodeMgr,
- NodeSetFromNodeManager(opt.withNodeMgr),
- )
-
- prf := NewGRPCServer(
- conf,
- opt.withLogger,
- protoregistry.GitalyProtoPreregistered,
- coordinator.StreamDirector,
- opt.withTxMgr,
- opt.withRepoStore,
- opt.withAssignmentStore,
- opt.withConnections,
- opt.withPrimaryGetter,
- nil,
- )
-
- listener, port := listenAvailPort(t)
-
- errQ := make(chan error)
- ctx, cancel := context.WithCancel(ctx)
-
- go func() {
- errQ <- prf.Serve(listener)
- close(errQ)
- }()
- replMgrDone := startProcessBacklog(ctx, replmgr)
-
- // dial client to praefect
- cc := dialLocalPort(t, port, false)
-
- cleanup := func() {
- cc.Close()
-
- for _, cu := range cleanups {
- cu()
- }
-
- prf.Stop()
-
- cancel()
- <-replMgrDone
- require.NoError(t, <-errQ)
- }
-
- return cc, prf, cleanup
-}
-
-func listenAvailPort(tb testing.TB) (net.Listener, int) {
- listener, err := net.Listen("tcp", "localhost:0")
- require.NoError(tb, err)
-
- return listener, listener.Addr().(*net.TCPAddr).Port
-}
-
-func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
- opts := []grpc.DialOption{
- grpc.WithBlock(),
- grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
- grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()),
- }
- if backend {
- opts = append(
- opts,
- grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
- )
- }
-
- cc, err := client.Dial(
- fmt.Sprintf("tcp://localhost:%d", port),
- opts,
- )
- require.NoError(tb, err)
-
- return cc
-}
-
-func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) {
- srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(gitalycfgauth.Config{Token: token})))
- mock.RegisterSimpleServiceServer(srv, m)
- healthpb.RegisterHealthServer(srv, health.NewServer())
-
- // client to backend service
- lis, port := listenAvailPort(tb)
-
- errQ := make(chan error)
-
- go func() {
- errQ <- srv.Serve(lis)
- }()
-
- cleanup := func() {
- srv.GracefulStop()
- lis.Close()
-
- // If the server is shutdown before Serve() is called on it
- // the Serve() calls will return the ErrServerStopped
- if err := <-errQ; err != nil && err != grpc.ErrServerStopped {
- require.NoError(tb, err)
- }
- }
-
- return fmt.Sprintf("tcp://localhost:%d", port), cleanup
-}
-
-func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} {
- done := make(chan struct{})
- go func() {
- defer close(done)
- replMgr.ProcessBacklog(ctx, noopBackoffFactory{})
- }()
- return done
-}
diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go
index 6e860a3c6..3c9d6c211 100644
--- a/internal/praefect/info_service_test.go
+++ b/internal/praefect/info_service_test.go
@@ -100,10 +100,10 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
conns := nodeSet.Connections()
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withConnections: conns,
- withRepoStore: rs,
- withRouter: NewPerRepositoryRouter(
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithConnections: conns,
+ WithRepoStore: rs,
+ WithRouter: NewPerRepositoryRouter(
conns,
elector,
StaticHealthChecker{virtualStorage: storages},
@@ -113,8 +113,8 @@ func TestInfoService_RepositoryReplicas(t *testing.T) {
rs,
conf.DefaultReplicationFactors(),
),
- withPrimaryGetter: elector,
- withTxMgr: txManager,
+ WithPrimaryGetter: elector,
+ WithTxMgr: txManager,
})
// use cleanup to close the connections as gittest.CreateRepository will still use the connection
// for clean up after the test.
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index efecddbab..edbed63e0 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -159,8 +159,8 @@ func TestGitalyServerInfo(t *testing.T) {
require.NoError(t, err)
t.Cleanup(nodeSet.Close)
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withConnections: nodeSet.Connections(),
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithConnections: nodeSet.Connections(),
})
t.Cleanup(cleanup)
@@ -225,8 +225,8 @@ func TestGitalyServerInfo(t *testing.T) {
require.NoError(t, err)
t.Cleanup(nodeSet.Close)
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withConnections: nodeSet.Connections(),
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithConnections: nodeSet.Connections(),
})
t.Cleanup(cleanup)
@@ -262,8 +262,8 @@ func TestGitalyServerInfoBadNode(t *testing.T) {
require.NoError(t, err)
defer nodes.Close()
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withConnections: nodes.Connections(),
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithConnections: nodes.Connections(),
})
defer cleanup()
@@ -294,8 +294,8 @@ func TestDiskStatistics(t *testing.T) {
require.NoError(t, err)
defer nodes.Close()
- cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
- withConnections: nodes.Connections(),
+ cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{
+ WithConnections: nodes.Connections(),
})
defer cleanup()
@@ -314,12 +314,12 @@ func TestHealthCheck(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
- cc, _, cleanup := runPraefectServer(t, ctx, config.Config{VirtualStorages: []*config.VirtualStorage{
+ cc, _, cleanup := RunPraefectServer(t, ctx, config.Config{VirtualStorages: []*config.VirtualStorage{
{
Name: "praefect",
Nodes: []*config.Node{{Storage: "stub", Address: "unix:///stub-address", Token: ""}},
},
- }}, buildOptions{})
+ }}, BuildOptions{})
defer cleanup()
client := grpc_health_v1.NewHealthClient(cc)
@@ -344,7 +344,7 @@ func TestRejectBadStorage(t *testing.T) {
}
ctx := testhelper.Context(t)
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{})
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{})
defer cleanup()
req := &gitalypb.GarbageCollectRequest{
@@ -396,9 +396,9 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook := test.NewNullLogger()
// instantiate a praefect server and trigger warning
- _, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withLogger: logrus.NewEntry(tLogger),
- withNodeMgr: nullNodeMgr{}, // to suppress node address issues
+ _, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithLogger: logrus.NewEntry(tLogger),
+ WithNodeMgr: nullNodeMgr{}, // to suppress node address issues
})
defer cleanup()
@@ -427,9 +427,9 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook = test.NewNullLogger()
// instantiate a praefect server and trigger warning
- _, _, cleanup = runPraefectServer(t, ctx, conf, buildOptions{
- withLogger: logrus.NewEntry(tLogger),
- withNodeMgr: nullNodeMgr{}, // to suppress node address issues
+ _, _, cleanup = RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithLogger: logrus.NewEntry(tLogger),
+ WithNodeMgr: nullNodeMgr{}, // to suppress node address issues
})
defer cleanup()
@@ -476,9 +476,9 @@ func TestWarnDuplicateAddrs(t *testing.T) {
tLogger, hook = test.NewNullLogger()
// instantiate a praefect server and trigger warning
- _, _, cleanup = runPraefectServer(t, ctx, conf, buildOptions{
- withLogger: logrus.NewEntry(tLogger),
- withNodeMgr: nullNodeMgr{}, // to suppress node address issues
+ _, _, cleanup = RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithLogger: logrus.NewEntry(tLogger),
+ WithNodeMgr: nullNodeMgr{}, // to suppress node address issues
})
defer cleanup()
@@ -535,15 +535,15 @@ func TestRemoveRepository(t *testing.T) {
nodeMgr.Start(0, time.Hour)
defer nodeMgr.Stop()
- cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
- withQueue: queueInterceptor,
- withRepoStore: datastore.MockRepositoryStore{
+ cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{
+ WithQueue: queueInterceptor,
+ WithRepoStore: datastore.MockRepositoryStore{
GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (string, map[string]struct{}, error) {
return relativePath, nil, nil
},
},
- withNodeMgr: nodeMgr,
- withTxMgr: txMgr,
+ WithNodeMgr: nodeMgr,
+ WithTxMgr: txMgr,
})
defer cleanup()
@@ -612,10 +612,10 @@ func testRenameRepository(t *testing.T, ctx context.Context) {
require.NoError(t, err)
defer nodeSet.Close()
- cc, _, cleanup := runPraefectServer(t, ctx, praefectCfg, buildOptions{
- withQueue: evq,
- withRepoStore: rs,
- withRouter: NewPerRepositoryRouter(
+ cc, _, cleanup := RunPraefectServer(t, ctx, praefectCfg, BuildOptions{
+ WithQueue: evq,
+ WithRepoStore: rs,
+ WithRouter: NewPerRepositoryRouter(
nodeSet.Connections(),
nodes.NewPerRepositoryElector(db),
StaticHealthChecker(praefectCfg.StorageNames()),
@@ -625,7 +625,7 @@ func testRenameRepository(t *testing.T, ctx context.Context) {
rs,
nil,
),
- withTxMgr: txManager,
+ WithTxMgr: txManager,
})
t.Cleanup(cleanup)
diff --git a/internal/praefect/testserver.go b/internal/praefect/testserver.go
new file mode 100644
index 000000000..eec0bf4b9
--- /dev/null
+++ b/internal/praefect/testserver.go
@@ -0,0 +1,280 @@
+package praefect
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/client"
+ gitalycfgauth "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/server/auth"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/mock"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/nodes"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/transactions"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/promtest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
+ correlation "gitlab.com/gitlab-org/labkit/correlation/grpc"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// BuildOptions is a set of configurations options that can be set to configure praefect service.
+type BuildOptions struct {
+ // WithQueue sets an implementation of the replication queue to use by praefect service.
+ WithQueue datastore.ReplicationEventQueue
+ // WithTxMgr sets the transaction manager to use by praefect service.
+ WithTxMgr *transactions.Manager
+ // WithBackends sets a callback that is triggered during initialization.
+ WithBackends func([]*config.VirtualStorage) []testhelper.Cleanup
+ // WithAnnotations sets a proto-registry to use by praefect service.
+ WithAnnotations *protoregistry.Registry
+ // WithLogger sets a logger to use by praefect service.
+ WithLogger *logrus.Entry
+ // WithNodeMgr sets an implementation of the node manager to use by praefect service.
+ WithNodeMgr nodes.Manager
+ // WithRepoStore sets an implementation of the repositories store to use by praefect service.
+ WithRepoStore datastore.RepositoryStore
+ // WithAssignmentStore sets an implementation of the repositories store to use by praefect service.
+ WithAssignmentStore AssignmentStore
+ // WithConnections sets a set of connections to gitalies.
+ WithConnections Connections
+ // WithPrimaryGetter sets an implementation of the primary node getter to use by praefect service.
+ WithPrimaryGetter PrimaryGetter
+ // WithRouter sets an implementation of the request router to use by praefect service.
+ WithRouter Router
+}
+
+// WithMockBackends mocks backends with a set of passed in stubs.
+func WithMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup {
+ return func(virtualStorages []*config.VirtualStorage) []testhelper.Cleanup {
+ var cleanups []testhelper.Cleanup
+
+ for _, vs := range virtualStorages {
+ require.Equal(t, len(backends), len(vs.Nodes),
+ "mock server count doesn't match config nodes")
+
+ for i, node := range vs.Nodes {
+ backend, ok := backends[node.Storage]
+ require.True(t, ok, "missing backend server for node %s", node.Storage)
+
+ backendAddr, cleanup := newMockDownstream(t, node.Token, backend)
+ cleanups = append(cleanups, cleanup)
+
+ node.Address = backendAddr
+ vs.Nodes[i] = node
+ }
+ }
+
+ return cleanups
+ }
+}
+
+func defaultQueue(t testing.TB) datastore.ReplicationEventQueue {
+ return datastore.NewPostgresReplicationEventQueue(testdb.New(t))
+}
+
+func defaultTxMgr(conf config.Config) *transactions.Manager {
+ return transactions.NewManager(conf)
+}
+
+func defaultNodeMgr(t testing.TB, conf config.Config, rs datastore.RepositoryStore) nodes.Manager {
+ nodeMgr, err := nodes.NewManager(testhelper.NewDiscardingLogEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
+ require.NoError(t, err)
+ nodeMgr.Start(0, time.Hour)
+ t.Cleanup(nodeMgr.Stop)
+ return nodeMgr
+}
+
+func defaultRepoStore(conf config.Config) datastore.RepositoryStore {
+ return datastore.MockRepositoryStore{}
+}
+
+func listenAvailPort(tb testing.TB) (net.Listener, int) {
+ listener, err := net.Listen("tcp", "localhost:0")
+ require.NoError(tb, err)
+
+ return listener, listener.Addr().(*net.TCPAddr).Port
+}
+
+func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
+ opts := []grpc.DialOption{
+ grpc.WithBlock(),
+ grpc.WithUnaryInterceptor(correlation.UnaryClientCorrelationInterceptor()),
+ grpc.WithStreamInterceptor(correlation.StreamClientCorrelationInterceptor()),
+ }
+ if backend {
+ opts = append(
+ opts,
+ grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
+ )
+ }
+
+ cc, err := client.Dial(
+ fmt.Sprintf("tcp://localhost:%d", port),
+ opts,
+ )
+ require.NoError(tb, err)
+
+ return cc
+}
+
+func newMockDownstream(tb testing.TB, token string, m mock.SimpleServiceServer) (string, func()) {
+ srv := grpc.NewServer(grpc.UnaryInterceptor(auth.UnaryServerInterceptor(gitalycfgauth.Config{Token: token})))
+ mock.RegisterSimpleServiceServer(srv, m)
+ healthpb.RegisterHealthServer(srv, health.NewServer())
+
+ // client to backend service
+ lis, port := listenAvailPort(tb)
+
+ errQ := make(chan error)
+
+ go func() {
+ errQ <- srv.Serve(lis)
+ }()
+
+ cleanup := func() {
+ srv.GracefulStop()
+ lis.Close()
+
+ // If the server is shutdown before Serve() is called on it
+ // the Serve() calls will return the ErrServerStopped
+ if err := <-errQ; err != nil && err != grpc.ErrServerStopped {
+ require.NoError(tb, err)
+ }
+ }
+
+ return fmt.Sprintf("tcp://localhost:%d", port), cleanup
+}
+
+type noopBackoffFactory struct{}
+
+func (noopBackoffFactory) Create() (Backoff, BackoffReset) {
+ return func() time.Duration {
+ return 0
+ }, func() {}
+}
+
+func startProcessBacklog(ctx context.Context, replMgr ReplMgr) <-chan struct{} {
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ replMgr.ProcessBacklog(ctx, noopBackoffFactory{})
+ }()
+ return done
+}
+
+// RunPraefectServer starts praefect service based on the passed in configuration and options.
+// The caller is responsible to call returned testhelper.Cleanup in order to stop the service
+// and release all acquired resources.
+// The function should be used only for testing purposes and not as part of the production code.
+//
+//nolint:revive
+func RunPraefectServer(
+ t testing.TB,
+ ctx context.Context,
+ conf config.Config,
+ opt BuildOptions,
+) (*grpc.ClientConn, *grpc.Server, testhelper.Cleanup) {
+ var cleanups []testhelper.Cleanup
+
+ if opt.WithQueue == nil {
+ opt.WithQueue = defaultQueue(t)
+ }
+ if opt.WithRepoStore == nil {
+ opt.WithRepoStore = defaultRepoStore(conf)
+ }
+ if opt.WithTxMgr == nil {
+ opt.WithTxMgr = defaultTxMgr(conf)
+ }
+ if opt.WithBackends != nil {
+ cleanups = append(cleanups, opt.WithBackends(conf.VirtualStorages)...)
+ }
+ if opt.WithAnnotations == nil {
+ opt.WithAnnotations = protoregistry.GitalyProtoPreregistered
+ }
+ if opt.WithLogger == nil {
+ opt.WithLogger = log.Default()
+ }
+ if opt.WithNodeMgr == nil {
+ opt.WithNodeMgr = defaultNodeMgr(t, conf, opt.WithRepoStore)
+ }
+ if opt.WithAssignmentStore == nil {
+ opt.WithAssignmentStore = NewDisabledAssignmentStore(conf.StorageNames())
+ }
+ if opt.WithRouter == nil {
+ opt.WithRouter = NewNodeManagerRouter(opt.WithNodeMgr, opt.WithRepoStore)
+ }
+
+ coordinator := NewCoordinator(
+ opt.WithQueue,
+ opt.WithRepoStore,
+ opt.WithRouter,
+ opt.WithTxMgr,
+ conf,
+ opt.WithAnnotations,
+ )
+
+ // TODO: run a replmgr for EVERY virtual storage
+ replmgr := NewReplMgr(
+ opt.WithLogger,
+ conf.StorageNames(),
+ opt.WithQueue,
+ opt.WithRepoStore,
+ opt.WithNodeMgr,
+ NodeSetFromNodeManager(opt.WithNodeMgr),
+ )
+
+ prf := NewGRPCServer(
+ conf,
+ opt.WithLogger,
+ protoregistry.GitalyProtoPreregistered,
+ coordinator.StreamDirector,
+ opt.WithTxMgr,
+ opt.WithRepoStore,
+ opt.WithAssignmentStore,
+ opt.WithConnections,
+ opt.WithPrimaryGetter,
+ nil,
+ )
+
+ listener, port := listenAvailPort(t)
+
+ errQ := make(chan error)
+ ctx, cancel := context.WithCancel(ctx)
+
+ go func() {
+ errQ <- prf.Serve(listener)
+ close(errQ)
+ }()
+ replMgrDone := startProcessBacklog(ctx, replmgr)
+
+ // dial client to praefect
+ cc := dialLocalPort(t, port, false)
+
+ cleanup := func() {
+ cc.Close()
+
+ for _, cu := range cleanups {
+ cu()
+ }
+
+ prf.Stop()
+
+ cancel()
+ <-replMgrDone
+ require.NoError(t, <-errQ)
+ }
+
+ return cc, prf, cleanup
+}
diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go
index c76959c76..d709ac83d 100644
--- a/internal/praefect/transaction_test.go
+++ b/internal/praefect/transaction_test.go
@@ -32,9 +32,9 @@ type voter struct {
func runPraefectServerAndTxMgr(t testing.TB, ctx context.Context) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) {
conf := testConfig(1)
txMgr := transactions.NewManager(conf)
- cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withTxMgr: txMgr,
- withNodeMgr: nullNodeMgr{}, // to suppress node address issues
+ cc, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithTxMgr: txMgr,
+ WithNodeMgr: nullNodeMgr{}, // to suppress node address issues
})
return cc, txMgr, cleanup
}
diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go
index ccf470116..f152779d9 100644
--- a/internal/praefect/verifier_test.go
+++ b/internal/praefect/verifier_test.go
@@ -528,8 +528,8 @@ func TestVerifier(t *testing.T) {
conns := nodeSet.Connections()
rs := datastore.NewPostgresRepositoryStore(db, conf.StorageNames())
- conn, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
- withRouter: NewPerRepositoryRouter(
+ conn, _, cleanup := RunPraefectServer(t, ctx, conf, BuildOptions{
+ WithRouter: NewPerRepositoryRouter(
conns,
elector,
StaticHealthChecker(conf.StorageNames()),
@@ -539,8 +539,8 @@ func TestVerifier(t *testing.T) {
rs,
conf.DefaultReplicationFactors(),
),
- withRepoStore: rs,
- withTxMgr: txManager,
+ WithRepoStore: rs,
+ WithTxMgr: txManager,
})
t.Cleanup(cleanup)