Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJustin Tobler <jtobler@gitlab.com>2023-12-01 01:01:11 +0300
committerJustin Tobler <jtobler@gitlab.com>2023-12-04 23:56:57 +0300
commitec32915c508b8c98765640867175916cd2b77215 (patch)
tree63bada9102b9f285553c14ce29baf29d7e6f420b
parent41a67fb9185c6a842decc22bb917d65f2ca1d6ff (diff)
analysis: Implement `CheckBlobsGenerated` RPC
To accommodate generated file detection for a set of specified files in a repository, the `CheckBlobsGenerated` RPC has been implemented. Generated file detection is performed though `go-enry` using heuristics on the file path and the blob contents. The RPC supports multiple file checks via streaming multiple requests containing batches of one or more files. File check results are streamed back to the client in chunks with at least one response message per received request.
-rw-r--r--internal/gitaly/service/analysis/check_generated.go177
-rw-r--r--internal/gitaly/service/analysis/check_generated_test.go374
-rw-r--r--internal/gitaly/service/analysis/server.go33
-rw-r--r--internal/gitaly/service/analysis/testhelper_test.go11
-rw-r--r--internal/gitaly/service/setup/register.go2
5 files changed, 597 insertions, 0 deletions
diff --git a/internal/gitaly/service/analysis/check_generated.go b/internal/gitaly/service/analysis/check_generated.go
new file mode 100644
index 000000000..708efddca
--- /dev/null
+++ b/internal/gitaly/service/analysis/check_generated.go
@@ -0,0 +1,177 @@
+package analysis
+
+import (
+ "errors"
+ "fmt"
+ "io"
+
+ "github.com/go-enry/go-enry/v2"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/protobuf/proto"
+)
+
+func (s *server) CheckBlobsGenerated(stream gitalypb.AnalysisService_CheckBlobsGeneratedServer) (returnedErr error) {
+ req, err := stream.Recv()
+ if err != nil {
+ return fmt.Errorf("receiving first request: %w", err)
+ }
+
+ repository := req.GetRepository()
+ if err := s.locator.ValidateRepository(repository); err != nil {
+ return structerr.NewInvalidArgument("%w", err)
+ }
+
+ ctx := stream.Context()
+ repo := s.localrepo(repository)
+
+ reader, readerCancel, err := s.catfileCache.ObjectReader(ctx, repo)
+ if err != nil {
+ return fmt.Errorf("retrieving object reader: %w", err)
+ }
+ defer readerCancel()
+
+ queue, queueCancel, err := reader.ObjectQueue(ctx)
+ if err != nil {
+ return fmt.Errorf("retrieving object queue: %w", err)
+ }
+ defer queueCancel()
+
+ group, groupCtx := errgroup.WithContext(ctx)
+ requestsChan := make(chan *gitalypb.CheckBlobsGeneratedRequest)
+
+ // The output of git-cat-file(1) is processed in a separate goroutine to allow requests to
+ // continuously queue additional objects for processing.
+ group.Go(func() error {
+ chunkSender := chunk.New(&checkBlobsGeneratedSender{stream: stream})
+
+ for req := range requestsChan {
+ for _, blob := range req.GetBlobs() {
+ object, err := queue.ReadObject(ctx)
+ if err != nil {
+ return fmt.Errorf("reading object: %w", err)
+ }
+
+ // The requested Git revisions must always resolve to a blob. Otherwise, there is
+ // not a file to perform the generation check on.
+ if !object.IsBlob() {
+ return structerr.NewInvalidArgument("object is not a blob")
+ }
+
+ // Read an arbitrary number of bytes that is considered enough to determine whether
+ // the file is generated.
+ content, err := io.ReadAll(io.LimitReader(object, 2048))
+ if err != nil {
+ return fmt.Errorf("reading blob content: %w", err)
+ }
+
+ // Any remaining blob data must be consumed before reading the next object. This is
+ // quite inefficient, but there is currently no alternative because git-cat-file(1)
+ // cannot be asked to limit the number of bytes it's outputting.
+ if _, err := io.Copy(io.Discard, object); err != nil {
+ return fmt.Errorf("discarding remaining blob content: %w", err)
+ }
+
+ if err := chunkSender.Send(&gitalypb.CheckBlobsGeneratedResponse_Blob{
+ Revision: blob.Revision,
+ Generated: enry.IsGenerated(string(blob.Path), content),
+ }); err != nil {
+ return fmt.Errorf("sending response: %w", err)
+ }
+ }
+
+ // The sender is flushed for each received request message so that at least one response
+ // message is always produced.
+ if err := chunkSender.Flush(); err != nil {
+ return fmt.Errorf("flushing response: %w", err)
+ }
+ }
+
+ return nil
+ })
+
+ // Ensure that the sending goroutine always closes and any lingering requests are first
+ // processed before the surrounding function returns.
+ defer func() {
+ close(requestsChan)
+ if err := group.Wait(); err != nil && returnedErr == nil {
+ returnedErr = err
+ }
+ }()
+
+ for {
+ if err := validateCheckBlobsGeneratedRequest(req); err != nil {
+ return structerr.NewInvalidArgument("validating request: %w", err)
+ }
+
+ // Queue up all revisions specified in the request for processing through git-cat-file(1).
+ for _, blob := range req.GetBlobs() {
+ if err := queue.RequestObject(ctx, git.Revision(blob.Revision)); err != nil {
+ return fmt.Errorf("requesting object: %w", err)
+ }
+ }
+
+ if err := queue.Flush(ctx); err != nil {
+ return fmt.Errorf("flushing queue: %w", err)
+ }
+
+ select {
+ // When performing the file generation check the file path is used to gain additional
+ // insight. Send the request to the processing goroutine to provide file paths and context
+ // for how to batch response messages.
+ case requestsChan <- req:
+ // The group context is cancelled when the sending goroutine exits with an error.
+ case <-groupCtx.Done():
+ return nil
+ }
+
+ req, err = stream.Recv()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+
+ return fmt.Errorf("receiving next request: %w", err)
+ }
+ }
+
+ return nil
+}
+
+func validateCheckBlobsGeneratedRequest(req *gitalypb.CheckBlobsGeneratedRequest) error {
+ if len(req.Blobs) == 0 {
+ return errors.New("empty blobs")
+ }
+
+ for _, blob := range req.Blobs {
+ if err := git.ValidateRevision(blob.Revision, git.AllowPathScopedRevision()); err != nil {
+ return err
+ }
+
+ if len(blob.GetPath()) == 0 {
+ return errors.New("empty path")
+ }
+ }
+
+ return nil
+}
+
+type checkBlobsGeneratedSender struct {
+ stream gitalypb.AnalysisService_CheckBlobsGeneratedServer
+ response *gitalypb.CheckBlobsGeneratedResponse
+}
+
+func (s *checkBlobsGeneratedSender) Reset() {
+ s.response = &gitalypb.CheckBlobsGeneratedResponse{}
+}
+
+func (s *checkBlobsGeneratedSender) Append(m proto.Message) {
+ s.response.Blobs = append(s.response.Blobs, m.(*gitalypb.CheckBlobsGeneratedResponse_Blob))
+}
+
+func (s *checkBlobsGeneratedSender) Send() error {
+ return s.stream.Send(s.response)
+}
diff --git a/internal/gitaly/service/analysis/check_generated_test.go b/internal/gitaly/service/analysis/check_generated_test.go
new file mode 100644
index 000000000..a3ca63de9
--- /dev/null
+++ b/internal/gitaly/service/analysis/check_generated_test.go
@@ -0,0 +1,374 @@
+package analysis
+
+import (
+ "errors"
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestCheckBlobsGenerated(t *testing.T) {
+ t.Parallel()
+
+ ctx := testhelper.Context(t)
+ cfg := testcfg.Build(t)
+
+ addr := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterAnalysisServiceServer(srv, NewServer(deps))
+ gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps))
+ })
+ cfg.SocketPath = addr
+
+ conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ testhelper.MustClose(t, conn)
+ })
+
+ client := gitalypb.NewAnalysisServiceClient(conn)
+
+ type setupData struct {
+ requests []*gitalypb.CheckBlobsGeneratedRequest
+ expectedResponses []*gitalypb.CheckBlobsGeneratedResponse
+ expectedError error
+ }
+
+ for _, tc := range []struct {
+ desc string
+ setup func(t *testing.T) setupData
+ }{
+ {
+ desc: "repository not set",
+ setup: func(t *testing.T) setupData {
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: nil,
+ },
+ },
+ expectedError: structerr.NewInvalidArgument("repository not set"),
+ }
+ },
+ },
+ {
+ desc: "blobs not set",
+ setup: func(t *testing.T) setupData {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: nil,
+ },
+ },
+ expectedError: structerr.NewInvalidArgument("validating request: empty blobs"),
+ }
+ },
+ },
+ {
+ desc: "revision not set",
+ setup: func(t *testing.T) setupData {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: nil,
+ Path: []byte("path/to/file"),
+ },
+ },
+ },
+ },
+ expectedError: structerr.NewInvalidArgument("validating request: empty revision"),
+ }
+ },
+ },
+ {
+ desc: "invalid revision",
+ setup: func(t *testing.T) setupData {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte("not a valid revision"),
+ Path: []byte("path/to/file"),
+ },
+ },
+ },
+ },
+ expectedError: structerr.NewInvalidArgument("validating request: revision can't contain whitespace"),
+ }
+ },
+ },
+ {
+ desc: "path not set",
+ setup: func(t *testing.T) setupData {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte("foo:bar"),
+ Path: nil,
+ },
+ },
+ },
+ },
+ expectedError: structerr.NewInvalidArgument("validating request: empty path"),
+ }
+ },
+ },
+ {
+ desc: "object not found",
+ setup: func(t *testing.T) setupData {
+ repo, _ := gittest.CreateRepository(t, ctx, cfg)
+ revision := "foo:bar"
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte(revision),
+ Path: []byte("bar"),
+ },
+ },
+ },
+ },
+ expectedError: testhelper.WithInterceptedMetadata(
+ structerr.NewInternal("reading object: object not found"),
+ "revision",
+ revision,
+ ),
+ }
+ },
+ },
+ {
+ desc: "revision does not resolve to a blob",
+ setup: func(t *testing.T) setupData {
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
+ commitID := gittest.WriteCommit(t, cfg, repoPath)
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte(commitID),
+ Path: []byte("bar"),
+ },
+ },
+ },
+ },
+ expectedError: structerr.NewInvalidArgument("object is not a blob"),
+ }
+ },
+ },
+ {
+ desc: "detect generated blob via file path",
+ setup: func(t *testing.T) setupData {
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
+ blobID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte(blobID),
+ Path: []byte("Gopkg.lock"),
+ },
+ },
+ },
+ },
+ expectedResponses: []*gitalypb.CheckBlobsGeneratedResponse{
+ {
+ Blobs: []*gitalypb.CheckBlobsGeneratedResponse_Blob{
+ {
+ Revision: []byte(blobID),
+ Generated: true,
+ },
+ },
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "detect generated blob via content",
+ setup: func(t *testing.T) setupData {
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
+ gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("foo"), gittest.WithTreeEntries(
+ gittest.TreeEntry{Mode: "100644", Path: "bar.go", Content: "Code generated by\nfoobar\n"},
+ ))
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte("foo:bar.go"),
+ Path: []byte("bar.go"),
+ },
+ },
+ },
+ },
+ expectedResponses: []*gitalypb.CheckBlobsGeneratedResponse{
+ {
+ Blobs: []*gitalypb.CheckBlobsGeneratedResponse_Blob{
+ {
+ Revision: []byte("foo:bar.go"),
+ Generated: true,
+ },
+ },
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "detect non-generated blob",
+ setup: func(t *testing.T) setupData {
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
+ blobID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte(blobID),
+ Path: []byte("foobar.go"),
+ },
+ },
+ },
+ },
+ expectedResponses: []*gitalypb.CheckBlobsGeneratedResponse{
+ {
+ Blobs: []*gitalypb.CheckBlobsGeneratedResponse_Blob{
+ {
+ Revision: []byte(blobID),
+ Generated: false,
+ },
+ },
+ },
+ },
+ }
+ },
+ },
+ {
+ desc: "check stream of files",
+ setup: func(t *testing.T) setupData {
+ repo, repoPath := gittest.CreateRepository(t, ctx, cfg)
+ generatedBlobID := gittest.WriteBlob(t, cfg, repoPath, []byte("Code generated by\nfoobar\n"))
+ normalBlobID := gittest.WriteBlob(t, cfg, repoPath, []byte("foobar"))
+
+ return setupData{
+ requests: []*gitalypb.CheckBlobsGeneratedRequest{
+ {
+ Repository: repo,
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte(generatedBlobID),
+ Path: []byte("foo.go"),
+ },
+ {
+ Revision: []byte(normalBlobID),
+ Path: []byte("bar.go"),
+ },
+ },
+ },
+ {
+ Blobs: []*gitalypb.CheckBlobsGeneratedRequest_Blob{
+ {
+ Revision: []byte(normalBlobID),
+ Path: []byte("bar.go"),
+ },
+ },
+ },
+ },
+ expectedResponses: []*gitalypb.CheckBlobsGeneratedResponse{
+ {
+ Blobs: []*gitalypb.CheckBlobsGeneratedResponse_Blob{
+ {
+ Revision: []byte(generatedBlobID),
+ Generated: true,
+ },
+ {
+ Revision: []byte(normalBlobID),
+ Generated: false,
+ },
+ },
+ },
+ {
+ Blobs: []*gitalypb.CheckBlobsGeneratedResponse_Blob{
+ {
+ Revision: []byte(normalBlobID),
+ Generated: false,
+ },
+ },
+ },
+ },
+ }
+ },
+ },
+ } {
+ tc := tc
+ t.Run(tc.desc, func(t *testing.T) {
+ t.Parallel()
+ testSetup := tc.setup(t)
+
+ stream, err := client.CheckBlobsGenerated(ctx)
+ require.NoError(t, err)
+
+ for _, req := range testSetup.requests {
+ err := stream.Send(req)
+ require.NoError(t, err)
+ }
+
+ require.NoError(t, stream.CloseSend())
+
+ var actualResponses []*gitalypb.CheckBlobsGeneratedResponse
+ for {
+ resp, err := stream.Recv()
+ if err != nil {
+ if !errors.Is(err, io.EOF) {
+ testhelper.RequireGrpcError(t, testSetup.expectedError, err)
+ }
+ break
+ }
+
+ actualResponses = append(actualResponses, resp)
+ }
+
+ testhelper.ProtoEqual(t, testSetup.expectedResponses, actualResponses)
+ })
+ }
+}
diff --git a/internal/gitaly/service/analysis/server.go b/internal/gitaly/service/analysis/server.go
new file mode 100644
index 000000000..965ac72af
--- /dev/null
+++ b/internal/gitaly/service/analysis/server.go
@@ -0,0 +1,33 @@
+package analysis
+
+import (
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+type server struct {
+ gitalypb.UnimplementedAnalysisServiceServer
+ logger log.Logger
+ locator storage.Locator
+ gitCmdFactory git.CommandFactory
+ catfileCache catfile.Cache
+}
+
+// NewServer creates a new instance of the gRPC AnalysisService.
+func NewServer(deps *service.Dependencies) gitalypb.AnalysisServiceServer {
+ return &server{
+ logger: deps.GetLogger(),
+ locator: deps.GetLocator(),
+ gitCmdFactory: deps.GetGitCmdFactory(),
+ catfileCache: deps.GetCatfileCache(),
+ }
+}
+
+func (s *server) localrepo(repo storage.Repository) *localrepo.Repo {
+ return localrepo.New(s.logger, s.locator, s.gitCmdFactory, s.catfileCache, repo)
+}
diff --git a/internal/gitaly/service/analysis/testhelper_test.go b/internal/gitaly/service/analysis/testhelper_test.go
new file mode 100644
index 000000000..4b3cbd7dd
--- /dev/null
+++ b/internal/gitaly/service/analysis/testhelper_test.go
@@ -0,0 +1,11 @@
+package analysis
+
+import (
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+)
+
+func TestMain(m *testing.M) {
+ testhelper.Run(m)
+}
diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go
index 4076abb84..e05c8fa07 100644
--- a/internal/gitaly/service/setup/register.go
+++ b/internal/gitaly/service/setup/register.go
@@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service"
+ "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/analysis"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/blob"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/cleanup"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/commit"
@@ -51,6 +52,7 @@ var (
// RegisterAll will register all the known gRPC services on the provided gRPC service instance.
func RegisterAll(srv *grpc.Server, deps *service.Dependencies) {
+ gitalypb.RegisterAnalysisServiceServer(srv, analysis.NewServer(deps))
gitalypb.RegisterBlobServiceServer(srv, blob.NewServer(deps))
gitalypb.RegisterCleanupServiceServer(srv, cleanup.NewServer(deps))
gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps))