diff options
author | Jacob Vosmaer <jacob@gitlab.com> | 2019-05-06 12:18:53 +0300 |
---|---|---|
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-05-06 12:18:53 +0300 |
commit | 5591eefbfdc2641c99db0b308b9bc70cb7b03792 (patch) | |
tree | ad0e7a7158cc668afaeb909f17259a98b621ad3e | |
parent | d2fbeabebfbbdf83fede4e4f939eeac22c661c5e (diff) | |
parent | a3b015b79c70d6ef6d3e00241ab21e9715350cc3 (diff) |
Merge branch '30093-apply-bfg-object-map-streaming' into 'master'
Implement the ApplyBfgObjectMapStream RPC
See merge request gitlab-org/gitaly!1199
-rw-r--r-- | internal/service/cleanup/apply_bfg_object_map.go | 2 | ||||
-rw-r--r-- | internal/service/cleanup/apply_bfg_object_map_stream.go | 108 | ||||
-rw-r--r-- | internal/service/cleanup/apply_bfg_object_map_stream_test.go | 133 | ||||
-rw-r--r-- | internal/service/cleanup/apply_bfg_object_map_test.go | 2 | ||||
-rw-r--r-- | internal/service/cleanup/internalrefs/cleaner.go | 27 | ||||
-rw-r--r-- | internal/service/cleanup/notifier/notifier.go | 64 | ||||
-rw-r--r-- | internal/service/cleanup/server.go | 4 |
7 files changed, 327 insertions, 13 deletions
diff --git a/internal/service/cleanup/apply_bfg_object_map.go b/internal/service/cleanup/apply_bfg_object_map.go index 57f431353..37093e40e 100644 --- a/internal/service/cleanup/apply_bfg_object_map.go +++ b/internal/service/cleanup/apply_bfg_object_map.go @@ -35,7 +35,7 @@ func (s *server) ApplyBfgObjectMap(stream gitalypb.CleanupService_ApplyBfgObject // It doesn't matter if new internal references are added after this RPC // starts running - they shouldn't point to the objects removed by the BFG - cleaner, err := internalrefs.NewCleaner(ctx, repo) + cleaner, err := internalrefs.NewCleaner(ctx, repo, nil) if err != nil { return status.Errorf(codes.Internal, err.Error()) } diff --git a/internal/service/cleanup/apply_bfg_object_map_stream.go b/internal/service/cleanup/apply_bfg_object_map_stream.go new file mode 100644 index 000000000..594a509a0 --- /dev/null +++ b/internal/service/cleanup/apply_bfg_object_map_stream.go @@ -0,0 +1,108 @@ +package cleanup + +import ( + "fmt" + "io" + + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" + "gitlab.com/gitlab-org/gitaly/internal/service/cleanup/internalrefs" + "gitlab.com/gitlab-org/gitaly/internal/service/cleanup/notifier" + "gitlab.com/gitlab-org/gitaly/streamio" +) + +type bfgStreamReader struct { + firstRequest *gitalypb.ApplyBfgObjectMapStreamRequest + + server gitalypb.CleanupService_ApplyBfgObjectMapStreamServer +} + +type bfgStreamWriter struct { + entries []*gitalypb.ApplyBfgObjectMapStreamResponse_Entry + + server gitalypb.CleanupService_ApplyBfgObjectMapStreamServer +} + +func (s *server) ApplyBfgObjectMapStream(server gitalypb.CleanupService_ApplyBfgObjectMapStreamServer) error { + firstRequest, err := server.Recv() + if err != nil { + return helper.ErrInternal(err) + } + + if err := validateFirstRequest(firstRequest); err != nil { + return helper.ErrInvalidArgument(err) + } + + ctx := server.Context() + repo := firstRequest.GetRepository() + reader := &bfgStreamReader{firstRequest: firstRequest, server: server} + chunker := chunk.New(&bfgStreamWriter{server: server}) + + notifier, err := notifier.New(ctx, repo, chunker) + if err != nil { + return helper.ErrInternal(err) + } + + // It doesn't matter if new internal references are added after this RPC + // starts running - they shouldn't point to the objects removed by the BFG + cleaner, err := internalrefs.NewCleaner(ctx, repo, notifier.Notify) + if err != nil { + return helper.ErrInternal(err) + } + + if err := cleaner.ApplyObjectMap(reader.streamReader()); err != nil { + if invalidErr, ok := err.(internalrefs.ErrInvalidObjectMap); ok { + return helper.ErrInvalidArgument(invalidErr) + } + + return helper.ErrInternal(err) + } + + return helper.ErrInternal(chunker.Flush()) +} + +func validateFirstRequest(req *gitalypb.ApplyBfgObjectMapStreamRequest) error { + if repo := req.GetRepository(); repo == nil { + return fmt.Errorf("first request: repository not set") + } + + return nil +} + +func (r *bfgStreamReader) readOne() ([]byte, error) { + if r.firstRequest != nil { + data := r.firstRequest.GetObjectMap() + r.firstRequest = nil + return data, nil + } + + req, err := r.server.Recv() + if err != nil { + return nil, err + } + + return req.GetObjectMap(), nil +} + +func (r *bfgStreamReader) streamReader() io.Reader { + return streamio.NewReader(r.readOne) +} + +func (w *bfgStreamWriter) Append(it chunk.Item) { + w.entries = append( + w.entries, + it.(*gitalypb.ApplyBfgObjectMapStreamResponse_Entry), + ) +} + +func (w *bfgStreamWriter) Reset() { + w.entries = nil +} + +func (w *bfgStreamWriter) Send() error { + msg := &gitalypb.ApplyBfgObjectMapStreamResponse{Entries: w.entries} + + return w.server.Send(msg) +} diff --git a/internal/service/cleanup/apply_bfg_object_map_stream_test.go b/internal/service/cleanup/apply_bfg_object_map_stream_test.go new file mode 100644 index 000000000..e4c916330 --- /dev/null +++ b/internal/service/cleanup/apply_bfg_object_map_stream_test.go @@ -0,0 +1,133 @@ +package cleanup + +import ( + "context" + "fmt" + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "google.golang.org/grpc/codes" + + "gitlab.com/gitlab-org/gitaly/internal/git" + "gitlab.com/gitlab-org/gitaly/internal/git/log" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +func TestApplyBfgObjectMapStreamSuccess(t *testing.T) { + server, serverSocketPath := runCleanupServiceServer(t) + defer server.Stop() + + client, conn := newCleanupServiceClient(t, serverSocketPath) + defer conn.Close() + + testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + ctx, cancel := testhelper.Context() + defer cancel() + + headCommit, err := log.GetCommit(ctx, testRepo, "HEAD") + require.NoError(t, err) + + // A known blob: the CHANGELOG in the test repository + blobID := "53855584db773c3df5b5f61f72974cb298822fbb" + + // A known tag: v1.0.0 + tagID := "f4e6814c3e4e7a0de82a9e7cd20c626cc963a2f8" + + // Create some refs pointing to HEAD + for _, ref := range []string{ + "refs/environments/1", "refs/keep-around/1", "refs/merge-requests/1", + "refs/heads/_keep", "refs/tags/_keep", "refs/notes/_keep", + } { + testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "update-ref", ref, headCommit.Id) + } + + objectMapData := fmt.Sprintf( + strings.Repeat("%s %s\n", 4), + headCommit.Id, headCommit.Id, + git.NullSHA, blobID, + git.NullSHA, tagID, + git.NullSHA, git.NullSHA, + ) + + entries, err := doStreamingRequest(ctx, t, testRepo, client, objectMapData) + require.NoError(t, err) + + // Ensure that the internal refs are gone, but the others still exist + refs := testhelper.GetRepositoryRefs(t, testRepoPath) + assert.NotContains(t, refs, "refs/environments/1") + assert.NotContains(t, refs, "refs/keep-around/1") + assert.NotContains(t, refs, "refs/merge-requests/1") + assert.Contains(t, refs, "refs/heads/_keep") + assert.Contains(t, refs, "refs/tags/_keep") + assert.Contains(t, refs, "refs/notes/_keep") + + // Ensure that the returned entry is correct + require.Len(t, entries, 4, "wrong number of entries returned") + requireEntry(t, entries[0], headCommit.Id, headCommit.Id, gitalypb.ObjectType_COMMIT) + requireEntry(t, entries[1], git.NullSHA, blobID, gitalypb.ObjectType_BLOB) + requireEntry(t, entries[2], git.NullSHA, tagID, gitalypb.ObjectType_TAG) + requireEntry(t, entries[3], git.NullSHA, git.NullSHA, gitalypb.ObjectType_UNKNOWN) +} + +func requireEntry(t *testing.T, entry *gitalypb.ApplyBfgObjectMapStreamResponse_Entry, oldOid, newOid string, objectType gitalypb.ObjectType) { + require.Equal(t, objectType, entry.Type) + require.Equal(t, oldOid, entry.OldOid) + require.Equal(t, newOid, entry.NewOid) +} + +func TestApplyBfgObjectMapStreamFailsOnInvalidInput(t *testing.T) { + server, serverSocketPath := runCleanupServiceServer(t) + defer server.Stop() + + client, conn := newCleanupServiceClient(t, serverSocketPath) + defer conn.Close() + + testRepo, _, cleanupFn := testhelper.NewTestRepo(t) + defer cleanupFn() + + ctx, cancel := testhelper.Context() + defer cancel() + + entries, err := doStreamingRequest(ctx, t, testRepo, client, "invalid-data here as you can see") + require.Empty(t, entries) + testhelper.RequireGrpcError(t, err, codes.InvalidArgument) +} + +func doStreamingRequest(ctx context.Context, t *testing.T, repo *gitalypb.Repository, client gitalypb.CleanupServiceClient, objectMap string) ([]*gitalypb.ApplyBfgObjectMapStreamResponse_Entry, error) { + // Split the data across multiple requests + parts := strings.SplitN(objectMap, " ", 2) + req1 := &gitalypb.ApplyBfgObjectMapStreamRequest{ + Repository: repo, + ObjectMap: []byte(parts[0] + " "), + } + req2 := &gitalypb.ApplyBfgObjectMapStreamRequest{ObjectMap: []byte(parts[1])} + + server, err := client.ApplyBfgObjectMapStream(ctx) + require.NoError(t, err) + require.NoError(t, server.Send(req1)) + require.NoError(t, server.Send(req2)) + require.NoError(t, server.CloseSend()) + + // receive all responses in a loop + var entries []*gitalypb.ApplyBfgObjectMapStreamResponse_Entry + for { + rsp, err := server.Recv() + if rsp != nil { + entries = append(entries, rsp.GetEntries()...) + } + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + } + + return entries, nil +} diff --git a/internal/service/cleanup/apply_bfg_object_map_test.go b/internal/service/cleanup/apply_bfg_object_map_test.go index 587a26cc7..9d1e087ff 100644 --- a/internal/service/cleanup/apply_bfg_object_map_test.go +++ b/internal/service/cleanup/apply_bfg_object_map_test.go @@ -52,7 +52,7 @@ func TestApplyBfgObjectMapSuccess(t *testing.T) { assert.Contains(t, refs, "refs/notes/_keep") } -func TestFailsOnInvalidInput(t *testing.T) { +func TestApplyBfgObjectMapFailsOnInvalidInput(t *testing.T) { server, serverSocketPath := runCleanupServiceServer(t) defer server.Stop() diff --git a/internal/service/cleanup/internalrefs/cleaner.go b/internal/service/cleanup/internalrefs/cleaner.go index e5fe152a9..80943d7cb 100644 --- a/internal/service/cleanup/internalrefs/cleaner.go +++ b/internal/service/cleanup/internalrefs/cleaner.go @@ -22,11 +22,17 @@ var internalRefs = []string{ "refs/merge-requests/", } +// A ForEachFunc can be called for every entry in the BFG object map file that +// the cleaner is processing. Returning an error will stop the cleaner before +// it has processed the entry in question +type ForEachFunc func(oldOID, newOID string, isInternalRef bool) error + // Cleaner is responsible for updating the internal references in a repository // as specified by a BFG object map. Currently, internal references pointing to // a commit that has been rewritten will simply be removed. type Cleaner struct { - ctx context.Context + ctx context.Context + forEach ForEachFunc // Map of SHA -> reference names table map[string][]string @@ -39,7 +45,7 @@ type ErrInvalidObjectMap error // NewCleaner builds a new instance of Cleaner, which is used to apply a BFG // object map to a repository. -func NewCleaner(ctx context.Context, repo *gitalypb.Repository) (*Cleaner, error) { +func NewCleaner(ctx context.Context, repo *gitalypb.Repository, forEach ForEachFunc) (*Cleaner, error) { table, err := buildLookupTable(ctx, repo) if err != nil { return nil, err @@ -50,7 +56,7 @@ func NewCleaner(ctx context.Context, repo *gitalypb.Repository) (*Cleaner, error return nil, err } - return &Cleaner{ctx: ctx, table: table, updater: updater}, nil + return &Cleaner{ctx: ctx, table: table, updater: updater, forEach: forEach}, nil } // ApplyObjectMap processes a BFG object map file, removing any internal @@ -70,7 +76,7 @@ func (c *Cleaner) ApplyObjectMap(reader io.Reader) error { return ErrInvalidObjectMap(fmt.Errorf("object map invalid at line %d", i)) } - if err := c.removeRefsFor(shas[0]); err != nil { + if err := c.processEntry(shas[0], shas[1]); err != nil { return err } } @@ -78,14 +84,21 @@ func (c *Cleaner) ApplyObjectMap(reader io.Reader) error { return c.updater.Wait() } -func (c *Cleaner) removeRefsFor(sha string) error { - refs, isPresent := c.table[sha] +func (c *Cleaner) processEntry(oldSHA, newSHA string) error { + refs, isPresent := c.table[oldSHA] + + if c.forEach != nil { + if err := c.forEach(oldSHA, newSHA, isPresent); err != nil { + return err + } + } + if !isPresent { return nil } grpc_logrus.Extract(c.ctx).WithFields(log.Fields{ - "sha": sha, + "sha": oldSHA, "refs": refs, }).Info("removing internal references") diff --git a/internal/service/cleanup/notifier/notifier.go b/internal/service/cleanup/notifier/notifier.go new file mode 100644 index 000000000..e73317eef --- /dev/null +++ b/internal/service/cleanup/notifier/notifier.go @@ -0,0 +1,64 @@ +package notifier + +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/internal/helper/chunk" + + "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" +) + +// Notifier sends messages stating that an OID has been rewritten, looking +// up the type of the OID if necessary. It is not safe for concurrent use +type Notifier struct { + catfile *catfile.Batch + chunker *chunk.Chunker +} + +// New instantiates a new Notifier +func New(ctx context.Context, repo *gitalypb.Repository, chunker *chunk.Chunker) (*Notifier, error) { + catfile, err := catfile.New(ctx, repo) + if err != nil { + return nil, err + } + + return &Notifier{catfile: catfile, chunker: chunker}, nil +} + +// Notify builds a new message and sends it to the chunker +func (n *Notifier) Notify(oldOid, newOid string, isInternalRef bool) error { + objectType := n.lookupType(newOid, isInternalRef) + + entry := &gitalypb.ApplyBfgObjectMapStreamResponse_Entry{ + Type: objectType, + OldOid: oldOid, + NewOid: newOid, + } + + return n.chunker.Send(entry) +} + +func (n *Notifier) lookupType(oid string, isInternalRef bool) gitalypb.ObjectType { + if isInternalRef { + return gitalypb.ObjectType_COMMIT + } + + info, err := n.catfile.Info(oid) + if err != nil { + return gitalypb.ObjectType_UNKNOWN + } + + switch info.Type { + case "commit": + return gitalypb.ObjectType_COMMIT + case "blob": + return gitalypb.ObjectType_BLOB + case "tree": + return gitalypb.ObjectType_TREE + case "tag": + return gitalypb.ObjectType_TAG + default: + return gitalypb.ObjectType_UNKNOWN + } +} diff --git a/internal/service/cleanup/server.go b/internal/service/cleanup/server.go index dcb16ae4e..b1bd4e0f5 100644 --- a/internal/service/cleanup/server.go +++ b/internal/service/cleanup/server.go @@ -17,7 +17,3 @@ func NewServer() gitalypb.CleanupServiceServer { func (s *server) CloseSession(context.Context, *gitalypb.CloseSessionRequest) (*gitalypb.CloseSessionResponse, error) { return nil, helper.Unimplemented } - -func (s *server) ApplyBfgObjectMapStream(gitalypb.CleanupService_ApplyBfgObjectMapStreamServer) error { - return helper.Unimplemented -} |