Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2019-05-06 12:18:53 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-05-06 12:18:53 +0300
commit5591eefbfdc2641c99db0b308b9bc70cb7b03792 (patch)
treead0e7a7158cc668afaeb909f17259a98b621ad3e
parentd2fbeabebfbbdf83fede4e4f939eeac22c661c5e (diff)
parenta3b015b79c70d6ef6d3e00241ab21e9715350cc3 (diff)
Merge branch '30093-apply-bfg-object-map-streaming' into 'master'
Implement the ApplyBfgObjectMapStream RPC See merge request gitlab-org/gitaly!1199
-rw-r--r--internal/service/cleanup/apply_bfg_object_map.go2
-rw-r--r--internal/service/cleanup/apply_bfg_object_map_stream.go108
-rw-r--r--internal/service/cleanup/apply_bfg_object_map_stream_test.go133
-rw-r--r--internal/service/cleanup/apply_bfg_object_map_test.go2
-rw-r--r--internal/service/cleanup/internalrefs/cleaner.go27
-rw-r--r--internal/service/cleanup/notifier/notifier.go64
-rw-r--r--internal/service/cleanup/server.go4
7 files changed, 327 insertions, 13 deletions
diff --git a/internal/service/cleanup/apply_bfg_object_map.go b/internal/service/cleanup/apply_bfg_object_map.go
index 57f431353..37093e40e 100644
--- a/internal/service/cleanup/apply_bfg_object_map.go
+++ b/internal/service/cleanup/apply_bfg_object_map.go
@@ -35,7 +35,7 @@ func (s *server) ApplyBfgObjectMap(stream gitalypb.CleanupService_ApplyBfgObject
// It doesn't matter if new internal references are added after this RPC
// starts running - they shouldn't point to the objects removed by the BFG
- cleaner, err := internalrefs.NewCleaner(ctx, repo)
+ cleaner, err := internalrefs.NewCleaner(ctx, repo, nil)
if err != nil {
return status.Errorf(codes.Internal, err.Error())
}
diff --git a/internal/service/cleanup/apply_bfg_object_map_stream.go b/internal/service/cleanup/apply_bfg_object_map_stream.go
new file mode 100644
index 000000000..594a509a0
--- /dev/null
+++ b/internal/service/cleanup/apply_bfg_object_map_stream.go
@@ -0,0 +1,108 @@
+package cleanup
+
+import (
+ "fmt"
+ "io"
+
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
+ "gitlab.com/gitlab-org/gitaly/internal/service/cleanup/internalrefs"
+ "gitlab.com/gitlab-org/gitaly/internal/service/cleanup/notifier"
+ "gitlab.com/gitlab-org/gitaly/streamio"
+)
+
+type bfgStreamReader struct {
+ firstRequest *gitalypb.ApplyBfgObjectMapStreamRequest
+
+ server gitalypb.CleanupService_ApplyBfgObjectMapStreamServer
+}
+
+type bfgStreamWriter struct {
+ entries []*gitalypb.ApplyBfgObjectMapStreamResponse_Entry
+
+ server gitalypb.CleanupService_ApplyBfgObjectMapStreamServer
+}
+
+func (s *server) ApplyBfgObjectMapStream(server gitalypb.CleanupService_ApplyBfgObjectMapStreamServer) error {
+ firstRequest, err := server.Recv()
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ if err := validateFirstRequest(firstRequest); err != nil {
+ return helper.ErrInvalidArgument(err)
+ }
+
+ ctx := server.Context()
+ repo := firstRequest.GetRepository()
+ reader := &bfgStreamReader{firstRequest: firstRequest, server: server}
+ chunker := chunk.New(&bfgStreamWriter{server: server})
+
+ notifier, err := notifier.New(ctx, repo, chunker)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ // It doesn't matter if new internal references are added after this RPC
+ // starts running - they shouldn't point to the objects removed by the BFG
+ cleaner, err := internalrefs.NewCleaner(ctx, repo, notifier.Notify)
+ if err != nil {
+ return helper.ErrInternal(err)
+ }
+
+ if err := cleaner.ApplyObjectMap(reader.streamReader()); err != nil {
+ if invalidErr, ok := err.(internalrefs.ErrInvalidObjectMap); ok {
+ return helper.ErrInvalidArgument(invalidErr)
+ }
+
+ return helper.ErrInternal(err)
+ }
+
+ return helper.ErrInternal(chunker.Flush())
+}
+
+func validateFirstRequest(req *gitalypb.ApplyBfgObjectMapStreamRequest) error {
+ if repo := req.GetRepository(); repo == nil {
+ return fmt.Errorf("first request: repository not set")
+ }
+
+ return nil
+}
+
+func (r *bfgStreamReader) readOne() ([]byte, error) {
+ if r.firstRequest != nil {
+ data := r.firstRequest.GetObjectMap()
+ r.firstRequest = nil
+ return data, nil
+ }
+
+ req, err := r.server.Recv()
+ if err != nil {
+ return nil, err
+ }
+
+ return req.GetObjectMap(), nil
+}
+
+func (r *bfgStreamReader) streamReader() io.Reader {
+ return streamio.NewReader(r.readOne)
+}
+
+func (w *bfgStreamWriter) Append(it chunk.Item) {
+ w.entries = append(
+ w.entries,
+ it.(*gitalypb.ApplyBfgObjectMapStreamResponse_Entry),
+ )
+}
+
+func (w *bfgStreamWriter) Reset() {
+ w.entries = nil
+}
+
+func (w *bfgStreamWriter) Send() error {
+ msg := &gitalypb.ApplyBfgObjectMapStreamResponse{Entries: w.entries}
+
+ return w.server.Send(msg)
+}
diff --git a/internal/service/cleanup/apply_bfg_object_map_stream_test.go b/internal/service/cleanup/apply_bfg_object_map_stream_test.go
new file mode 100644
index 000000000..e4c916330
--- /dev/null
+++ b/internal/service/cleanup/apply_bfg_object_map_stream_test.go
@@ -0,0 +1,133 @@
+package cleanup
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+
+ "gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/git/log"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestApplyBfgObjectMapStreamSuccess(t *testing.T) {
+ server, serverSocketPath := runCleanupServiceServer(t)
+ defer server.Stop()
+
+ client, conn := newCleanupServiceClient(t, serverSocketPath)
+ defer conn.Close()
+
+ testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ headCommit, err := log.GetCommit(ctx, testRepo, "HEAD")
+ require.NoError(t, err)
+
+ // A known blob: the CHANGELOG in the test repository
+ blobID := "53855584db773c3df5b5f61f72974cb298822fbb"
+
+ // A known tag: v1.0.0
+ tagID := "f4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8"
+
+ // Create some refs pointing to HEAD
+ for _, ref := range []string{
+ "refs/environments/1", "refs/keep-around/1", "refs/merge-requests/1",
+ "refs/heads/_keep", "refs/tags/_keep", "refs/notes/_keep",
+ } {
+ testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "update-ref", ref, headCommit.Id)
+ }
+
+ objectMapData := fmt.Sprintf(
+ strings.Repeat("%s %s\n", 4),
+ headCommit.Id, headCommit.Id,
+ git.NullSHA, blobID,
+ git.NullSHA, tagID,
+ git.NullSHA, git.NullSHA,
+ )
+
+ entries, err := doStreamingRequest(ctx, t, testRepo, client, objectMapData)
+ require.NoError(t, err)
+
+ // Ensure that the internal refs are gone, but the others still exist
+ refs := testhelper.GetRepositoryRefs(t, testRepoPath)
+ assert.NotContains(t, refs, "refs/environments/1")
+ assert.NotContains(t, refs, "refs/keep-around/1")
+ assert.NotContains(t, refs, "refs/merge-requests/1")
+ assert.Contains(t, refs, "refs/heads/_keep")
+ assert.Contains(t, refs, "refs/tags/_keep")
+ assert.Contains(t, refs, "refs/notes/_keep")
+
+ // Ensure that the returned entry is correct
+ require.Len(t, entries, 4, "wrong number of entries returned")
+ requireEntry(t, entries[0], headCommit.Id, headCommit.Id, gitalypb.ObjectType_COMMIT)
+ requireEntry(t, entries[1], git.NullSHA, blobID, gitalypb.ObjectType_BLOB)
+ requireEntry(t, entries[2], git.NullSHA, tagID, gitalypb.ObjectType_TAG)
+ requireEntry(t, entries[3], git.NullSHA, git.NullSHA, gitalypb.ObjectType_UNKNOWN)
+}
+
+func requireEntry(t *testing.T, entry *gitalypb.ApplyBfgObjectMapStreamResponse_Entry, oldOid, newOid string, objectType gitalypb.ObjectType) {
+ require.Equal(t, objectType, entry.Type)
+ require.Equal(t, oldOid, entry.OldOid)
+ require.Equal(t, newOid, entry.NewOid)
+}
+
+func TestApplyBfgObjectMapStreamFailsOnInvalidInput(t *testing.T) {
+ server, serverSocketPath := runCleanupServiceServer(t)
+ defer server.Stop()
+
+ client, conn := newCleanupServiceClient(t, serverSocketPath)
+ defer conn.Close()
+
+ testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ entries, err := doStreamingRequest(ctx, t, testRepo, client, "invalid-data here as you can see")
+ require.Empty(t, entries)
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+}
+
+func doStreamingRequest(ctx context.Context, t *testing.T, repo *gitalypb.Repository, client gitalypb.CleanupServiceClient, objectMap string) ([]*gitalypb.ApplyBfgObjectMapStreamResponse_Entry, error) {
+ // Split the data across multiple requests
+ parts := strings.SplitN(objectMap, " ", 2)
+ req1 := &gitalypb.ApplyBfgObjectMapStreamRequest{
+ Repository: repo,
+ ObjectMap: []byte(parts[0] + " "),
+ }
+ req2 := &gitalypb.ApplyBfgObjectMapStreamRequest{ObjectMap: []byte(parts[1])}
+
+ server, err := client.ApplyBfgObjectMapStream(ctx)
+ require.NoError(t, err)
+ require.NoError(t, server.Send(req1))
+ require.NoError(t, server.Send(req2))
+ require.NoError(t, server.CloseSend())
+
+ // receive all responses in a loop
+ var entries []*gitalypb.ApplyBfgObjectMapStreamResponse_Entry
+ for {
+ rsp, err := server.Recv()
+ if rsp != nil {
+ entries = append(entries, rsp.GetEntries()...)
+ }
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return entries, nil
+}
diff --git a/internal/service/cleanup/apply_bfg_object_map_test.go b/internal/service/cleanup/apply_bfg_object_map_test.go
index 587a26cc7..9d1e087ff 100644
--- a/internal/service/cleanup/apply_bfg_object_map_test.go
+++ b/internal/service/cleanup/apply_bfg_object_map_test.go
@@ -52,7 +52,7 @@ func TestApplyBfgObjectMapSuccess(t *testing.T) {
assert.Contains(t, refs, "refs/notes/_keep")
}
-func TestFailsOnInvalidInput(t *testing.T) {
+func TestApplyBfgObjectMapFailsOnInvalidInput(t *testing.T) {
server, serverSocketPath := runCleanupServiceServer(t)
defer server.Stop()
diff --git a/internal/service/cleanup/internalrefs/cleaner.go b/internal/service/cleanup/internalrefs/cleaner.go
index e5fe152a9..80943d7cb 100644
--- a/internal/service/cleanup/internalrefs/cleaner.go
+++ b/internal/service/cleanup/internalrefs/cleaner.go
@@ -22,11 +22,17 @@ var internalRefs = []string{
"refs/merge-requests/",
}
+// A ForEachFunc can be called for every entry in the BFG object map file that
+// the cleaner is processing. Returning an error will stop the cleaner before
+// it has processed the entry in question
+type ForEachFunc func(oldOID, newOID string, isInternalRef bool) error
+
// Cleaner is responsible for updating the internal references in a repository
// as specified by a BFG object map. Currently, internal references pointing to
// a commit that has been rewritten will simply be removed.
type Cleaner struct {
- ctx context.Context
+ ctx context.Context
+ forEach ForEachFunc
// Map of SHA -> reference names
table map[string][]string
@@ -39,7 +45,7 @@ type ErrInvalidObjectMap error
// NewCleaner builds a new instance of Cleaner, which is used to apply a BFG
// object map to a repository.
-func NewCleaner(ctx context.Context, repo *gitalypb.Repository) (*Cleaner, error) {
+func NewCleaner(ctx context.Context, repo *gitalypb.Repository, forEach ForEachFunc) (*Cleaner, error) {
table, err := buildLookupTable(ctx, repo)
if err != nil {
return nil, err
@@ -50,7 +56,7 @@ func NewCleaner(ctx context.Context, repo *gitalypb.Repository) (*Cleaner, error
return nil, err
}
- return &Cleaner{ctx: ctx, table: table, updater: updater}, nil
+ return &Cleaner{ctx: ctx, table: table, updater: updater, forEach: forEach}, nil
}
// ApplyObjectMap processes a BFG object map file, removing any internal
@@ -70,7 +76,7 @@ func (c *Cleaner) ApplyObjectMap(reader io.Reader) error {
return ErrInvalidObjectMap(fmt.Errorf("object map invalid at line %d", i))
}
- if err := c.removeRefsFor(shas[0]); err != nil {
+ if err := c.processEntry(shas[0], shas[1]); err != nil {
return err
}
}
@@ -78,14 +84,21 @@ func (c *Cleaner) ApplyObjectMap(reader io.Reader) error {
return c.updater.Wait()
}
-func (c *Cleaner) removeRefsFor(sha string) error {
- refs, isPresent := c.table[sha]
+func (c *Cleaner) processEntry(oldSHA, newSHA string) error {
+ refs, isPresent := c.table[oldSHA]
+
+ if c.forEach != nil {
+ if err := c.forEach(oldSHA, newSHA, isPresent); err != nil {
+ return err
+ }
+ }
+
if !isPresent {
return nil
}
grpc_logrus.Extract(c.ctx).WithFields(log.Fields{
- "sha": sha,
+ "sha": oldSHA,
"refs": refs,
}).Info("removing internal references")
diff --git a/internal/service/cleanup/notifier/notifier.go b/internal/service/cleanup/notifier/notifier.go
new file mode 100644
index 000000000..e73317eef
--- /dev/null
+++ b/internal/service/cleanup/notifier/notifier.go
@@ -0,0 +1,64 @@
+package notifier
+
+import (
+ "context"
+
+ "gitlab.com/gitlab-org/gitaly/internal/git/catfile"
+ "gitlab.com/gitlab-org/gitaly/internal/helper/chunk"
+
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+)
+
+// Notifier sends messages stating that an OID has been rewritten, looking
+// up the type of the OID if necessary. It is not safe for concurrent use
+type Notifier struct {
+ catfile *catfile.Batch
+ chunker *chunk.Chunker
+}
+
+// New instantiates a new Notifier
+func New(ctx context.Context, repo *gitalypb.Repository, chunker *chunk.Chunker) (*Notifier, error) {
+ catfile, err := catfile.New(ctx, repo)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Notifier{catfile: catfile, chunker: chunker}, nil
+}
+
+// Notify builds a new message and sends it to the chunker
+func (n *Notifier) Notify(oldOid, newOid string, isInternalRef bool) error {
+ objectType := n.lookupType(newOid, isInternalRef)
+
+ entry := &gitalypb.ApplyBfgObjectMapStreamResponse_Entry{
+ Type: objectType,
+ OldOid: oldOid,
+ NewOid: newOid,
+ }
+
+ return n.chunker.Send(entry)
+}
+
+func (n *Notifier) lookupType(oid string, isInternalRef bool) gitalypb.ObjectType {
+ if isInternalRef {
+ return gitalypb.ObjectType_COMMIT
+ }
+
+ info, err := n.catfile.Info(oid)
+ if err != nil {
+ return gitalypb.ObjectType_UNKNOWN
+ }
+
+ switch info.Type {
+ case "commit":
+ return gitalypb.ObjectType_COMMIT
+ case "blob":
+ return gitalypb.ObjectType_BLOB
+ case "tree":
+ return gitalypb.ObjectType_TREE
+ case "tag":
+ return gitalypb.ObjectType_TAG
+ default:
+ return gitalypb.ObjectType_UNKNOWN
+ }
+}
diff --git a/internal/service/cleanup/server.go b/internal/service/cleanup/server.go
index dcb16ae4e..b1bd4e0f5 100644
--- a/internal/service/cleanup/server.go
+++ b/internal/service/cleanup/server.go
@@ -17,7 +17,3 @@ func NewServer() gitalypb.CleanupServiceServer {
func (s *server) CloseSession(context.Context, *gitalypb.CloseSessionRequest) (*gitalypb.CloseSessionResponse, error) {
return nil, helper.Unimplemented
}
-
-func (s *server) ApplyBfgObjectMapStream(gitalypb.CleanupService_ApplyBfgObjectMapStreamServer) error {
- return helper.Unimplemented
-}