diff options
author | John Cai <jcai@gitlab.com> | 2020-01-29 05:24:44 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-01-29 05:24:44 +0300 |
commit | c6099504b3355f12ea618451b7912b5b80a36593 (patch) | |
tree | 98aa5b58c71e04d4352cf4bd62a57808d1644aaa | |
parent | 6582d73007e9022b7c844d018c80b0eafa9ca189 (diff) | |
parent | 80f37b6b0da168f53086d05a1e869081d2d8d11a (diff) |
Merge branch 'jc-praefect-test' into 'master'
Add praefect as a transparent pass through for tests
Closes #2122
See merge request gitlab-org/gitaly!1736
-rw-r--r-- | .gitlab-ci.yml | 5 | ||||
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | _support/Makefile.template | 6 | ||||
-rw-r--r-- | changelogs/unreleased/jc-praefect-test.yml | 5 | ||||
-rw-r--r-- | internal/errors/errors.go | 10 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 22 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 5 | ||||
-rw-r--r-- | internal/praefect/protoregistry/find_oid.go | 4 | ||||
-rw-r--r-- | internal/service/blob/get_blob_test.go | 12 | ||||
-rw-r--r-- | internal/service/blob/get_blobs_test.go | 8 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers.go | 3 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers_test.go | 28 | ||||
-rw-r--r-- | internal/service/blob/testhelper_test.go | 22 | ||||
-rw-r--r-- | internal/testhelper/testserver.go | 178 |
14 files changed, 267 insertions, 45 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f9dbc908a..d02ca67e1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -137,6 +137,11 @@ test:proxy: script: - make test-with-proxies +test:praefect: + <<: *test_definition + script: + - make test-with-praefect + race: <<: *go_test_definition script: @@ -60,6 +60,10 @@ prepare-tests: prepare-build test: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ +.PHONY: test-with-praefect +test-with-praefect: prepare-build + cd $(BUILD_DIR) && $(MAKE) $@ + .PHONY: test-with-proxies test-with-proxies: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ diff --git a/_support/Makefile.template b/_support/Makefile.template index 118100d6f..80a3b0457 100644 --- a/_support/Makefile.template +++ b/_support/Makefile.template @@ -130,6 +130,12 @@ test-with-proxies: prepare-tests @cd {{ .SourceDir }} &&\ go test -tags "$(BUILD_TAGS)" -count=1 -exec {{ .SourceDir }}/_support/bad-proxies {{ .Pkg }}/internal/rubyserver/ + +.PHONY: test-with-praefect +test-with-praefect: build prepare-tests + @cd {{ .SourceDir }} &&\ + GITALY_TEST_PRAEFECT_BIN={{ .BuildDir }}/bin/praefect go test -tags "$(BUILD_TAGS)" -count=1 {{ join .AllPackages " " }} # count=1 bypasses go 1.10 test caching + .PHONY: race-go race-go: prepare-tests @cd {{ .SourceDir }} && go test -tags "$(BUILD_TAGS)" -race {{ join .AllPackages " " }} diff --git a/changelogs/unreleased/jc-praefect-test.yml b/changelogs/unreleased/jc-praefect-test.yml new file mode 100644 index 000000000..94149b755 --- /dev/null +++ b/changelogs/unreleased/jc-praefect-test.yml @@ -0,0 +1,5 @@ +--- +title: Add praefect as a transparent pass through for tests +merge_request: 1736 +author: +type: other diff --git a/internal/errors/errors.go b/internal/errors/errors.go new file mode 100644 index 000000000..d2efaef52 --- /dev/null +++ b/internal/errors/errors.go @@ -0,0 +1,10 @@ +package errors + +import "errors" + +var ( + // ErrEmptyRepository is returned when an RPC is missing a repository as an argument + ErrEmptyRepository = errors.New("empty Repository") + // ErrInvalidRepository is returned when an RPC has an invalid repository as an argument + ErrInvalidRepository = errors.New("invalid Repository") +) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index ef9bbdc28..a476b61c9 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" + internalerrs "gitlab.com/gitlab-org/gitaly/internal/errors" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -19,6 +20,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func isDestructive(methodName string) bool { @@ -81,9 +84,19 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var storage string if mi.Scope == protoregistry.ScopeRepository { - storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) - if err != nil { - return nil, err + var getRepoErr error + storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + + if getRepoErr == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) + } + + if getRepoErr != nil { + return nil, getRepoErr + } + + if storage == "" { + return nil, status.Error(codes.InvalidArgument, "storage not found") } } else { storage, requestFinalizer, err = c.getAnyStorageNode() @@ -185,6 +198,9 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName()) if err != nil { + if err == datastore.ErrNoPrimaryForStorage { + return nil, status.Error(codes.InvalidArgument, internalerrs.ErrInvalidRepository.Error()) + } return nil, fmt.Errorf("could not choose a primary: %v", err) } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 2830b197a..c8de9d82c 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -179,6 +179,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore { return m } +// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it +var ErrNoPrimaryForStorage = errors.New("no primary for storage") + // PickAPrimary returns the primary configured in the config file func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, error) { for _, node := range md.virtualStorages[virtualStorage] { @@ -187,7 +190,7 @@ func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, err } } - return models.Node{}, errors.New("no default primaries found") + return models.Node{}, ErrNoPrimaryForStorage } // GetReplicas gets the secondaries for a repository based on the relative path diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go index c17324a96..8b73d4943 100644 --- a/internal/praefect/protoregistry/find_oid.go +++ b/internal/praefect/protoregistry/find_oid.go @@ -17,7 +17,7 @@ const ( ) // ErrTargetRepoMissing indicates that the target repo is missing or not set -var ErrTargetRepoMissing = errors.New("target repo is not set") +var ErrTargetRepoMissing = errors.New("empty Repository") func reflectFindRepoTarget(pbMsg proto.Message, targetOID []int) (*gitalypb.Repository, error) { msgV, e := reflectFindOID(pbMsg, targetOID) @@ -58,7 +58,7 @@ func reflectFindOID(pbMsg proto.Message, targetOID []int) (reflect.Value, error) msgV, err = findProtoField(msgV, fieldNo) if err != nil { return reflect.Value{}, fmt.Errorf( - "unable to descend OID %+v into message %s: %s", + "unable to descend OID %+v into message %s: %v", targetOID, proto.MessageName(pbMsg), err, ) } diff --git a/internal/service/blob/get_blob_test.go b/internal/service/blob/get_blob_test.go index 728a1b247..ca24b07ad 100644 --- a/internal/service/blob/get_blob_test.go +++ b/internal/service/blob/get_blob_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetBlob(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -90,8 +90,8 @@ func TestSuccessfulGetBlob(t *testing.T) { } func TestGetBlobNotFound(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -146,8 +146,8 @@ func getBlob(stream gitalypb.BlobService_GetBlobClient) (int64, string, []byte, } func TestFailedGetBlobRequestDueToValidationError(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() diff --git a/internal/service/blob/get_blobs_test.go b/internal/service/blob/get_blobs_test.go index 8b7199731..8ecfcb696 100644 --- a/internal/service/blob/get_blobs_test.go +++ b/internal/service/blob/get_blobs_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetBlobsRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -133,8 +133,8 @@ func TestSuccessfulGetBlobsRequest(t *testing.T) { } func TestFailedGetBlobsRequestDueToValidation(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go index 9fffdb7a3..e4b61490f 100644 --- a/internal/service/blob/lfs_pointers.go +++ b/internal/service/blob/lfs_pointers.go @@ -3,6 +3,7 @@ package blob import ( "fmt" + "gitlab.com/gitlab-org/gitaly/internal/errors" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -62,7 +63,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita func validateGetLFSPointersRequest(req *gitalypb.GetLFSPointersRequest) error { if req.GetRepository() == nil { - return fmt.Errorf("empty Repository") + return errors.ErrEmptyRepository } if len(req.GetBlobIds()) == 0 { diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 019caa37c..7126b35f8 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -77,8 +77,8 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) { } func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -125,8 +125,8 @@ func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { } func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -291,8 +291,8 @@ func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) { } func TestFailedGetNewLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -352,8 +352,8 @@ func drainNewPointers(c gitalypb.BlobService_GetNewLFSPointersClient) error { } func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -423,8 +423,8 @@ func getAllPointers(t *testing.T, c gitalypb.BlobService_GetAllLFSPointersClient } func TestFailedGetAllLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -470,8 +470,8 @@ func drainAllPointers(c gitalypb.BlobService_GetAllLFSPointersClient) error { // TestGetAllLFSPointersVerifyScope verifies that this RPC returns all LFS // pointers in a repository, not only ones reachable from the default branch func TestGetAllLFSPointersVerifyScope(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() diff --git a/internal/service/blob/testhelper_test.go b/internal/service/blob/testhelper_test.go index c8d990995..d28ad8eb8 100644 --- a/internal/service/blob/testhelper_test.go +++ b/internal/service/blob/testhelper_test.go @@ -2,10 +2,10 @@ package blob import ( "log" - "net" "os" "testing" + "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -23,6 +23,7 @@ func testMain(m *testing.M) int { defer testhelper.MustHaveNoChildProcess() testhelper.ConfigureRuby() + if err := rubyServer.Start(); err != nil { log.Fatal(err) } @@ -31,22 +32,15 @@ func testMain(m *testing.M) int { return m.Run() } -func runBlobServer(t *testing.T) (*grpc.Server, string) { - grpcServer := testhelper.NewTestGrpcServer(t, nil, nil) - - serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() - listener, err := net.Listen("unix", serverSocketPath) - - if err != nil { - t.Fatal(err) - } +func runBlobServer(t *testing.T) (func(), string) { + srv := testhelper.NewServer(t, nil, nil) - gitalypb.RegisterBlobServiceServer(grpcServer, &server{ruby: rubyServer}) - reflection.Register(grpcServer) + gitalypb.RegisterBlobServiceServer(srv.GrpcServer(), &server{ruby: rubyServer}) + reflection.Register(srv.GrpcServer()) - go grpcServer.Serve(listener) + require.NoError(t, srv.Start()) - return grpcServer, "unix://" + serverSocketPath + return srv.Stop, "unix://" + srv.Socket() } func newBlobClient(t *testing.T, serverSocketPath string) (gitalypb.BlobServiceClient, *grpc.ClientConn) { diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go new file mode 100644 index 000000000..4d69fbfc2 --- /dev/null +++ b/internal/testhelper/testserver.go @@ -0,0 +1,178 @@ +package testhelper + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/BurntSushi/toml" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" + praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// NewTestServer instantiates a new TestServer +func NewTestServer(srv *grpc.Server) *TestServer { + return &TestServer{ + grpcServer: srv, + } +} + +// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance +// if necessary +type TestServer struct { + grpcServer *grpc.Server + socket string + process *os.Process +} + +// GrpcServer returns the underlying grpc.Server +func (p *TestServer) GrpcServer() *grpc.Server { + return p.grpcServer +} + +// Stop will stop both the grpc server as well as the praefect process +func (p *TestServer) Stop() { + p.grpcServer.Stop() + if p.process != nil { + p.process.Kill() + } +} + +// Socket returns the socket file the test server is listening on +func (p *TestServer) Socket() string { + return p.socket +} + +// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled +func (p *TestServer) Start() error { + gitalyServerSocketPath := GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", gitalyServerSocketPath) + if err != nil { + return err + } + + go p.grpcServer.Serve(listener) + + praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") + if !ok { + p.socket = gitalyServerSocketPath + return nil + } + + tempDir, err := ioutil.TempDir("", "praefect-test-server") + if err != nil { + return err + } + defer os.RemoveAll(tempDir) + + praefectServerSocketPath := GetTemporaryGitalySocketFileName() + + configFilePath := filepath.Join(tempDir, "config.toml") + configFile, err := os.Create(configFilePath) + if err != nil { + return err + } + defer configFile.Close() + + c := praefectconfig.Config{ + SocketPath: praefectServerSocketPath, + VirtualStorages: []*praefectconfig.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "default", + Address: "unix:/" + gitalyServerSocketPath, + DefaultPrimary: true, + }, + }, + }, + }, + } + + if err := toml.NewEncoder(configFile).Encode(&c); err != nil { + return err + } + if err = configFile.Sync(); err != nil { + return err + } + configFile.Close() + + cmd := exec.Command(praefectBinPath, "-config", configFilePath) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + + p.socket = praefectServerSocketPath + + if err := cmd.Start(); err != nil { + return err + } + go cmd.Wait() + + conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) + + if err != nil { + return fmt.Errorf("dial: %v", err) + } + defer conn.Close() + + if err = waitForPraefectStartup(conn); err != nil { + return err + } + + p.process = cmd.Process + + return nil +} + +func waitForPraefectStartup(conn *grpc.ClientConn) error { + client := healthpb.NewHealthClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true)) + if err != nil { + return err + } + + if resp.Status != healthpb.HealthCheckResponse_SERVING { + return errors.New("server not yet ready to serve") + } + + return nil +} + +// NewServer creates a Server for testing purposes +func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { + logger := NewTestLogger(tb) + logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) + + ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) + ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) + ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) + + streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) + unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) + + return NewTestServer( + grpc.NewServer( + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + )) +} |