Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-29 05:24:44 +0300
committerJohn Cai <jcai@gitlab.com>2020-01-29 05:24:44 +0300
commitc6099504b3355f12ea618451b7912b5b80a36593 (patch)
tree98aa5b58c71e04d4352cf4bd62a57808d1644aaa
parent6582d73007e9022b7c844d018c80b0eafa9ca189 (diff)
parent80f37b6b0da168f53086d05a1e869081d2d8d11a (diff)
Merge branch 'jc-praefect-test' into 'master'
Add praefect as a transparent pass through for tests Closes #2122 See merge request gitlab-org/gitaly!1736
-rw-r--r--.gitlab-ci.yml5
-rw-r--r--Makefile4
-rw-r--r--_support/Makefile.template6
-rw-r--r--changelogs/unreleased/jc-praefect-test.yml5
-rw-r--r--internal/errors/errors.go10
-rw-r--r--internal/praefect/coordinator.go22
-rw-r--r--internal/praefect/datastore/datastore.go5
-rw-r--r--internal/praefect/protoregistry/find_oid.go4
-rw-r--r--internal/service/blob/get_blob_test.go12
-rw-r--r--internal/service/blob/get_blobs_test.go8
-rw-r--r--internal/service/blob/lfs_pointers.go3
-rw-r--r--internal/service/blob/lfs_pointers_test.go28
-rw-r--r--internal/service/blob/testhelper_test.go22
-rw-r--r--internal/testhelper/testserver.go178
14 files changed, 267 insertions, 45 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f9dbc908a..d02ca67e1 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -137,6 +137,11 @@ test:proxy:
script:
- make test-with-proxies
+test:praefect:
+ <<: *test_definition
+ script:
+ - make test-with-praefect
+
race:
<<: *go_test_definition
script:
diff --git a/Makefile b/Makefile
index 10afe2289..4a49729c2 100644
--- a/Makefile
+++ b/Makefile
@@ -60,6 +60,10 @@ prepare-tests: prepare-build
test: prepare-build
cd $(BUILD_DIR) && $(MAKE) $@
+.PHONY: test-with-praefect
+test-with-praefect: prepare-build
+ cd $(BUILD_DIR) && $(MAKE) $@
+
.PHONY: test-with-proxies
test-with-proxies: prepare-build
cd $(BUILD_DIR) && $(MAKE) $@
diff --git a/_support/Makefile.template b/_support/Makefile.template
index 118100d6f..80a3b0457 100644
--- a/_support/Makefile.template
+++ b/_support/Makefile.template
@@ -130,6 +130,12 @@ test-with-proxies: prepare-tests
@cd {{ .SourceDir }} &&\
go test -tags "$(BUILD_TAGS)" -count=1 -exec {{ .SourceDir }}/_support/bad-proxies {{ .Pkg }}/internal/rubyserver/
+
+.PHONY: test-with-praefect
+test-with-praefect: build prepare-tests
+ @cd {{ .SourceDir }} &&\
+ GITALY_TEST_PRAEFECT_BIN={{ .BuildDir }}/bin/praefect go test -tags "$(BUILD_TAGS)" -count=1 {{ join .AllPackages " " }} # count=1 bypasses go 1.10 test caching
+
.PHONY: race-go
race-go: prepare-tests
@cd {{ .SourceDir }} && go test -tags "$(BUILD_TAGS)" -race {{ join .AllPackages " " }}
diff --git a/changelogs/unreleased/jc-praefect-test.yml b/changelogs/unreleased/jc-praefect-test.yml
new file mode 100644
index 000000000..94149b755
--- /dev/null
+++ b/changelogs/unreleased/jc-praefect-test.yml
@@ -0,0 +1,5 @@
+---
+title: Add praefect as a transparent pass through for tests
+merge_request: 1736
+author:
+type: other
diff --git a/internal/errors/errors.go b/internal/errors/errors.go
new file mode 100644
index 000000000..d2efaef52
--- /dev/null
+++ b/internal/errors/errors.go
@@ -0,0 +1,10 @@
+package errors
+
+import "errors"
+
+var (
+ // ErrEmptyRepository is returned when an RPC is missing a repository as an argument
+ ErrEmptyRepository = errors.New("empty Repository")
+ // ErrInvalidRepository is returned when an RPC has an invalid repository as an argument
+ ErrInvalidRepository = errors.New("invalid Repository")
+)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index ef9bbdc28..a476b61c9 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -12,6 +12,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
+ internalerrs "gitlab.com/gitlab-org/gitaly/internal/errors"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -19,6 +20,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
func isDestructive(methodName string) bool {
@@ -81,9 +84,19 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var storage string
if mi.Scope == protoregistry.ScopeRepository {
- storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
- if err != nil {
- return nil, err
+ var getRepoErr error
+ storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
+
+ if getRepoErr == protoregistry.ErrTargetRepoMissing {
+ return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error())
+ }
+
+ if getRepoErr != nil {
+ return nil, getRepoErr
+ }
+
+ if storage == "" {
+ return nil, status.Error(codes.InvalidArgument, "storage not found")
}
} else {
storage, requestFinalizer, err = c.getAnyStorageNode()
@@ -185,6 +198,9 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git
newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName())
if err != nil {
+ if err == datastore.ErrNoPrimaryForStorage {
+ return nil, status.Error(codes.InvalidArgument, internalerrs.ErrInvalidRepository.Error())
+ }
return nil, fmt.Errorf("could not choose a primary: %v", err)
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 2830b197a..c8de9d82c 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -179,6 +179,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore {
return m
}
+// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it
+var ErrNoPrimaryForStorage = errors.New("no primary for storage")
+
// PickAPrimary returns the primary configured in the config file
func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, error) {
for _, node := range md.virtualStorages[virtualStorage] {
@@ -187,7 +190,7 @@ func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, err
}
}
- return models.Node{}, errors.New("no default primaries found")
+ return models.Node{}, ErrNoPrimaryForStorage
}
// GetReplicas gets the secondaries for a repository based on the relative path
diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go
index c17324a96..8b73d4943 100644
--- a/internal/praefect/protoregistry/find_oid.go
+++ b/internal/praefect/protoregistry/find_oid.go
@@ -17,7 +17,7 @@ const (
)
// ErrTargetRepoMissing indicates that the target repo is missing or not set
-var ErrTargetRepoMissing = errors.New("target repo is not set")
+var ErrTargetRepoMissing = errors.New("empty Repository")
func reflectFindRepoTarget(pbMsg proto.Message, targetOID []int) (*gitalypb.Repository, error) {
msgV, e := reflectFindOID(pbMsg, targetOID)
@@ -58,7 +58,7 @@ func reflectFindOID(pbMsg proto.Message, targetOID []int) (reflect.Value, error)
msgV, err = findProtoField(msgV, fieldNo)
if err != nil {
return reflect.Value{}, fmt.Errorf(
- "unable to descend OID %+v into message %s: %s",
+ "unable to descend OID %+v into message %s: %v",
targetOID, proto.MessageName(pbMsg), err,
)
}
diff --git a/internal/service/blob/get_blob_test.go b/internal/service/blob/get_blob_test.go
index 728a1b247..ca24b07ad 100644
--- a/internal/service/blob/get_blob_test.go
+++ b/internal/service/blob/get_blob_test.go
@@ -13,8 +13,8 @@ import (
)
func TestSuccessfulGetBlob(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -90,8 +90,8 @@ func TestSuccessfulGetBlob(t *testing.T) {
}
func TestGetBlobNotFound(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -146,8 +146,8 @@ func getBlob(stream gitalypb.BlobService_GetBlobClient) (int64, string, []byte,
}
func TestFailedGetBlobRequestDueToValidationError(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
diff --git a/internal/service/blob/get_blobs_test.go b/internal/service/blob/get_blobs_test.go
index 8b7199731..8ecfcb696 100644
--- a/internal/service/blob/get_blobs_test.go
+++ b/internal/service/blob/get_blobs_test.go
@@ -13,8 +13,8 @@ import (
)
func TestSuccessfulGetBlobsRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -133,8 +133,8 @@ func TestSuccessfulGetBlobsRequest(t *testing.T) {
}
func TestFailedGetBlobsRequestDueToValidation(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
diff --git a/internal/service/blob/lfs_pointers.go b/internal/service/blob/lfs_pointers.go
index 9fffdb7a3..e4b61490f 100644
--- a/internal/service/blob/lfs_pointers.go
+++ b/internal/service/blob/lfs_pointers.go
@@ -3,6 +3,7 @@ package blob
import (
"fmt"
+ "gitlab.com/gitlab-org/gitaly/internal/errors"
"gitlab.com/gitlab-org/gitaly/internal/git"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
@@ -62,7 +63,7 @@ func (s *server) GetLFSPointers(req *gitalypb.GetLFSPointersRequest, stream gita
func validateGetLFSPointersRequest(req *gitalypb.GetLFSPointersRequest) error {
if req.GetRepository() == nil {
- return fmt.Errorf("empty Repository")
+ return errors.ErrEmptyRepository
}
if len(req.GetBlobIds()) == 0 {
diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go
index 019caa37c..7126b35f8 100644
--- a/internal/service/blob/lfs_pointers_test.go
+++ b/internal/service/blob/lfs_pointers_test.go
@@ -13,8 +13,8 @@ import (
)
func TestSuccessfulGetLFSPointersRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -77,8 +77,8 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) {
}
func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -125,8 +125,8 @@ func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) {
}
func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -291,8 +291,8 @@ func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) {
}
func TestFailedGetNewLFSPointersRequestDueToValidations(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -352,8 +352,8 @@ func drainNewPointers(c gitalypb.BlobService_GetNewLFSPointersClient) error {
}
func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -423,8 +423,8 @@ func getAllPointers(t *testing.T, c gitalypb.BlobService_GetAllLFSPointersClient
}
func TestFailedGetAllLFSPointersRequestDueToValidations(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -470,8 +470,8 @@ func drainAllPointers(c gitalypb.BlobService_GetAllLFSPointersClient) error {
// TestGetAllLFSPointersVerifyScope verifies that this RPC returns all LFS
// pointers in a repository, not only ones reachable from the default branch
func TestGetAllLFSPointersVerifyScope(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
diff --git a/internal/service/blob/testhelper_test.go b/internal/service/blob/testhelper_test.go
index c8d990995..d28ad8eb8 100644
--- a/internal/service/blob/testhelper_test.go
+++ b/internal/service/blob/testhelper_test.go
@@ -2,10 +2,10 @@ package blob
import (
"log"
- "net"
"os"
"testing"
+ "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -23,6 +23,7 @@ func testMain(m *testing.M) int {
defer testhelper.MustHaveNoChildProcess()
testhelper.ConfigureRuby()
+
if err := rubyServer.Start(); err != nil {
log.Fatal(err)
}
@@ -31,22 +32,15 @@ func testMain(m *testing.M) int {
return m.Run()
}
-func runBlobServer(t *testing.T) (*grpc.Server, string) {
- grpcServer := testhelper.NewTestGrpcServer(t, nil, nil)
-
- serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
- listener, err := net.Listen("unix", serverSocketPath)
-
- if err != nil {
- t.Fatal(err)
- }
+func runBlobServer(t *testing.T) (func(), string) {
+ srv := testhelper.NewServer(t, nil, nil)
- gitalypb.RegisterBlobServiceServer(grpcServer, &server{ruby: rubyServer})
- reflection.Register(grpcServer)
+ gitalypb.RegisterBlobServiceServer(srv.GrpcServer(), &server{ruby: rubyServer})
+ reflection.Register(srv.GrpcServer())
- go grpcServer.Serve(listener)
+ require.NoError(t, srv.Start())
- return grpcServer, "unix://" + serverSocketPath
+ return srv.Stop, "unix://" + srv.Socket()
}
func newBlobClient(t *testing.T, serverSocketPath string) (gitalypb.BlobServiceClient, *grpc.ClientConn) {
diff --git a/internal/testhelper/testserver.go b/internal/testhelper/testserver.go
new file mode 100644
index 000000000..4d69fbfc2
--- /dev/null
+++ b/internal/testhelper/testserver.go
@@ -0,0 +1,178 @@
+package testhelper
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "testing"
+ "time"
+
+ "github.com/BurntSushi/toml"
+ grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
+ grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
+ log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
+ praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "google.golang.org/grpc"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// NewTestServer instantiates a new TestServer
+func NewTestServer(srv *grpc.Server) *TestServer {
+ return &TestServer{
+ grpcServer: srv,
+ }
+}
+
+// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance
+// if necessary
+type TestServer struct {
+ grpcServer *grpc.Server
+ socket string
+ process *os.Process
+}
+
+// GrpcServer returns the underlying grpc.Server
+func (p *TestServer) GrpcServer() *grpc.Server {
+ return p.grpcServer
+}
+
+// Stop will stop both the grpc server as well as the praefect process
+func (p *TestServer) Stop() {
+ p.grpcServer.Stop()
+ if p.process != nil {
+ p.process.Kill()
+ }
+}
+
+// Socket returns the socket file the test server is listening on
+func (p *TestServer) Socket() string {
+ return p.socket
+}
+
+// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled
+func (p *TestServer) Start() error {
+ gitalyServerSocketPath := GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", gitalyServerSocketPath)
+ if err != nil {
+ return err
+ }
+
+ go p.grpcServer.Serve(listener)
+
+ praefectBinPath, ok := os.LookupEnv("GITALY_TEST_PRAEFECT_BIN")
+ if !ok {
+ p.socket = gitalyServerSocketPath
+ return nil
+ }
+
+ tempDir, err := ioutil.TempDir("", "praefect-test-server")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(tempDir)
+
+ praefectServerSocketPath := GetTemporaryGitalySocketFileName()
+
+ configFilePath := filepath.Join(tempDir, "config.toml")
+ configFile, err := os.Create(configFilePath)
+ if err != nil {
+ return err
+ }
+ defer configFile.Close()
+
+ c := praefectconfig.Config{
+ SocketPath: praefectServerSocketPath,
+ VirtualStorages: []*praefectconfig.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: "default",
+ Address: "unix:/" + gitalyServerSocketPath,
+ DefaultPrimary: true,
+ },
+ },
+ },
+ },
+ }
+
+ if err := toml.NewEncoder(configFile).Encode(&c); err != nil {
+ return err
+ }
+ if err = configFile.Sync(); err != nil {
+ return err
+ }
+ configFile.Close()
+
+ cmd := exec.Command(praefectBinPath, "-config", configFilePath)
+ cmd.Stderr = os.Stderr
+ cmd.Stdout = os.Stdout
+
+ p.socket = praefectServerSocketPath
+
+ if err := cmd.Start(); err != nil {
+ return err
+ }
+ go cmd.Wait()
+
+ conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure())
+
+ if err != nil {
+ return fmt.Errorf("dial: %v", err)
+ }
+ defer conn.Close()
+
+ if err = waitForPraefectStartup(conn); err != nil {
+ return err
+ }
+
+ p.process = cmd.Process
+
+ return nil
+}
+
+func waitForPraefectStartup(conn *grpc.ClientConn) error {
+ client := healthpb.NewHealthClient(conn)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}, grpc.WaitForReady(true))
+ if err != nil {
+ return err
+ }
+
+ if resp.Status != healthpb.HealthCheckResponse_SERVING {
+ return errors.New("server not yet ready to serve")
+ }
+
+ return nil
+}
+
+// NewServer creates a Server for testing purposes
+func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer {
+ logger := NewTestLogger(tb)
+ logrusEntry := log.NewEntry(logger).WithField("test", tb.Name())
+
+ ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
+ ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger)
+ ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger)
+
+ streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...)
+ unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...)
+
+ return NewTestServer(
+ grpc.NewServer(
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
+ ))
+}