diff options
author | John Cai <jcai@gitlab.com> | 2020-01-15 03:14:25 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-01-29 05:07:34 +0300 |
commit | 80f37b6b0da168f53086d05a1e869081d2d8d11a (patch) | |
tree | 98aa5b58c71e04d4352cf4bd62a57808d1644aaa | |
parent | e11956afe6753f542be57876f3e5551e5c64d5b8 (diff) |
Updating blob tests with new praefect test server
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | _support/Makefile.template | 6 | ||||
-rw-r--r-- | changelogs/unreleased/jc-praefect-test.yml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 14 | ||||
-rw-r--r-- | internal/praefect/protoregistry/find_oid.go | 2 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers.go | 3 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers_test.go | 4 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 138 | ||||
-rw-r--r-- | internal/testhelper/testserver.go | 178 |
9 files changed, 206 insertions, 152 deletions
@@ -61,12 +61,8 @@ test: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ .PHONY: test-with-praefect -test-with-praefect: build-praefect - cd $(BUILD_DIR) && GITALY_TEST_PRAEFECT_BIN=1 PRAEFECT_BIN_PATH="$(BUILD_DIR)/bin/praefect" $(MAKE) test - -.PHONY: build-praefect -build-praefect: prepare-build - cd cmd/praefect && go build -o "$(BUILD_DIR)/bin/praefect" +test-with-praefect: prepare-build + cd $(BUILD_DIR) && $(MAKE) $@ .PHONY: test-with-proxies test-with-proxies: prepare-build diff --git a/_support/Makefile.template b/_support/Makefile.template index 118100d6f..80a3b0457 100644 --- a/_support/Makefile.template +++ b/_support/Makefile.template @@ -130,6 +130,12 @@ test-with-proxies: prepare-tests @cd {{ .SourceDir }} &&\ go test -tags "$(BUILD_TAGS)" -count=1 -exec {{ .SourceDir }}/_support/bad-proxies {{ .Pkg }}/internal/rubyserver/ + +.PHONY: test-with-praefect +test-with-praefect: build prepare-tests + @cd {{ .SourceDir }} &&\ + GITALY_TEST_PRAEFECT_BIN={{ .BuildDir }}/bin/praefect go test -tags "$(BUILD_TAGS)" -count=1 {{ join .AllPackages " " }} # count=1 bypasses go 1.10 test caching + .PHONY: race-go race-go: prepare-tests @cd {{ .SourceDir }} && go test -tags "$(BUILD_TAGS)" -race {{ join .AllPackages " " }} diff --git a/changelogs/unreleased/jc-praefect-test.yml b/changelogs/unreleased/jc-praefect-test.yml new file mode 100644 index 000000000..94149b755 --- /dev/null +++ b/changelogs/unreleased/jc-praefect-test.yml @@ -0,0 +1,5 @@ +--- +title: Add praefect as a transparent pass through for tests +merge_request: 1736 +author: +type: other diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 2a4859a8e..a476b61c9 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -83,15 +83,21 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var requestFinalizer func() var storage string - var getRepoErr error if mi.Scope == protoregistry.ScopeRepository { + var getRepoErr error storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + + if getRepoErr == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) + } + if getRepoErr != nil { - if getRepoErr == protoregistry.ErrTargetRepoMissing { - return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) - } return nil, getRepoErr } + + if storage == "" { + return nil, status.Error(codes.InvalidArgument, "storage not found") + } } else { storage, requestFinalizer, err = c.getAnyStorageNode() if err != nil { diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go index 3e3db4485..8b73d4943 100644 --- a/internal/praefect/protoregistry/find_oid.go +++ b/internal/praefect/protoregistry/find_oid.go @@ -58,7 +58,7 @@ func reflectFindOID(pbMsg proto.Message, targetOID []int) (reflect.Value, error) msgV, err = findProtoField(msgV, fieldNo) if err != nil { return reflect.Value{}, fmt.Errorf( - "unable to descend OID %+v into message %s: %s", + "unable to descend OID %+v into message %s: %v", targetOID, proto.MessageName(pbMsg), err, ) } diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go index 9fffdb7a3..e4b61490f 100644 --- a/internal/service/blob/lfs_pointers.go +++ b/internal/service/blob/lfs_pointers.go @@ -3,6 +3,7 @@ package blob import ( "fmt" + "gitlab.com/gitlab-org/gitaly/internal/errors" "gitlab.com/gitlab-org/gitaly/internal/git" "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" @@ -62,7 +63,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita func validateGetLFSPointersRequest(req *gitalypb.GetLFSPointersRequest) error { if req.GetRepository() == nil { - return fmt.Errorf("empty Repository") + return errors.ErrEmptyRepository } if len(req.GetBlobIds()) == 0 { diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 7dd31eda2..7126b35f8 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -470,8 +470,8 @@ func drainAllPointers(c gitalypb.BlobService_GetAllLFSPointersClient) error { // TestGetAllLFSPointersVerifyScope verifies that this RPC returns all LFS // pointers in a repository, not only ones reachable from the default branch func TestGetAllLFSPointersVerifyScope(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index e9de2fd30..a052c5119 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -21,7 +21,6 @@ import ( "testing" "time" - "github.com/BurntSushi/toml" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -33,13 +32,10 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/helper/text" gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" - praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" - healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -317,121 +313,6 @@ func GetGitEnvData() (string, error) { return string(gitEnvBytes), nil } -// NewTestServer instantiates a new TestServer -func NewTestServer(srv *grpc.Server) *TestServer { - return &TestServer{ - grpcServer: srv, - } -} - -// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance -// if necessary -type TestServer struct { - grpcServer *grpc.Server - socket string - process *os.Process -} - -// GrpcServer returns the underlying grpc.Server -func (p *TestServer) GrpcServer() *grpc.Server { - return p.grpcServer -} - -// Stop will stop both the grpc server as well as the praefect process -func (p *TestServer) Stop() { - p.grpcServer.Stop() - if p.process != nil { - p.process.Kill() - } -} - -// Socket returns the socket file the test server is listening on -func (p *TestServer) Socket() string { - return p.socket -} - -// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled -func (p *TestServer) Start() error { - gitalyServerSocketPath := GetTemporaryGitalySocketFileName() - - listener, err := net.Listen("unix", gitalyServerSocketPath) - if err != nil { - return err - } - - go p.grpcServer.Serve(listener) - - if os.Getenv("GITALY_TEST_PRAEFECT_BIN") == "1" { - tempDir, err := ioutil.TempDir("", "praefect-test-server") - if err != nil { - return err - } - - praefectServerSocketPath := GetTemporaryGitalySocketFileName() - - configFilePath := filepath.Join(tempDir, "config.toml") - configFile, err := os.Create(configFilePath) - if err != nil { - return err - } - defer configFile.Close() - - c := praefectconfig.Config{ - SocketPath: praefectServerSocketPath, - VirtualStorages: []*praefectconfig.VirtualStorage{ - { - Name: "default", - Nodes: []*models.Node{ - { - Storage: "default", - Address: "unix:/" + gitalyServerSocketPath, - DefaultPrimary: true, - }, - }, - }, - }, - } - - if err := toml.NewEncoder(configFile).Encode(&c); err != nil { - return err - } - - cmd := exec.Command(os.Getenv("PRAEFECT_BIN_PATH"), "-config", configFilePath) - cmd.Stderr = os.Stderr - - p.socket = praefectServerSocketPath - - go cmd.Run() - - conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) - - if err != nil { - return fmt.Errorf("dial: %v", err) - } - defer conn.Close() - - client := healthpb.NewHealthClient(conn) - ctx, cancel := Context() - defer cancel() - - for { - resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}) - if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING { - break - } - time.Sleep(1 * time.Microsecond) - } - - os.Remove(tempDir) - p.process = cmd.Process - - return nil - } - - p.socket = gitalyServerSocketPath - return nil -} - // NewTestGrpcServer creates a GRPC Server for testing purposes func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *grpc.Server { logger := NewTestLogger(tb) @@ -450,25 +331,6 @@ func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInte ) } -// NewServer creates a Server for testing purposes -func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { - logger := NewTestLogger(tb) - logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) - - ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) - ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) - ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) - - streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) - unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) - - return NewTestServer( - grpc.NewServer( - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), - )) -} - // MustHaveNoChildProcess panics if it finds a running or finished child // process. It waits for 2 seconds for processes to be cleaned up by other // goroutines. diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go new file mode 100644 index 000000000..4d69fbfc2 --- /dev/null +++ b/internal/testhelper/testserver.go @@ -0,0 +1,178 @@ +package testhelper + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/BurntSushi/toml" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" + log "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" + praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// NewTestServer instantiates a new TestServer +func NewTestServer(srv *grpc.Server) *TestServer { + return &TestServer{ + grpcServer: srv, + } +} + +// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance +// if necessary +type TestServer struct { + grpcServer *grpc.Server + socket string + process *os.Process +} + +// GrpcServer returns the underlying grpc.Server +func (p *TestServer) GrpcServer() *grpc.Server { + return p.grpcServer +} + +// Stop will stop both the grpc server as well as the praefect process +func (p *TestServer) Stop() { + p.grpcServer.Stop() + if p.process != nil { + p.process.Kill() + } +} + +// Socket returns the socket file the test server is listening on +func (p *TestServer) Socket() string { + return p.socket +} + +// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled +func (p *TestServer) Start() error { + gitalyServerSocketPath := GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", gitalyServerSocketPath) + if err != nil { + return err + } + + go p.grpcServer.Serve(listener) + + praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN") + if !ok { + p.socket = gitalyServerSocketPath + return nil + } + + tempDir, err := ioutil.TempDir("", "praefect-test-server") + if err != nil { + return err + } + defer os.RemoveAll(tempDir) + + praefectServerSocketPath := GetTemporaryGitalySocketFileName() + + configFilePath := filepath.Join(tempDir, "config.toml") + configFile, err := os.Create(configFilePath) + if err != nil { + return err + } + defer configFile.Close() + + c := praefectconfig.Config{ + SocketPath: praefectServerSocketPath, + VirtualStorages: []*praefectconfig.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "default", + Address: "unix:/" + gitalyServerSocketPath, + DefaultPrimary: true, + }, + }, + }, + }, + } + + if err := toml.NewEncoder(configFile).Encode(&c); err != nil { + return err + } + if err = configFile.Sync(); err != nil { + return err + } + configFile.Close() + + cmd := exec.Command(praefectBinPath, "-config", configFilePath) + cmd.Stderr = os.Stderr + cmd.Stdout = os.Stdout + + p.socket = praefectServerSocketPath + + if err := cmd.Start(); err != nil { + return err + } + go cmd.Wait() + + conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) + + if err != nil { + return fmt.Errorf("dial: %v", err) + } + defer conn.Close() + + if err = waitForPraefectStartup(conn); err != nil { + return err + } + + p.process = cmd.Process + + return nil +} + +func waitForPraefectStartup(conn *grpc.ClientConn) error { + client := healthpb.NewHealthClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true)) + if err != nil { + return err + } + + if resp.Status != healthpb.HealthCheckResponse_SERVING { + return errors.New("server not yet ready to serve") + } + + return nil +} + +// NewServer creates a Server for testing purposes +func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { + logger := NewTestLogger(tb) + logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) + + ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) + ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) + ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) + + streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) + unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) + + return NewTestServer( + grpc.NewServer( + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + )) +} |