diff options
author | John Cai <jcai@gitlab.com> | 2020-01-08 05:10:22 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2020-01-29 04:36:52 +0300 |
commit | e11956afe6753f542be57876f3e5551e5c64d5b8 (patch) | |
tree | 78f71f73e51b6bce996d43c3a5a3f040c3d5e7ef | |
parent | 6582d73007e9022b7c844d018c80b0eafa9ca189 (diff) |
Add test server to automatically start praefect
-rw-r--r-- | .gitlab-ci.yml | 5 | ||||
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | internal/errors/errors.go | 10 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 16 | ||||
-rw-r--r-- | internal/praefect/datastore/datastore.go | 5 | ||||
-rw-r--r-- | internal/praefect/protoregistry/find_oid.go | 2 | ||||
-rw-r--r-- | internal/service/blob/get_blob_test.go | 12 | ||||
-rw-r--r-- | internal/service/blob/get_blobs_test.go | 8 | ||||
-rw-r--r-- | internal/service/blob/lfs_pointers_test.go | 24 | ||||
-rw-r--r-- | internal/service/blob/testhelper_test.go | 22 | ||||
-rw-r--r-- | internal/testhelper/testhelper.go | 138 |
11 files changed, 209 insertions, 41 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f9dbc908a..d02ca67e1 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -137,6 +137,11 @@ test:proxy: script: - make test-with-proxies +test:praefect: + <<: *test_definition + script: + - make test-with-praefect + race: <<: *go_test_definition script: @@ -60,6 +60,14 @@ prepare-tests: prepare-build test: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ +.PHONY: test-with-praefect +test-with-praefect: build-praefect + cd $(BUILD_DIR) && GITALY_TEST_PRAEFECT_BIN=1 PRAEFECT_BIN_PATH="$(BUILD_DIR)/bin/praefect" $(MAKE) test + +.PHONY: build-praefect +build-praefect: prepare-build + cd cmd/praefect && go build -o "$(BUILD_DIR)/bin/praefect" + .PHONY: test-with-proxies test-with-proxies: prepare-build cd $(BUILD_DIR) && $(MAKE) $@ diff --git a/internal/errors/errors.go b/internal/errors/errors.go new file mode 100644 index 000000000..d2efaef52 --- /dev/null +++ b/internal/errors/errors.go @@ -0,0 +1,10 @@ +package errors + +import "errors" + +var ( + // ErrEmptyRepository is returned when an RPC is missing a repository as an argument + ErrEmptyRepository = errors.New("empty Repository") + // ErrInvalidRepository is returned when an RPC has an invalid repository as an argument + ErrInvalidRepository = errors.New("invalid Repository") +) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index ef9bbdc28..2a4859a8e 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -12,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" + internalerrs "gitlab.com/gitlab-org/gitaly/internal/errors" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/conn" "gitlab.com/gitlab-org/gitaly/internal/praefect/datastore" @@ -19,6 +20,8 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func isDestructive(methodName string) bool { @@ -80,10 +83,14 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, var requestFinalizer func() var storage string + var getRepoErr error if mi.Scope == protoregistry.ScopeRepository { - storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) - if err != nil { - return nil, err + storage, requestFinalizer, getRepoErr = c.getStorageForRepositoryMessage(mi, m, peeker, fullMethodName) + if getRepoErr != nil { + if getRepoErr == protoregistry.ErrTargetRepoMissing { + return nil, status.Errorf(codes.InvalidArgument, getRepoErr.Error()) + } + return nil, getRepoErr } } else { storage, requestFinalizer, err = c.getAnyStorageNode() @@ -185,6 +192,9 @@ func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *git newPrimary, err := c.datastore.PickAPrimary(targetRepo.GetStorageName()) if err != nil { + if err == datastore.ErrNoPrimaryForStorage { + return nil, status.Error(codes.InvalidArgument, internalerrs.ErrInvalidRepository.Error()) + } return nil, fmt.Errorf("could not choose a primary: %v", err) } diff --git a/internal/praefect/datastore/datastore.go b/internal/praefect/datastore/datastore.go index 2830b197a..c8de9d82c 100644 --- a/internal/praefect/datastore/datastore.go +++ b/internal/praefect/datastore/datastore.go @@ -179,6 +179,9 @@ func NewInMemory(cfg config.Config) *MemoryDatastore { return m } +// ErrNoPrimaryForStorage indicates a virtual storage has no primary associated with it +var ErrNoPrimaryForStorage = errors.New("no primary for storage") + // PickAPrimary returns the primary configured in the config file func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, error) { for _, node := range md.virtualStorages[virtualStorage] { @@ -187,7 +190,7 @@ func (md *MemoryDatastore) PickAPrimary(virtualStorage string) (models.Node, err } } - return models.Node{}, errors.New("no default primaries found") + return models.Node{}, ErrNoPrimaryForStorage } // GetReplicas gets the secondaries for a repository based on the relative path diff --git a/internal/praefect/protoregistry/find_oid.go b/internal/praefect/protoregistry/find_oid.go index c17324a96..3e3db4485 100644 --- a/internal/praefect/protoregistry/find_oid.go +++ b/internal/praefect/protoregistry/find_oid.go @@ -17,7 +17,7 @@ const ( ) // ErrTargetRepoMissing indicates that the target repo is missing or not set -var ErrTargetRepoMissing = errors.New("target repo is not set") +var ErrTargetRepoMissing = errors.New("empty Repository") func reflectFindRepoTarget(pbMsg proto.Message, targetOID []int) (*gitalypb.Repository, error) { msgV, e := reflectFindOID(pbMsg, targetOID) diff --git a/internal/service/blob/get_blob_test.go b/internal/service/blob/get_blob_test.go index 728a1b247..ca24b07ad 100644 --- a/internal/service/blob/get_blob_test.go +++ b/internal/service/blob/get_blob_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetBlob(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -90,8 +90,8 @@ func TestSuccessfulGetBlob(t *testing.T) { } func TestGetBlobNotFound(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -146,8 +146,8 @@ func getBlob(stream gitalypb.BlobService_GetBlobClient) (int64, string, []byte, } func TestFailedGetBlobRequestDueToValidationError(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() diff --git a/internal/service/blob/get_blobs_test.go b/internal/service/blob/get_blobs_test.go index 8b7199731..8ecfcb696 100644 --- a/internal/service/blob/get_blobs_test.go +++ b/internal/service/blob/get_blobs_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetBlobsRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -133,8 +133,8 @@ func TestSuccessfulGetBlobsRequest(t *testing.T) { } func TestFailedGetBlobsRequestDueToValidation(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() diff --git a/internal/service/blob/lfs_pointers_test.go b/internal/service/blob/lfs_pointers_test.go index 019caa37c..7dd31eda2 100644 --- a/internal/service/blob/lfs_pointers_test.go +++ b/internal/service/blob/lfs_pointers_test.go @@ -13,8 +13,8 @@ import ( ) func TestSuccessfulGetLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -77,8 +77,8 @@ func TestSuccessfulGetLFSPointersRequest(t *testing.T) { } func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() testRepo, _, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() @@ -125,8 +125,8 @@ func TestFailedGetLFSPointersRequestDueToValidations(t *testing.T) { } func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -291,8 +291,8 @@ func TestSuccessfulGetNewLFSPointersRequest(t *testing.T) { } func TestFailedGetNewLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -352,8 +352,8 @@ func drainNewPointers(c gitalypb.BlobService_GetNewLFSPointersClient) error { } func TestSuccessfulGetAllLFSPointersRequest(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() @@ -423,8 +423,8 @@ func getAllPointers(t *testing.T, c gitalypb.BlobService_GetAllLFSPointersClient } func TestFailedGetAllLFSPointersRequestDueToValidations(t *testing.T) { - server, serverSocketPath := runBlobServer(t) - defer server.Stop() + stop, serverSocketPath := runBlobServer(t) + defer stop() client, conn := newBlobClient(t, serverSocketPath) defer conn.Close() diff --git a/internal/service/blob/testhelper_test.go b/internal/service/blob/testhelper_test.go index c8d990995..d28ad8eb8 100644 --- a/internal/service/blob/testhelper_test.go +++ b/internal/service/blob/testhelper_test.go @@ -2,10 +2,10 @@ package blob import ( "log" - "net" "os" "testing" + "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" @@ -23,6 +23,7 @@ func testMain(m *testing.M) int { defer testhelper.MustHaveNoChildProcess() testhelper.ConfigureRuby() + if err := rubyServer.Start(); err != nil { log.Fatal(err) } @@ -31,22 +32,15 @@ func testMain(m *testing.M) int { return m.Run() } -func runBlobServer(t *testing.T) (*grpc.Server, string) { - grpcServer := testhelper.NewTestGrpcServer(t, nil, nil) - - serverSocketPath := testhelper.GetTemporaryGitalySocketFileName() - listener, err := net.Listen("unix", serverSocketPath) - - if err != nil { - t.Fatal(err) - } +func runBlobServer(t *testing.T) (func(), string) { + srv := testhelper.NewServer(t, nil, nil) - gitalypb.RegisterBlobServiceServer(grpcServer, &server{ruby: rubyServer}) - reflection.Register(grpcServer) + gitalypb.RegisterBlobServiceServer(srv.GrpcServer(), &server{ruby: rubyServer}) + reflection.Register(srv.GrpcServer()) - go grpcServer.Serve(listener) + require.NoError(t, srv.Start()) - return grpcServer, "unix://" + serverSocketPath + return srv.Stop, "unix://" + srv.Socket() } func newBlobClient(t *testing.T, serverSocketPath string) (gitalypb.BlobServiceClient, *grpc.ClientConn) { diff --git a/internal/testhelper/testhelper.go b/internal/testhelper/testhelper.go index a052c5119..e9de2fd30 100644 --- a/internal/testhelper/testhelper.go +++ b/internal/testhelper/testhelper.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/BurntSushi/toml" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -32,10 +33,13 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper/fieldextractors" "gitlab.com/gitlab-org/gitaly/internal/helper/text" gitalylog "gitlab.com/gitlab-org/gitaly/internal/log" + praefectconfig "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/storage" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -313,6 +317,121 @@ func GetGitEnvData() (string, error) { return string(gitEnvBytes), nil } +// NewTestServer instantiates a new TestServer +func NewTestServer(srv *grpc.Server) *TestServer { + return &TestServer{ + grpcServer: srv, + } +} + +// TestServer wraps a grpc Server and handles automatically putting a praefect in front of a gitaly instance +// if necessary +type TestServer struct { + grpcServer *grpc.Server + socket string + process *os.Process +} + +// GrpcServer returns the underlying grpc.Server +func (p *TestServer) GrpcServer() *grpc.Server { + return p.grpcServer +} + +// Stop will stop both the grpc server as well as the praefect process +func (p *TestServer) Stop() { + p.grpcServer.Stop() + if p.process != nil { + p.process.Kill() + } +} + +// Socket returns the socket file the test server is listening on +func (p *TestServer) Socket() string { + return p.socket +} + +// Start will start the grpc server as well as spawn a praefect instance if GITALY_TEST_PRAEFECT_BIN is enabled +func (p *TestServer) Start() error { + gitalyServerSocketPath := GetTemporaryGitalySocketFileName() + + listener, err := net.Listen("unix", gitalyServerSocketPath) + if err != nil { + return err + } + + go p.grpcServer.Serve(listener) + + if os.Getenv("GITALY_TEST_PRAEFECT_BIN") == "1" { + tempDir, err := ioutil.TempDir("", "praefect-test-server") + if err != nil { + return err + } + + praefectServerSocketPath := GetTemporaryGitalySocketFileName() + + configFilePath := filepath.Join(tempDir, "config.toml") + configFile, err := os.Create(configFilePath) + if err != nil { + return err + } + defer configFile.Close() + + c := praefectconfig.Config{ + SocketPath: praefectServerSocketPath, + VirtualStorages: []*praefectconfig.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "default", + Address: "unix:/" + gitalyServerSocketPath, + DefaultPrimary: true, + }, + }, + }, + }, + } + + if err := toml.NewEncoder(configFile).Encode(&c); err != nil { + return err + } + + cmd := exec.Command(os.Getenv("PRAEFECT_BIN_PATH"), "-config", configFilePath) + cmd.Stderr = os.Stderr + + p.socket = praefectServerSocketPath + + go cmd.Run() + + conn, err := grpc.Dial("unix://"+praefectServerSocketPath, grpc.WithInsecure()) + + if err != nil { + return fmt.Errorf("dial: %v", err) + } + defer conn.Close() + + client := healthpb.NewHealthClient(conn) + ctx, cancel := Context() + defer cancel() + + for { + resp, err := client.Check(ctx, &healthpb.HealthCheckRequest{}) + if err == nil && resp.Status == healthpb.HealthCheckResponse_SERVING { + break + } + time.Sleep(1 * time.Microsecond) + } + + os.Remove(tempDir) + p.process = cmd.Process + + return nil + } + + p.socket = gitalyServerSocketPath + return nil +} + // NewTestGrpcServer creates a GRPC Server for testing purposes func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *grpc.Server { logger := NewTestLogger(tb) @@ -331,6 +450,25 @@ func NewTestGrpcServer(tb testing.TB, streamInterceptors []grpc.StreamServerInte ) } +// NewServer creates a Server for testing purposes +func NewServer(tb testing.TB, streamInterceptors []grpc.StreamServerInterceptor, unaryInterceptors []grpc.UnaryServerInterceptor) *TestServer { + logger := NewTestLogger(tb) + logrusEntry := log.NewEntry(logger).WithField("test", tb.Name()) + + ctxTagger := grpc_ctxtags.WithFieldExtractorForInitialReq(fieldextractors.FieldExtractor) + ctxStreamTagger := grpc_ctxtags.StreamServerInterceptor(ctxTagger) + ctxUnaryTagger := grpc_ctxtags.UnaryServerInterceptor(ctxTagger) + + streamInterceptors = append([]grpc.StreamServerInterceptor{ctxStreamTagger, grpc_logrus.StreamServerInterceptor(logrusEntry)}, streamInterceptors...) + unaryInterceptors = append([]grpc.UnaryServerInterceptor{ctxUnaryTagger, grpc_logrus.UnaryServerInterceptor(logrusEntry)}, unaryInterceptors...) + + return NewTestServer( + grpc.NewServer( + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(streamInterceptors...)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unaryInterceptors...)), + )) +} + // MustHaveNoChildProcess panics if it finds a running or finished child // process. It waits for 2 seconds for processes to be cleaned up by other // goroutines. |