diff options
author | Paul Okstad <pokstad@gitlab.com> | 2020-05-15 20:10:50 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2020-05-15 20:10:50 +0300 |
commit | 382abd38d271f87c5b967d20e55ade4d8257e9ae (patch) | |
tree | 9d9a30778f861ddde4e2dd71b69877672094c605 | |
parent | 430f09178c8633dd8b5093d668d70c894b5ca928 (diff) |
Refactor Praefect test server helpers
-rw-r--r-- | internal/praefect/dataloss_check_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/helper_test.go | 278 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 45 | ||||
-rw-r--r-- | internal/praefect/transaction_test.go | 28 |
4 files changed, 207 insertions, 149 deletions
diff --git a/internal/praefect/dataloss_check_test.go b/internal/praefect/dataloss_check_test.go index 5227dad7b..9d714b235 100644 --- a/internal/praefect/dataloss_check_test.go +++ b/internal/praefect/dataloss_check_test.go @@ -107,7 +107,10 @@ func TestDatalossCheck(t *testing.T) { killJobs(t) cc, _, clean := runPraefectServerWithMock(t, cfg, - datastore.Datastore{ReplicationEventQueue: rq}, + datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(cfg), + ReplicationEventQueue: rq, + }, map[string]mock.SimpleServiceServer{ "not-needed": &mock.UnimplementedSimpleServiceServer{}, }, diff --git a/internal/praefect/helper_test.go b/internal/praefect/helper_test.go index 35aef4372..f82a67dee 100644 --- a/internal/praefect/helper_test.go +++ b/internal/praefect/helper_test.go @@ -73,22 +73,16 @@ func testConfig(backends int) config.Config { return cfg } -// setupServer wires all praefect dependencies together via dependency -// injection -func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, txMgr *transactions.Manager, ds datastore.Datastore, l *logrus.Entry, r *protoregistry.Registry) *Server { - coordinator := NewCoordinator(l, ds, nodeMgr, txMgr, conf, r) - - var defaultNode *models.Node - for _, n := range conf.VirtualStorages[0].Nodes { - if n.DefaultPrimary { - defaultNode = n +func assertPrimariesExist(t testing.TB, conf config.Config) { + for _, vs := range conf.VirtualStorages { + var defaultNode *models.Node + for _, n := range vs.Nodes { + if n.DefaultPrimary { + defaultNode = n + } } + require.NotNil(t, defaultNode) } - require.NotNil(t, defaultNode) - - server := NewServer(coordinator.StreamDirector, l, r, conf) - - return server } // runPraefectServer runs a praefect server with the provided mock servers. @@ -97,67 +91,90 @@ func setupServer(t testing.TB, conf config.Config, nodeMgr nodes.Manager, txMgr // configured storage node. // requires there to be only 1 virtual storage func runPraefectServerWithMock(t *testing.T, conf config.Config, ds datastore.Datastore, backends map[string]mock.SimpleServiceServer) (*grpc.ClientConn, *Server, testhelper.Cleanup) { - require.Len(t, conf.VirtualStorages, 1) - require.Equal(t, len(backends), len(conf.VirtualStorages[0].Nodes), - "mock server count doesn't match config nodes") - - var cleanups []testhelper.Cleanup - - for i, node := range conf.VirtualStorages[0].Nodes { - backend, ok := backends[node.Storage] - require.True(t, ok, "missing backend server for node %s", node.Storage) - - backendAddr, cleanup := newMockDownstream(t, node.Token, backend) - cleanups = append(cleanups, cleanup) + r := protoregistry.New() + require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t))) - node.Address = backendAddr - conf.VirtualStorages[0].Nodes[i] = node - } + return runPraefectServer(t, conf, buildOptions{ + withDatastore: ds, + withBackends: withMockBackends(t, backends), + withAnnotations: r, + }) +} - nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec()) - require.NoError(t, err) - nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) +func noopBackoffFunc() (backoff, backoffReset) { + return func() time.Duration { + return 0 + }, func() {} +} - txMgr := transactions.NewManager() +type nullNodeMgr struct{} - r := protoregistry.New() - require.NoError(t, r.RegisterFiles(mustLoadProtoReg(t))) +func (nullNodeMgr) GetShard(virtualStorageName string) (nodes.Shard, error) { return nodes.Shard{}, nil } +func (nullNodeMgr) EnableWrites(ctx context.Context, virtualStorageName string) error { return nil } - prf := setupServer(t, conf, nodeMgr, txMgr, ds, log.Default(), r) +type buildOptions struct { + withDatastore datastore.Datastore + withTxMgr *transactions.Manager + withBackends func([]*config.VirtualStorage) []testhelper.Cleanup + withAnnotations *protoregistry.Registry + withLogger *logrus.Entry + withNodeMgr nodes.Manager +} - listener, port := listenAvailPort(t) - t.Logf("praefect listening on port %d", port) +func withMockBackends(t testing.TB, backends map[string]mock.SimpleServiceServer) func([]*config.VirtualStorage) []testhelper.Cleanup { + return func(virtualStorages []*config.VirtualStorage) []testhelper.Cleanup { + var cleanups []testhelper.Cleanup - errQ := make(chan error) + for _, vs := range virtualStorages { + require.Equal(t, len(backends), len(vs.Nodes), + "mock server count doesn't match config nodes") - prf.RegisterServices(nodeMgr, txMgr, conf, ds) - go func() { - errQ <- prf.Serve(listener, false) - }() + for i, node := range vs.Nodes { + backend, ok := backends[node.Storage] + require.True(t, ok, "missing backend server for node %s", node.Storage) - // dial client to praefect - cc := dialLocalPort(t, port, false) + backendAddr, cleanup := newMockDownstream(t, node.Token, backend) + cleanups = append(cleanups, cleanup) - cleanup := func() { - for _, cu := range cleanups { - cu() + node.Address = backendAddr + vs.Nodes[i] = node + } } - require.NoError(t, cc.Close()) - require.NoError(t, listener.Close()) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - - require.NoError(t, prf.Shutdown(ctx)) + return cleanups } +} - return cc, prf, cleanup +func flattenVirtualStoragesToStoragePath(virtualStorages []*config.VirtualStorage, storagePath string) []gconfig.Storage { + var storages []gconfig.Storage + for _, vStorage := range virtualStorages { + for _, node := range vStorage.Nodes { + storages = append(storages, gconfig.Storage{ + Name: node.Storage, + Path: storagePath, + }) + } + } + return storages } -func noopBackoffFunc() (backoff, backoffReset) { - return func() time.Duration { - return 0 - }, func() {} +// withRealGitalyShared will configure a real Gitaly server backend for a +// Praefect server. The same Gitaly server instance is used for all backend +// storages. +func withRealGitalyShared(t testing.TB) func([]*config.VirtualStorage) []testhelper.Cleanup { + return func(virtualStorages []*config.VirtualStorage) []testhelper.Cleanup { + gStorages := flattenVirtualStoragesToStoragePath(virtualStorages, testhelper.GitlabTestStoragePath()) + _, backendAddr, cleanupGitaly := runInternalGitalyServer(t, gStorages, virtualStorages[0].Nodes[0].Token) + + for _, vs := range virtualStorages { + for i, node := range vs.Nodes { + node.Address = backendAddr + vs.Nodes[i] = node + } + } + + return []testhelper.Cleanup{cleanupGitaly} + } } func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.ClientConn, *Server, testhelper.Cleanup) { @@ -172,51 +189,78 @@ func runPraefectServerWithGitaly(t *testing.T, conf config.Config) (*grpc.Client // runPraefectServerWithGitaly runs a praefect server with actual Gitaly nodes // requires exactly 1 virtual storage func runPraefectServerWithGitalyWithDatastore(t *testing.T, conf config.Config, ds datastore.Datastore) (*grpc.ClientConn, *Server, testhelper.Cleanup) { - return runPraefectServer(t, conf, ds, transactions.NewManager()) + return runPraefectServer(t, conf, buildOptions{ + withDatastore: ds, + withTxMgr: transactions.NewManager(), + withBackends: withRealGitalyShared(t), + }) } -func runPraefectServer(t *testing.T, conf config.Config, ds datastore.Datastore, txMgr *transactions.Manager) (*grpc.ClientConn, *Server, testhelper.Cleanup) { - require.Len(t, conf.VirtualStorages, 1) - var cleanups []testhelper.Cleanup - - var storages []gconfig.Storage - for _, node := range conf.VirtualStorages[0].Nodes { - storages = append(storages, gconfig.Storage{ - Name: node.Storage, - Path: testhelper.GitlabTestStoragePath(), - }) - } - - _, backendAddr, cleanupGitaly := runInternalGitalyServer(t, storages, conf.VirtualStorages[0].Nodes[0].Token) - cleanups = append(cleanups, cleanupGitaly) - - for i, node := range conf.VirtualStorages[0].Nodes { - node.Address = backendAddr - conf.VirtualStorages[0].Nodes[i] = node +func defaultDatastore(conf config.Config) datastore.Datastore { + return datastore.Datastore{ + ReplicasDatastore: datastore.NewInMemory(conf), + ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), } +} - logEntry := log.Default() +func defaultTxMgr() *transactions.Manager { + return transactions.NewManager() +} +func defaultNodeMgr(t testing.TB, conf config.Config) nodes.Manager { nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, promtest.NewMockHistogramVec()) require.NoError(t, err) nodeMgr.Start(1*time.Millisecond, 5*time.Millisecond) + return nodeMgr +} + +func defaultAnnotations(t testing.TB) *protoregistry.Registry { + r := protoregistry.New() + require.NoError(t, r.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) + return r +} + +func runPraefectServer(t testing.TB, conf config.Config, opt buildOptions) (*grpc.ClientConn, *Server, testhelper.Cleanup) { + assertPrimariesExist(t, conf) + + var cleanups []testhelper.Cleanup + + if opt.withDatastore == (datastore.Datastore{}) { + opt.withDatastore = defaultDatastore(conf) + } + if opt.withTxMgr == nil { + opt.withTxMgr = defaultTxMgr() + } + if opt.withBackends != nil { + cleanups = append(cleanups, opt.withBackends(conf.VirtualStorages)...) + } + if opt.withAnnotations == nil { + opt.withAnnotations = defaultAnnotations(t) + } + if opt.withLogger == nil { + opt.withLogger = log.Default() + } + if opt.withNodeMgr == nil { + opt.withNodeMgr = defaultNodeMgr(t, conf) + } - registry := protoregistry.New() - require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - coordinator := NewCoordinator(logEntry, ds, nodeMgr, txMgr, conf, registry) + coordinator := NewCoordinator( + opt.withLogger, + opt.withDatastore, + opt.withNodeMgr, + opt.withTxMgr, + conf, + opt.withAnnotations, + ) + // TODO: run a replmgr for EVERY virtual storage replmgr := NewReplMgr( - logEntry, - ds, - nodeMgr, + opt.withLogger, + opt.withDatastore, + opt.withNodeMgr, WithQueueMetric(&promtest.MockGauge{}), ) - prf := NewServer( - coordinator.StreamDirector, - logEntry, - registry, - conf, - ) + prf := NewServer(coordinator.StreamDirector, opt.withLogger, opt.withAnnotations, conf) listener, port := listenAvailPort(t) t.Logf("proxy listening on port %d", port) @@ -224,7 +268,7 @@ func runPraefectServer(t *testing.T, conf config.Config, ds datastore.Datastore, errQ := make(chan error) ctx, cancel := testhelper.Context() - prf.RegisterServices(nodeMgr, txMgr, conf, ds) + prf.RegisterServices(opt.withNodeMgr, opt.withTxMgr, conf, opt.withDatastore) go func() { errQ <- prf.Serve(listener, false) }() replmgr.ProcessBacklog(ctx, noopBackoffFunc) @@ -247,7 +291,37 @@ func runPraefectServer(t *testing.T, conf config.Config, ds datastore.Datastore, return cc, prf, cleanup } -func runInternalGitalyServer(t *testing.T, storages []gconfig.Storage, token string) (*grpc.Server, string, func()) { +// partialGitaly is a subset of Gitaly's behavior needed to properly test +// Praefect +type partialGitaly interface { + gitalypb.ServerServiceServer + gitalypb.RepositoryServiceServer + gitalypb.InternalGitalyServer + healthpb.HealthServer +} + +func registerServices(server *grpc.Server, pg partialGitaly) { + gitalypb.RegisterServerServiceServer(server, pg) + gitalypb.RegisterRepositoryServiceServer(server, pg) + gitalypb.RegisterInternalGitalyServer(server, pg) + healthpb.RegisterHealthServer(server, pg) +} + +func realGitaly(storages []gconfig.Storage, authToken, internalSocketPath string) partialGitaly { + return struct { + gitalypb.ServerServiceServer + gitalypb.RepositoryServiceServer + gitalypb.InternalGitalyServer + healthpb.HealthServer + }{ + gitalyserver.NewServer(storages), + repository.NewServer(RubyServer, internalSocketPath), + internalgitaly.NewServer(gconfig.Config.Storages), + health.NewServer(), + } +} + +func runInternalGitalyServer(t testing.TB, storages []gconfig.Storage, token string) (*grpc.Server, string, func()) { streamInt := []grpc.StreamServerInterceptor{auth.StreamServerInterceptor(internalauth.Config{Token: token})} unaryInt := []grpc.UnaryServerInterceptor{auth.UnaryServerInterceptor(internalauth.Config{Token: token})} @@ -257,27 +331,21 @@ func runInternalGitalyServer(t *testing.T, storages []gconfig.Storage, token str listener, err := net.Listen("unix", serverSocketPath) require.NoError(t, err) - internalSocket := gconfig.GitalyInternalSocketPath() - internalListener, err := net.Listen("unix", internalSocket) + internalSocketPath := gconfig.GitalyInternalSocketPath() + internalListener, err := net.Listen("unix", internalSocketPath) require.NoError(t, err) - gitalypb.RegisterServerServiceServer(server, gitalyserver.NewServer(storages)) - gitalypb.RegisterRepositoryServiceServer(server, repository.NewServer(RubyServer, internalSocket)) - gitalypb.RegisterInternalGitalyServer(server, internalgitaly.NewServer(gconfig.Config.Storages)) - healthpb.RegisterHealthServer(server, health.NewServer()) + registerServices(server, realGitaly(storages, token, internalSocketPath)) errQ := make(chan error) - go func() { - errQ <- server.Serve(listener) - }() - go func() { - errQ <- server.Serve(internalListener) - }() + go func() { errQ <- server.Serve(listener) }() + go func() { errQ <- server.Serve(internalListener) }() cleanup := func() { server.Stop() require.NoError(t, <-errQ) + require.NoError(t, <-errQ) } return server, "unix://" + serverSocketPath, cleanup diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 7f4e9ad79..ab4610a1d 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -22,11 +22,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" - "gitlab.com/gitlab-org/gitaly/internal/praefect/nodes" - "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" - "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/internal/testhelper/promtest" "gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc/health/grpc_health_v1" @@ -155,29 +151,14 @@ func TestGitalyServerInfoBadNode(t *testing.T) { }, } - entry := testhelper.DiscardTestEntry(t) - nodeMgr, err := nodes.NewManager(entry, conf, nil, promtest.NewMockHistogramVec()) - require.NoError(t, err) - - txMgr := transactions.NewManager() - - registry := protoregistry.New() - require.NoError(t, registry.RegisterFiles(protoregistry.GitalyProtoFileDescriptors...)) - - srv := setupServer(t, conf, nodeMgr, txMgr, datastore.Datastore{}, entry, registry) + cc, _, cleanup := runPraefectServer(t, conf, buildOptions{}) + defer cleanup() - listener, port := listenAvailPort(t) - go func() { - srv.RegisterServices(nodeMgr, txMgr, conf, datastore.Datastore{}) - srv.Serve(listener, false) - }() + client := gitalypb.NewServerServiceClient(cc) - cc := dialLocalPort(t, port, false) ctx, cancel := testhelper.Context() defer cancel() - client := gitalypb.NewServerServiceClient(cc) - metadata, err := client.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) require.NoError(t, err) require.Len(t, metadata.GetStorageStatuses(), 0) @@ -299,7 +280,12 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook := test.NewNullLogger() - setupServer(t, conf, nil, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + // instantiate a praefect server and trigger warning + _, _, cleanup := runPraefectServer(t, conf, buildOptions{ + withLogger: logrus.NewEntry(tLogger), + withNodeMgr: nullNodeMgr{}, // to suppress node address issues + }) + defer cleanup() for _, entry := range hook.Entries { require.NotContains(t, entry.Message, "more than one backend node") @@ -326,7 +312,12 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook = test.NewNullLogger() - setupServer(t, conf, nil, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + // instantiate a praefect server and trigger warning + _, _, cleanup = runPraefectServer(t, conf, buildOptions{ + withLogger: logrus.NewEntry(tLogger), + withNodeMgr: nullNodeMgr{}, // to suppress node address issues + }) + defer cleanup() var found bool for _, entry := range hook.Entries { @@ -372,7 +363,11 @@ func TestWarnDuplicateAddrs(t *testing.T) { tLogger, hook = test.NewNullLogger() - setupServer(t, conf, nil, nil, datastore.Datastore{}, logrus.NewEntry(tLogger), nil) // instantiates a praefect server and triggers warning + // instantiate a praefect server and trigger warning + _, _, cleanup = runPraefectServer(t, conf, buildOptions{ + withLogger: logrus.NewEntry(tLogger), + withNodeMgr: nullNodeMgr{}, // to suppress node address issues + }) for _, entry := range hook.Entries { require.NotContains(t, entry.Message, "more than one backend node") diff --git a/internal/praefect/transaction_test.go b/internal/praefect/transaction_test.go index 675de2cc8..30d4f685a 100644 --- a/internal/praefect/transaction_test.go +++ b/internal/praefect/transaction_test.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -18,18 +17,14 @@ import ( "google.golang.org/grpc/status" ) -func runPraefectWithTransactionMgr(t *testing.T, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { +func runPraefectServerAndTxMgr(t testing.TB, opts ...transactions.ManagerOpt) (*grpc.ClientConn, *transactions.Manager, testhelper.Cleanup) { conf := testConfig(1) - - ds := datastore.Datastore{ - ReplicasDatastore: datastore.NewInMemory(conf), - ReplicationEventQueue: datastore.NewMemoryReplicationEventQueue(), - } - txMgr := transactions.NewManager(opts...) - conn, _, cleanup := runPraefectServer(t, conf, ds, txMgr) - - return conn, txMgr, cleanup + cc, _, cleanup := runPraefectServer(t, conf, buildOptions{ + withTxMgr: txMgr, + withNodeMgr: nullNodeMgr{}, // to suppress node address issues + }) + return cc, txMgr, cleanup } func setupMetrics() (*prometheus.CounterVec, []transactions.ManagerOpt) { @@ -65,8 +60,7 @@ func verifyCounterMetrics(t *testing.T, counter *prometheus.CounterVec, expected func TestTransactionSucceeds(t *testing.T) { counter, opts := setupMetrics() - - cc, txMgr, cleanup := runPraefectWithTransactionMgr(t, opts...) + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -97,7 +91,7 @@ func TestTransactionSucceeds(t *testing.T) { } func TestTransactionFailsWithMultipleNodes(t *testing.T) { - _, txMgr, cleanup := runPraefectWithTransactionMgr(t) + _, txMgr, cleanup := runPraefectServerAndTxMgr(t) defer cleanup() ctx, cleanup := testhelper.Context() @@ -109,8 +103,7 @@ func TestTransactionFailsWithMultipleNodes(t *testing.T) { func TestTransactionFailures(t *testing.T) { counter, opts := setupMetrics() - - cc, _, cleanup := runPraefectWithTransactionMgr(t, opts...) + cc, _, cleanup := runPraefectServerAndTxMgr(t, opts...) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -135,8 +128,7 @@ func TestTransactionFailures(t *testing.T) { func TestTransactionCancellation(t *testing.T) { counter, opts := setupMetrics() - - cc, txMgr, cleanup := runPraefectWithTransactionMgr(t, opts...) + cc, txMgr, cleanup := runPraefectServerAndTxMgr(t, opts...) defer cleanup() ctx, cancel := context.WithTimeout(context.Background(), time.Second) |