Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-15 03:14:25 +0300
committerJohn Cai <jcai@gitlab.com>2020-01-29 05:07:34 +0300
commit80f37b6b0da168f53086d05a1e869081d2d8d11a (patch)
tree98aa5b58c71e04d4352cf4bd62a57808d1644aaa
parente11956afe6753f542be57876f3e5551e5c64d5b8 (diff)
Updating blob tests with new praefect test server
-rw-r--r--Makefile8
-rw-r--r--_support/Makefile.template6
-rw-r--r--changelogs/unreleased/jc-praefect-test.yml5
-rw-r--r--internal/praefect/coordinator.go14
-rw-r--r--internal/praefect/protoregistry/find_oid.go2
-rw-r--r--internal/service/blob/lfs_pointers.go3
-rw-r--r--internal/service/blob/lfs_pointers_test.go4
-rw-r--r--internal/testhelper/testhelper.go138
-rw-r--r--internal/testhelper/testserver.go178
9 files changed, 206 insertions, 152 deletions
diff --git a/Makefile b/Makefile
index 7e220a8fd..4a49729c2 100644
--- a/Makefile
+++ b/Makefile
@@ -61,12 +61,8 @@ test: prepare-build
cd $(BUILD_DIR) && $(MAKE) $@
.PHONY: test-with-praefect
-test-with-praefect: build-praefect
- cd $(BUILD_DIR) && GITALY_TEST_PRAEFECT_BIN=1 PRAEFECT_BIN_PATH="$(BUILD_DIR)/bin/praefect" $(MAKE) test
-
-.PHONY: build-praefect
-build-praefect: prepare-build
- cd cmd/praefect && go build -o "$(BUILD_DIR)/bin/praefect"
+test-with-praefect: prepare-build
+ cd $(BUILD_DIR) && $(MAKE) $@
.PHONY: test-with-proxies
test-with-proxies: prepare-build
diff --git a/_support/Makefile.template b/_support/Makefile.template
index 118100d6f..80a3b0457 100644
--- a/_support/Makefile.template
+++ b/_support/Makefile.template
@@ -130,6 +130,12 @@ test-with-proxies: prepare-tests
@cd {{ .SourceDir }} &&\
go test -tags "$(BUILD_TAGS)" -count=1 -exec {{ .SourceDir }}/_support/bad-proxies {{ .Pkg }}/internal/rubyserver/
+
+.PHONY: test-with-praefect
+test-with-praefect: build prepare-tests
+ @cd {{ .SourceDir }} &&\
+ GITALY_TEST_PRAEFECT_BIN={{ .BuildDir }}/bin/praefect go test -tags "$(BUILD_TAGS)" -count=1 {{ join .AllPackages " " }} # count=1 bypasses go 1.10 test caching
+
.PHONY: race-go
race-go: prepare-tests
@cd {{ .SourceDir }} && go test -tags "$(BUILD_TAGS)" -race {{ join .AllPackages " " }}
diff --git a/changelogs/unreleased/jc-praefect-test.yml b/changelogs/unreleased/jc-praefect-test.yml
new file mode 100644
index 000000000..94149b755
--- /dev/null
+++ b/changelogs/unreleased/jc-praefect-test.yml
@@ -0,0 +1,5 @@
+---
+title: Add praefect as a transparent pass through for tests
+merge_request: 1736
+author:
+type: other
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 2a4859a8e..a476b61c9 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -83,15 +83,21 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var requestFinalizer func()
var storage string
- var getRepoErr error
if mi.Scope == protoregistry.ScopeRepository {
+ var getRepoErr error
storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
+
+ if getRepoErr == protoregistry.ErrTargetRepoMissing {
+ return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error())
+ }
+
if getRepoErr != nil {
- if getRepoErr == protoregistry.ErrTargetRepoMissing {
- return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error())
- }
return nil, getRepoErr
}
+
+ if storage == "" {
+ return nil, status.Error(codes.InvalidArgument, "storage not found")
+ }
} else {
storage, requestFinalizer, err = c.getAnyStorageNode()
if err != nil {
diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go
index 3e3db4485..8b73d4943 100644
--- a/internal/praefect/protoregistry/find_oid.go
+++ b/internal/praefect/protoregistry/find_oid.go
@@ -58,7 +58,7 @@ func reflectFindOID(pbMsg proto.Message, targetOID []int) (reflect.Value, error)
msgV, err = findProtoField(msgV, fieldNo)
if err != nil {
return reflect.Value{}, fmt.Errorf(
- "unable to descend OID %+v into message %s: %s",
+ "unable to descend OID %+v into message %s: %v",
targetOID, proto.MessageName(pbMsg), err,
)
}
diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go
index 9fffdb7a3..e4b61490f 100644
--- a/internal/service/blob/lfs_pointers.go
+++ b/internal/service/blob/lfs_pointers.go
@@ -3,6 +3,7 @@ package blob
import (
"fmt"
+ "gitlab.com/gitlab-org/gitaly/internal/errors"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
@@ -62,7 +63,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
func validateGetLFSPointersRequest(req *gitalypb.GetLFSPointersRequest) error {
if req.GetRepository() == nil {
- return fmt.Errorf("empty Repository")
+ return errors.ErrEmptyRepository
}
if len(req.GetBlobIds()) == 0 {
diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go
index 7dd31eda2..7126b35f8 100644
--- a/internal/service/blob/lfs_pointers_test.go
+++ b/internal/service/blob/lfs_pointers_test.go
@@ -470,8 +470,8 @@ func drainAllPointers(c gitalypb.BlobService_GetAllLFSPointersClient) error {
// TestGetAllLFSPointersVerifyScope verifies that this RPC returns all LFS
// pointers in a repository, not only ones reachable from the default branch
func TestGetAllLFSPointersVerifyScope(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index e9de2fd30..a052c5119 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -21,7 +21,6 @@ import (
"testing"
"time"
- "github.com/BurntSushi/toml"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
@@ -33,13 +32,10 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/internal/helper/text"
gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
- praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
- healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -317,121 +313,6 @@ func GetGitEnvData() (string, error) {
return string(gitEnvBytes), nil
}
-// NewTestServer instantiates a new TestServer
-func NewTestServer(srv *grpc.Server) *TestServer {
- return &TestServer{
- grpcServer: srv,
- }
-}
-
-// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance
-// if necessary
-type TestServer struct {
- grpcServer *grpc.Server
- socket string
- process *os.Process
-}
-
-// GrpcServer returns the underlying grpc.Server
-func (p *TestServer) GrpcServer() *grpc.Server {
- return p.grpcServer
-}
-
-// Stop will stop both the grpc server as well as the praefect process
-func (p *TestServer) Stop() {
- p.grpcServer.Stop()
- if p.process != nil {
- p.process.Kill()
- }
-}
-
-// Socket returns the socket file the test server is listening on
-func (p *TestServer) Socket() string {
- return p.socket
-}
-
-// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled
-func (p *TestServer) Start() error {
- gitalyServerSocketPath := GetTemporaryGitalySocketFileName()
-
- listener, err := net.Listen("unix", gitalyServerSocketPath)
- if err != nil {
- return err
- }
-
- go p.grpcServer.Serve(listener)
-
- if os.Getenv("GITALY_TEST_PRAEFECT_BIN") == "1" {
- tempDir, err := ioutil.TempDir("", "praefect-test-server")
- if err != nil {
- return err
- }
-
- praefectServerSocketPath := GetTemporaryGitalySocketFileName()
-
- configFilePath := filepath.Join(tempDir, "config.toml")
- configFile, err := os.Create(configFilePath)
- if err != nil {
- return err
- }
- defer configFile.Close()
-
- c := praefectconfig.Config{
- SocketPath: praefectServerSocketPath,
- VirtualStorages: []*praefectconfig.VirtualStorage{
- {
- Name: "default",
- Nodes: []*models.Node{
- {
- Storage: "default",
- Address: "unix:/" + gitalyServerSocketPath,
- DefaultPrimary: true,
- },
- },
- },
- },
- }
-
- if err := toml.NewEncoder(configFile).Encode(&c); err != nil {
- return err
- }
-
- cmd := exec.Command(os.Getenv("PRAEFECT_BIN_PATH"), "-config", configFilePath)
- cmd.Stderr = os.Stderr
-
- p.socket = praefectServerSocketPath
-
- go cmd.Run()
-
- conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure())
-
- if err != nil {
- return fmt.Errorf("dial: %v", err)
- }
- defer conn.Close()
-
- client := healthpb.NewHealthClient(conn)
- ctx, cancel := Context()
- defer cancel()
-
- for {
- resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
- if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
- break
- }
- time.Sleep(1 * time.Microsecond)
- }
-
- os.Remove(tempDir)
- p.process = cmd.Process
-
- return nil
- }
-
- p.socket = gitalyServerSocketPath
- return nil
-}
-
// NewTestGrpcServer creates a GRPC Server for testing purposes
func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *grpc.Server {
logger := NewTestLogger(tb)
@@ -450,25 +331,6 @@ func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInte
)
}
-// NewServer creates a Server for testing purposes
-func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer {
- logger := NewTestLogger(tb)
- logrusEntry := log.NewEntry(logger).WithField("test", tb.Name())
-
- ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
- ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger)
- ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger)
-
- streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...)
- unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...)
-
- return NewTestServer(
- grpc.NewServer(
- grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
- grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
- ))
-}
-
// MustHaveNoChildProcess panics if it finds a running or finished child
// process. It waits for 2 seconds for processes to be cleaned up by other
// goroutines.
diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go
new file mode 100644
index 000000000..4d69fbfc2
--- /dev/null
+++ b/internal/testhelper/testserver.go
@@ -0,0 +1,178 @@
+package testhelper
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/BurntSushi/toml"
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
+ praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// NewTestServer instantiates a new TestServer
+func NewTestServer(srv *grpc.Server) *TestServer {
+ return &TestServer{
+ grpcServer: srv,
+ }
+}
+
+// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance
+// if necessary
+type TestServer struct {
+ grpcServer *grpc.Server
+ socket string
+ process *os.Process
+}
+
+// GrpcServer returns the underlying grpc.Server
+func (p *TestServer) GrpcServer() *grpc.Server {
+ return p.grpcServer
+}
+
+// Stop will stop both the grpc server as well as the praefect process
+func (p *TestServer) Stop() {
+ p.grpcServer.Stop()
+ if p.process != nil {
+ p.process.Kill()
+ }
+}
+
+// Socket returns the socket file the test server is listening on
+func (p *TestServer) Socket() string {
+ return p.socket
+}
+
+// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled
+func (p *TestServer) Start() error {
+ gitalyServerSocketPath := GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", gitalyServerSocketPath)
+ if err != nil {
+ return err
+ }
+
+ go p.grpcServer.Serve(listener)
+
+ praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN")
+ if !ok {
+ p.socket = gitalyServerSocketPath
+ return nil
+ }
+
+ tempDir, err := ioutil.TempDir("", "praefect-test-server")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(tempDir)
+
+ praefectServerSocketPath := GetTemporaryGitalySocketFileName()
+
+ configFilePath := filepath.Join(tempDir, "config.toml")
+ configFile, err := os.Create(configFilePath)
+ if err != nil {
+ return err
+ }
+ defer configFile.Close()
+
+ c := praefectconfig.Config{
+ SocketPath: praefectServerSocketPath,
+ VirtualStorages: []*praefectconfig.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: "default",
+ Address: "unix:/" + gitalyServerSocketPath,
+ DefaultPrimary: true,
+ },
+ },
+ },
+ },
+ }
+
+ if err := toml.NewEncoder(configFile).Encode(&c); err != nil {
+ return err
+ }
+ if err = configFile.Sync(); err != nil {
+ return err
+ }
+ configFile.Close()
+
+ cmd := exec.Command(praefectBinPath, "-config", configFilePath)
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stdout
+
+ p.socket = praefectServerSocketPath
+
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+ go cmd.Wait()
+
+ conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure())
+
+ if err != nil {
+ return fmt.Errorf("dial: %v", err)
+ }
+ defer conn.Close()
+
+ if err = waitForPraefectStartup(conn); err != nil {
+ return err
+ }
+
+ p.process = cmd.Process
+
+ return nil
+}
+
+func waitForPraefectStartup(conn *grpc.ClientConn) error {
+ client := healthpb.NewHealthClient(conn)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true))
+ if err != nil {
+ return err
+ }
+
+ if resp.Status != healthpb.HealthCheckResponse_SERVING {
+ return errors.New("server not yet ready to serve")
+ }
+
+ return nil
+}
+
+// NewServer creates a Server for testing purposes
+func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer {
+ logger := NewTestLogger(tb)
+ logrusEntry := log.NewEntry(logger).WithField("test", tb.Name())
+
+ ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
+ ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger)
+ ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger)
+
+ streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...)
+ unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...)
+
+ return NewTestServer(
+ grpc.NewServer(
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
+ ))
+}