Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2020-01-08 05:10:22 +0300
committerJohn Cai <jcai@gitlab.com>2020-01-29 04:36:52 +0300
commite11956afe6753f542be57876f3e5551e5c64d5b8 (patch)
tree78f71f73e51b6bce996d43c3a5a3f040c3d5e7ef
parent6582d73007e9022b7c844d018c80b0eafa9ca189 (diff)
Add test server to automatically start praefect
-rw-r--r--.gitlab-ci.yml5
-rw-r--r--Makefile8
-rw-r--r--internal/errors/errors.go10
-rw-r--r--internal/praefect/coordinator.go16
-rw-r--r--internal/praefect/datastore/datastore.go5
-rw-r--r--internal/praefect/protoregistry/find_oid.go2
-rw-r--r--internal/service/blob/get_blob_test.go12
-rw-r--r--internal/service/blob/get_blobs_test.go8
-rw-r--r--internal/service/blob/lfs_pointers_test.go24
-rw-r--r--internal/service/blob/testhelper_test.go22
-rw-r--r--internal/testhelper/testhelper.go138
11 files changed, 209 insertions, 41 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f9dbc908a..d02ca67e1 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -137,6 +137,11 @@ test:proxy:
script:
- make test-with-proxies
+test:praefect:
+ <<: *test_definition
+ script:
+ - make test-with-praefect
+
race:
<<: *go_test_definition
script:
diff --git a/Makefile b/Makefile
index 10afe2289..7e220a8fd 100644
--- a/Makefile
+++ b/Makefile
@@ -60,6 +60,14 @@ prepare-tests: prepare-build
test: prepare-build
cd $(BUILD_DIR) && $(MAKE) $@
+.PHONY: test-with-praefect
+test-with-praefect: build-praefect
+ cd $(BUILD_DIR) && GITALY_TEST_PRAEFECT_BIN=1 PRAEFECT_BIN_PATH="$(BUILD_DIR)/bin/praefect" $(MAKE) test
+
+.PHONY: build-praefect
+build-praefect: prepare-build
+ cd cmd/praefect && go build -o "$(BUILD_DIR)/bin/praefect"
+
.PHONY: test-with-proxies
test-with-proxies: prepare-build
cd $(BUILD_DIR) && $(MAKE) $@
diff --git a/internal/errors/errors.go b/internal/errors/errors.go
new file mode 100644
index 000000000..d2efaef52
--- /dev/null
+++ b/internal/errors/errors.go
@@ -0,0 +1,10 @@
+package errors
+
+import "errors"
+
+var (
+ // ErrEmptyRepository is returned when an RPC is missing a repository as an argument
+ ErrEmptyRepository = errors.New("empty Repository")
+ // ErrInvalidRepository is returned when an RPC has an invalid repository as an argument
+ ErrInvalidRepository = errors.New("invalid Repository")
+)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index ef9bbdc28..2a4859a8e 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -12,6 +12,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
+ internalerrs "gitlab.com/gitlab-org/gitaly/internal/errors"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/conn"
"gitlab.com/gitlab-org/gitaly/internal/praefect/datastore"
@@ -19,6 +20,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
func isDestructive(methodName string) bool {
@@ -80,10 +83,14 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var requestFinalizer func()
var storage string
+ var getRepoErr error
if mi.Scope == protoregistry.ScopeRepository {
- storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
- if err != nil {
- return nil, err
+ storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName)
+ if getRepoErr != nil {
+ if getRepoErr == protoregistry.ErrTargetRepoMissing {
+ return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error())
+ }
+ return nil, getRepoErr
}
} else {
storage, requestFinalizer, err = c.getAnyStorageNode()
@@ -185,6 +192,9 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git
newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName())
if err != nil {
+ if err == datastore.ErrNoPrimaryForStorage {
+ return nil, status.Error(codes.InvalidArgument, internalerrs.ErrInvalidRepository.Error())
+ }
return nil, fmt.Errorf("could not choose a primary: %v", err)
}
diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go
index 2830b197a..c8de9d82c 100644
--- a/internal/praefect/datastore/datastore.go
+++ b/internal/praefect/datastore/datastore.go
@@ -179,6 +179,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore {
return m
}
+// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it
+var ErrNoPrimaryForStorage = errors.New("no primary for storage")
+
// PickAPrimary returns the primary configured in the config file
func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, error) {
for _, node := range md.virtualStorages[virtualStorage] {
@@ -187,7 +190,7 @@ func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, err
}
}
- return models.Node{}, errors.New("no default primaries found")
+ return models.Node{}, ErrNoPrimaryForStorage
}
// GetReplicas gets the secondaries for a repository based on the relative path
diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go
index c17324a96..3e3db4485 100644
--- a/internal/praefect/protoregistry/find_oid.go
+++ b/internal/praefect/protoregistry/find_oid.go
@@ -17,7 +17,7 @@ const (
)
// ErrTargetRepoMissing indicates that the target repo is missing or not set
-var ErrTargetRepoMissing = errors.New("target repo is not set")
+var ErrTargetRepoMissing = errors.New("empty Repository")
func reflectFindRepoTarget(pbMsg proto.Message, targetOID []int) (*gitalypb.Repository, error) {
msgV, e := reflectFindOID(pbMsg, targetOID)
diff --git a/internal/service/blob/get_blob_test.go b/internal/service/blob/get_blob_test.go
index 728a1b247..ca24b07ad 100644
--- a/internal/service/blob/get_blob_test.go
+++ b/internal/service/blob/get_blob_test.go
@@ -13,8 +13,8 @@ import (
)
func TestSuccessfulGetBlob(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -90,8 +90,8 @@ func TestSuccessfulGetBlob(t *testing.T) {
}
func TestGetBlobNotFound(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -146,8 +146,8 @@ func getBlob(stream gitalypb.BlobService_GetBlobClient) (int64, string, []byte,
}
func TestFailedGetBlobRequestDueToValidationError(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
diff --git a/internal/service/blob/get_blobs_test.go b/internal/service/blob/get_blobs_test.go
index 8b7199731..8ecfcb696 100644
--- a/internal/service/blob/get_blobs_test.go
+++ b/internal/service/blob/get_blobs_test.go
@@ -13,8 +13,8 @@ import (
)
func TestSuccessfulGetBlobsRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -133,8 +133,8 @@ func TestSuccessfulGetBlobsRequest(t *testing.T) {
}
func TestFailedGetBlobsRequestDueToValidation(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go
index 019caa37c..7dd31eda2 100644
--- a/internal/service/blob/lfs_pointers_test.go
+++ b/internal/service/blob/lfs_pointers_test.go
@@ -13,8 +13,8 @@ import (
)
func TestSuccessfulGetLFSPointersRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -77,8 +77,8 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) {
}
func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
@@ -125,8 +125,8 @@ func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) {
}
func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -291,8 +291,8 @@ func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) {
}
func TestFailedGetNewLFSPointersRequestDueToValidations(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -352,8 +352,8 @@ func drainNewPointers(c gitalypb.BlobService_GetNewLFSPointersClient) error {
}
func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
@@ -423,8 +423,8 @@ func getAllPointers(t *testing.T, c gitalypb.BlobService_GetAllLFSPointersClient
}
func TestFailedGetAllLFSPointersRequestDueToValidations(t *testing.T) {
- server, serverSocketPath := runBlobServer(t)
- defer server.Stop()
+ stop, serverSocketPath := runBlobServer(t)
+ defer stop()
client, conn := newBlobClient(t, serverSocketPath)
defer conn.Close()
diff --git a/internal/service/blob/testhelper_test.go b/internal/service/blob/testhelper_test.go
index c8d990995..d28ad8eb8 100644
--- a/internal/service/blob/testhelper_test.go
+++ b/internal/service/blob/testhelper_test.go
@@ -2,10 +2,10 @@ package blob
import (
"log"
- "net"
"os"
"testing"
+ "github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
@@ -23,6 +23,7 @@ func testMain(m *testing.M) int {
defer testhelper.MustHaveNoChildProcess()
testhelper.ConfigureRuby()
+
if err := rubyServer.Start(); err != nil {
log.Fatal(err)
}
@@ -31,22 +32,15 @@ func testMain(m *testing.M) int {
return m.Run()
}
-func runBlobServer(t *testing.T) (*grpc.Server, string) {
- grpcServer := testhelper.NewTestGrpcServer(t, nil, nil)
-
- serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
- listener, err := net.Listen("unix", serverSocketPath)
-
- if err != nil {
- t.Fatal(err)
- }
+func runBlobServer(t *testing.T) (func(), string) {
+ srv := testhelper.NewServer(t, nil, nil)
- gitalypb.RegisterBlobServiceServer(grpcServer, &server{ruby: rubyServer})
- reflection.Register(grpcServer)
+ gitalypb.RegisterBlobServiceServer(srv.GrpcServer(), &server{ruby: rubyServer})
+ reflection.Register(srv.GrpcServer())
- go grpcServer.Serve(listener)
+ require.NoError(t, srv.Start())
- return grpcServer, "unix://" + serverSocketPath
+ return srv.Stop, "unix://" + srv.Socket()
}
func newBlobClient(t *testing.T, serverSocketPath string) (gitalypb.BlobServiceClient, *grpc.ClientConn) {
diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go
index a052c5119..e9de2fd30 100644
--- a/internal/testhelper/testhelper.go
+++ b/internal/testhelper/testhelper.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"
+ "github.com/BurntSushi/toml"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
@@ -32,10 +33,13 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors"
"gitlab.com/gitlab-org/gitaly/internal/helper/text"
gitalylog "gitlab.com/gitlab-org/gitaly/internal/log"
+ praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/storage"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -313,6 +317,121 @@ func GetGitEnvData() (string, error) {
return string(gitEnvBytes), nil
}
+// NewTestServer instantiates a new TestServer
+func NewTestServer(srv *grpc.Server) *TestServer {
+ return &TestServer{
+ grpcServer: srv,
+ }
+}
+
+// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance
+// if necessary
+type TestServer struct {
+ grpcServer *grpc.Server
+ socket string
+ process *os.Process
+}
+
+// GrpcServer returns the underlying grpc.Server
+func (p *TestServer) GrpcServer() *grpc.Server {
+ return p.grpcServer
+}
+
+// Stop will stop both the grpc server as well as the praefect process
+func (p *TestServer) Stop() {
+ p.grpcServer.Stop()
+ if p.process != nil {
+ p.process.Kill()
+ }
+}
+
+// Socket returns the socket file the test server is listening on
+func (p *TestServer) Socket() string {
+ return p.socket
+}
+
+// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled
+func (p *TestServer) Start() error {
+ gitalyServerSocketPath := GetTemporaryGitalySocketFileName()
+
+ listener, err := net.Listen("unix", gitalyServerSocketPath)
+ if err != nil {
+ return err
+ }
+
+ go p.grpcServer.Serve(listener)
+
+ if os.Getenv("GITALY_TEST_PRAEFECT_BIN") == "1" {
+ tempDir, err := ioutil.TempDir("", "praefect-test-server")
+ if err != nil {
+ return err
+ }
+
+ praefectServerSocketPath := GetTemporaryGitalySocketFileName()
+
+ configFilePath := filepath.Join(tempDir, "config.toml")
+ configFile, err := os.Create(configFilePath)
+ if err != nil {
+ return err
+ }
+ defer configFile.Close()
+
+ c := praefectconfig.Config{
+ SocketPath: praefectServerSocketPath,
+ VirtualStorages: []*praefectconfig.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: "default",
+ Address: "unix:/" + gitalyServerSocketPath,
+ DefaultPrimary: true,
+ },
+ },
+ },
+ },
+ }
+
+ if err := toml.NewEncoder(configFile).Encode(&c); err != nil {
+ return err
+ }
+
+ cmd := exec.Command(os.Getenv("PRAEFECT_BIN_PATH"), "-config", configFilePath)
+ cmd.Stderr = os.Stderr
+
+ p.socket = praefectServerSocketPath
+
+ go cmd.Run()
+
+ conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure())
+
+ if err != nil {
+ return fmt.Errorf("dial: %v", err)
+ }
+ defer conn.Close()
+
+ client := healthpb.NewHealthClient(conn)
+ ctx, cancel := Context()
+ defer cancel()
+
+ for {
+ resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{})
+ if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING {
+ break
+ }
+ time.Sleep(1 * time.Microsecond)
+ }
+
+ os.Remove(tempDir)
+ p.process = cmd.Process
+
+ return nil
+ }
+
+ p.socket = gitalyServerSocketPath
+ return nil
+}
+
// NewTestGrpcServer creates a GRPC Server for testing purposes
func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *grpc.Server {
logger := NewTestLogger(tb)
@@ -331,6 +450,25 @@ func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInte
)
}
+// NewServer creates a Server for testing purposes
+func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer {
+ logger := NewTestLogger(tb)
+ logrusEntry := log.NewEntry(logger).WithField("test", tb.Name())
+
+ ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor)
+ ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger)
+ ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger)
+
+ streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...)
+ unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...)
+
+ return NewTestServer(
+ grpc.NewServer(
+ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)),
+ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)),
+ ))
+}
+
// MustHaveNoChildProcess panics if it finds a running or finished child
// process. It waits for 2 seconds for processes to be cleaned up by other
// goroutines.