Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summary refs log tree commit diff
diff options
context:
space:
mode:
author Jacob Vosmaer <jacob@gitlab.com> 2018-12-06 16:10:05 +0300
committer Jacob Vosmaer <jacob@gitlab.com> 2018-12-06 16:10:05 +0300
commit 5f9281580aff50e6b5d33bdde26b1bf647d5d580 (patch)
tree 32d8e1da756fb5f957fedd9b8b1a027cb5e0b29d
parent 8ca8e47b78fb1cc6e615dbd00e3307f633dbfd12 (diff)
parent 273b51fe90036ccf77c5ec51af77420885042bcd (diff)
Merge branch 'ce-19376-apply-bfg-rpc' into 'master'
Clean up a repository by applying a BFG object map to it. See merge request gitlab-org/gitaly!990
-rw-r--r-- changelogs/unreleased/ce-19376-apply-bfg-rpc.yml 5
-rw-r--r-- internal/command/command.go 44
-rw-r--r-- internal/command/command_test.go 22
-rw-r--r-- internal/git/command.go 24
-rw-r--r-- internal/git/updateref/updateref.go 57
-rw-r--r-- internal/git/updateref/updateref_test.go 142
-rw-r--r-- internal/service/cleanup/apply_bfg_object_map.go 48
-rw-r--r-- internal/service/cleanup/apply_bfg_object_map_test.go 88
-rw-r--r-- internal/service/cleanup/internalrefs/cleaner.go 140
-rw-r--r-- internal/service/cleanup/testhelper_test.go 41
10 files changed, 605 insertions, 6 deletions
diff --git a/changelogs/unreleased/ce-19376-apply-bfg-rpc.yml b/changelogs/unreleased/ce-19376-apply-bfg-rpc.yml
new file mode 100644
index 000000000..578a5cb8b
--- /dev/null
+++ b/changelogs/unreleased/ce-19376-apply-bfg-rpc.yml
@@ -0,0 +1,5 @@
+---
+title: Add an RPC that allows repository size to be reduced by bulk-removing internal references
+merge_request: 990
+author:
+type: added
diff --git a/internal/command/command.go b/internal/command/command.go
index 7fa488279..38813f988 100644
--- a/internal/command/command.go
+++ b/internal/command/command.go
@@ -2,6 +2,7 @@ package command
import (
"context"
+ "errors"
"fmt"
"io"
"io/ioutil"
@@ -52,6 +53,7 @@ var exportedEnvVars = []string{
// created it is canceled.
type Command struct {
reader io.Reader
+ writer io.WriteCloser
logrusWriter io.WriteCloser
cmd *exec.Cmd
context context.Context
@@ -61,6 +63,19 @@ type Command struct {
waitOnce sync.Once
}
+type stdinSentinel struct{}
+
+func (stdinSentinel) Read([]byte) (int, error) {
+ return 0, errors.New("stdin sentinel should not be read from")
+}
+
+// SetupStdin instructs New() to configure the stdin pipe of the command it is
+// creating. This allows you to call Write() on the command as if it is an ordinary
+// io.Writer, sending data directly to the stdin of the process.
+//
+// You should not call Read() on this value - it is strictly for configuration!
+var SetupStdin io.Reader = stdinSentinel{}
+
// Read calls Read() on the stdout pipe of the command.
func (c *Command) Read(p []byte) (int, error) {
if c.reader == nil {
@@ -70,6 +85,15 @@ func (c *Command) Read(p []byte) (int, error) {
return c.reader.Read(p)
}
+// Write calls Write() on the stdin pipe of the command.
+func (c *Command) Write(p []byte) (int, error) {
+ if c.writer == nil {
+ panic("command has no writer")
+ }
+
+ return c.writer.Write(p)
+}
+
// Wait calls Wait() on the exec.Cmd instance inside the command. This
// blocks until the command has finished and reports the command exit
// status via the error return value. Use ExitStatus to get the integer
@@ -109,6 +133,9 @@ type contextWithoutDonePanic string
// New creates a Command from an exec.Cmd. On success, the Command
// contains a running subprocess. When ctx is canceled the embedded
// process will be terminated and reaped automatically.
+//
+// If stdin is specified as SetupStdin, you will be able to write to the stdin
+// of the subprocess by calling Write() on the returned Command.
func New(ctx context.Context, cmd *exec.Cmd, stdin io.Reader, stdout, stderr io.Writer, env ...string) (*Command, error) {
if ctx.Done() == nil {
panic(contextWithoutDonePanic("command spawned with context without Done() channel"))
@@ -144,7 +171,17 @@ func New(ctx context.Context, cmd *exec.Cmd, stdin io.Reader, stdout, stderr io.
// Start the command in its own process group (nice for signalling)
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
- if stdin != nil {
+ // Three possible values for stdin:
+ // * nil - Go implicitly uses /dev/null
+ // * SetupStdin - configure with cmd.StdinPipe(), allowing Write() to work
+ // * Another io.Reader - becomes cmd.Stdin. Write() will not work
+ if stdin == SetupStdin {
+ pipe, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, fmt.Errorf("GitCommand: stdin: %v", err)
+ }
+ command.writer = pipe
+ } else if stdin != nil {
cmd.Stdin = stdin
}
@@ -203,6 +240,11 @@ func exportEnvironment(env []string) []string {
// This function should never be called directly, use Wait().
func (c *Command) wait() {
+ if c.writer != nil {
+ // Prevent the command from blocking while it waits for stdin to be closed.
+ c.writer.Close()
+ }
+
if c.reader != nil {
// Prevent the command from blocking on writing to its stdout.
io.Copy(ioutil.Discard, c.reader)
diff --git a/internal/command/command_test.go b/internal/command/command_test.go
index 22337c27e..7bff115a9 100644
--- a/internal/command/command_test.go
+++ b/internal/command/command_test.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "io"
"os"
"os/exec"
"strings"
@@ -154,3 +155,24 @@ wait:
_, ok := err.(spawnTimeoutError)
require.True(t, ok, "type of error should be spawnTimeoutError")
}
+
+func TestNewCommandWithSetupStdin(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ value := "Test value"
+ output := bytes.NewBuffer(nil)
+
+ cmd, err := New(ctx, exec.Command("cat"), SetupStdin, nil, nil)
+ require.NoError(t, err)
+
+ _, err = fmt.Fprintf(cmd, "%s", value)
+ require.NoError(t, err)
+
+ // The output of the `cat` subprocess should exactly match its input
+ _, err = io.CopyN(output, cmd, int64(len(value)))
+ require.NoError(t, err)
+ require.Equal(t, value, output.String())
+
+ require.NoError(t, cmd.Wait())
+}
diff --git a/internal/git/command.go b/internal/git/command.go
index 012a3f895..fc10324f2 100644
--- a/internal/git/command.go
+++ b/internal/git/command.go
@@ -12,14 +12,34 @@ import (
// Command creates a git.Command with the given args and Repository
func Command(ctx context.Context, repo repository.GitRepo, args ...string) (*command.Command, error) {
- repoPath, env, err := alternates.PathAndEnv(repo)
+ args, env, err := argsAndEnv(repo, args...)
+ if err != nil {
+ return nil, err
+ }
+
+ return BareCommand(ctx, nil, nil, nil, env, args...)
+}
+
+// StdinCommand creates a git.Command with the given args and Repository that is
+// suitable for Write()ing to
+func StdinCommand(ctx context.Context, repo repository.GitRepo, args ...string) (*command.Command, error) {
+ args, env, err := argsAndEnv(repo, args...)
if err != nil {
return nil, err
}
+ return BareCommand(ctx, command.SetupStdin, nil, nil, env, args...)
+}
+
+func argsAndEnv(repo repository.GitRepo, args ...string) ([]string, []string, error) {
+ repoPath, env, err := alternates.PathAndEnv(repo)
+ if err != nil {
+ return nil, nil, err
+ }
+
args = append([]string{"--git-dir", repoPath}, args...)
- return BareCommand(ctx, nil, nil, nil, env, args...)
+ return args, env, nil
}
// BareCommand creates a git.Command with the given args, stdin/stdout/stderr, and env
diff --git a/internal/git/updateref/updateref.go b/internal/git/updateref/updateref.go
new file mode 100644
index 000000000..c61cc7e62
--- /dev/null
+++ b/internal/git/updateref/updateref.go
@@ -0,0 +1,57 @@
+package updateref
+
+import (
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/internal/command"
+ "gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/git/repository"
+)
+
+// Updater wraps a `git update-ref --stdin` process, presenting an interface
+// that allows references to be easily updated in bulk. It is not suitable for
+// concurrent use.
+type Updater struct {
+ repo repository.GitRepo
+ cmd *command.Command
+}
+
+// New returns a new bulk updater, wrapping a `git update-ref` process. Call the
+// various methods to enqueue updates, then call Wait() to attempt to apply all
+// the updates at once.
+//
+// It is important that ctx gets canceled somewhere. If it doesn't, the process
+// spawned by New() may never terminate.
+func New(ctx context.Context, repo repository.GitRepo) (*Updater, error) {
+ cmd, err := git.StdinCommand(ctx, repo, "update-ref", "-z", "--stdin")
+ if err != nil {
+ return nil, err
+ }
+
+ return &Updater{repo: repo, cmd: cmd}, nil
+}
+
+// Create commands the reference to be created with the sha specified in value
+func (u *Updater) Create(ref, value string) error {
+ _, err := fmt.Fprintf(u.cmd, "create %s\x00%s\x00", ref, value)
+ return err
+}
+
+// Update commands the reference to be updated to point at the sha specified in
+// newvalue
+func (u *Updater) Update(ref, newvalue string) error {
+ _, err := fmt.Fprintf(u.cmd, "update %s\x00%s\x00\x00", ref, newvalue)
+ return err
+}
+
+// Delete commands the reference to be removed from the repository
+func (u *Updater) Delete(ref string) error {
+ _, err := fmt.Fprintf(u.cmd, "delete %s\x00\x00", ref)
+ return err
+}
+
+// Wait applies the commands specified in other calls to the Updater
+func (u *Updater) Wait() error {
+ return u.cmd.Wait()
+}
diff --git a/internal/git/updateref/updateref_test.go b/internal/git/updateref/updateref_test.go
new file mode 100644
index 000000000..1fb9aefb8
--- /dev/null
+++ b/internal/git/updateref/updateref_test.go
@@ -0,0 +1,142 @@
+package updateref
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+
+ "gitlab.com/gitlab-org/gitaly/internal/git/log"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func setup(t *testing.T) (context.Context, *gitalypb.Repository, string, func()) {
+ ctx, cancel := testhelper.Context()
+ testRepo, testRepoPath, cleanup := testhelper.NewTestRepo(t)
+ teardown := func() {
+ cancel()
+ cleanup()
+ }
+
+ return ctx, testRepo, testRepoPath, teardown
+}
+
+func TestCreate(t *testing.T) {
+ ctx, testRepo, _, teardown := setup(t)
+ defer teardown()
+
+ headCommit, err := log.GetCommit(ctx, testRepo, "HEAD")
+ require.NoError(t, err)
+
+ updater, err := New(ctx, testRepo)
+ require.NoError(t, err)
+
+ ref := "refs/heads/_create"
+ sha := headCommit.Id
+
+ require.NoError(t, updater.Create(ref, sha))
+ require.NoError(t, updater.Wait())
+
+ // check the ref was created
+ commit, logErr := log.GetCommit(ctx, testRepo, ref)
+ require.NoError(t, logErr)
+ require.NotNil(t, commit, "reference was not created")
+ require.Equal(t, commit.Id, sha, "reference was created with the wrong SHA")
+}
+
+func TestUpdate(t *testing.T) {
+ ctx, testRepo, _, teardown := setup(t)
+ defer teardown()
+
+ headCommit, err := log.GetCommit(ctx, testRepo, "HEAD")
+ require.NoError(t, err)
+
+ updater, err := New(ctx, testRepo)
+ require.NoError(t, err)
+
+ ref := "refs/heads/feature"
+ sha := headCommit.Id
+
+ // Sanity check: ensure the ref exists before we start
+ commit, logErr := log.GetCommit(ctx, testRepo, ref)
+ require.NoError(t, logErr)
+ require.NotNil(t, commit, "%s does not exist in the test repository", ref)
+ require.NotEqual(t, commit.Id, sha, "%s points to HEAD: %s in the test repository", ref, sha)
+
+ require.NoError(t, updater.Update(ref, sha))
+ require.NoError(t, updater.Wait())
+
+ // check the ref was updated
+ commit, logErr = log.GetCommit(ctx, testRepo, ref)
+ require.NoError(t, logErr)
+ require.NotNil(t, commit)
+ require.Equal(t, commit.Id, sha, "reference was not updated")
+}
+
+func TestDelete(t *testing.T) {
+ ctx, testRepo, _, teardown := setup(t)
+ defer teardown()
+
+ updater, err := New(ctx, testRepo)
+ require.NoError(t, err)
+
+ ref := "refs/heads/feature"
+
+ require.NoError(t, updater.Delete(ref))
+ require.NoError(t, updater.Wait())
+
+ // check the ref was removed
+ commit, logErr := log.GetCommit(ctx, testRepo, ref)
+ require.NoError(t, logErr)
+ require.Nil(t, commit, "reference was not removed")
+}
+
+func TestBulkOperation(t *testing.T) {
+ ctx, testRepo, testRepoPath, teardown := setup(t)
+ defer teardown()
+
+ headCommit, err := log.GetCommit(ctx, testRepo, "HEAD")
+ require.NoError(t, err)
+
+ updater, err := New(ctx, testRepo)
+ require.NoError(t, err)
+
+ for i := 0; i < 1000; i++ {
+ ref := fmt.Sprintf("refs/head/_test_%d", i)
+ require.NoError(t, updater.Create(ref, headCommit.Id), "Failed to create ref %d", i)
+ }
+
+ require.NoError(t, updater.Wait())
+
+ refs := testhelper.GetRepositoryRefs(t, testRepoPath)
+ split := strings.Split(refs, "\n")
+ require.True(t, len(split) > 1000, "At least 1000 refs should be present")
+}
+
+func TestContextCancelAbortsRefChanges(t *testing.T) {
+ ctx, testRepo, _, teardown := setup(t)
+ defer teardown()
+
+ headCommit, err := log.GetCommit(ctx, testRepo, "HEAD")
+ require.NoError(t, err)
+
+ childCtx, childCancel := context.WithCancel(ctx)
+ updater, err := New(childCtx, testRepo)
+ require.NoError(t, err)
+
+ ref := "refs/heads/_shouldnotexist"
+
+ require.NoError(t, updater.Create(ref, headCommit.Id))
+
+ // Force the update-ref process to terminate early
+ childCancel()
+ require.Error(t, updater.Wait())
+
+ // check the ref doesn't exist
+ commit, logErr := log.GetCommit(ctx, testRepo, ref)
+ require.NoError(t, logErr)
+ require.Nil(t, commit, "Reference was created even though the process was aborted")
+}
diff --git a/internal/service/cleanup/apply_bfg_object_map.go b/internal/service/cleanup/apply_bfg_object_map.go
index 668cce786..57f431353 100644
--- a/internal/service/cleanup/apply_bfg_object_map.go
+++ b/internal/service/cleanup/apply_bfg_object_map.go
@@ -2,9 +2,51 @@ package cleanup
import (
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
- "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "gitlab.com/gitlab-org/gitaly/internal/service/cleanup/internalrefs"
+ "gitlab.com/gitlab-org/gitaly/streamio"
)
-func (s *server) ApplyBfgObjectMap(gitalypb.CleanupService_ApplyBfgObjectMapServer) error {
- return helper.Unimplemented
+func (s *server) ApplyBfgObjectMap(stream gitalypb.CleanupService_ApplyBfgObjectMapServer) error {
+ firstRequest, err := stream.Recv()
+ if err != nil {
+ return status.Errorf(codes.Internal, "first request failed: %v", err)
+ }
+
+ repo := firstRequest.GetRepository()
+ if repo == nil {
+ return status.Errorf(codes.InvalidArgument, "empty repository")
+ }
+
+ firstRead := false
+ reader := streamio.NewReader(func() ([]byte, error) {
+ if !firstRead {
+ firstRead = true
+ return firstRequest.GetObjectMap(), nil
+ }
+
+ request, err := stream.Recv()
+ return request.GetObjectMap(), err
+ })
+
+ ctx := stream.Context()
+
+ // It doesn't matter if new internal references are added after this RPC
+ // starts running - they shouldn't point to the objects removed by the BFG
+ cleaner, err := internalrefs.NewCleaner(ctx, repo)
+ if err != nil {
+ return status.Errorf(codes.Internal, err.Error())
+ }
+
+ if err := cleaner.ApplyObjectMap(reader); err != nil {
+ if invalidErr, ok := err.(internalrefs.ErrInvalidObjectMap); ok {
+ return status.Errorf(codes.InvalidArgument, "%s", invalidErr)
+ }
+
+ return status.Errorf(codes.Internal, "%s", err)
+ }
+
+ return stream.SendAndClose(&gitalypb.ApplyBfgObjectMapResponse{})
}
diff --git a/internal/service/cleanup/apply_bfg_object_map_test.go b/internal/service/cleanup/apply_bfg_object_map_test.go
new file mode 100644
index 000000000..587a26cc7
--- /dev/null
+++ b/internal/service/cleanup/apply_bfg_object_map_test.go
@@ -0,0 +1,88 @@
+package cleanup
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "google.golang.org/grpc/codes"
+
+ "gitlab.com/gitlab-org/gitaly/internal/git/log"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func TestApplyBfgObjectMapSuccess(t *testing.T) {
+ server, serverSocketPath := runCleanupServiceServer(t)
+ defer server.Stop()
+
+ client, conn := newCleanupServiceClient(t, serverSocketPath)
+ defer conn.Close()
+
+ testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ headCommit, err := log.GetCommit(ctx, testRepo, "HEAD")
+ require.NoError(t, err)
+
+ // Create some refs pointing to HEAD
+ for _, ref := range []string{
+ "refs/environments/1", "refs/keep-around/1", "refs/merge-requests/1",
+ "refs/heads/_keep", "refs/tags/_keep", "refs/notes/_keep",
+ } {
+ testhelper.MustRunCommand(t, nil, "git", "-C", testRepoPath, "update-ref", ref, headCommit.Id)
+ }
+
+ objectMapData := fmt.Sprintf("%s %s\n", headCommit.Id, strings.Repeat("0", 40))
+ require.NoError(t, doRequest(ctx, t, testRepo, client, objectMapData))
+
+ // Ensure that the internal refs are gone, but the others still exist
+ refs := testhelper.GetRepositoryRefs(t, testRepoPath)
+ assert.NotContains(t, refs, "refs/environments/1")
+ assert.NotContains(t, refs, "refs/keep-around/1")
+ assert.NotContains(t, refs, "refs/merge-requests/1")
+ assert.Contains(t, refs, "refs/heads/_keep")
+ assert.Contains(t, refs, "refs/tags/_keep")
+ assert.Contains(t, refs, "refs/notes/_keep")
+}
+
+func TestFailsOnInvalidInput(t *testing.T) {
+ server, serverSocketPath := runCleanupServiceServer(t)
+ defer server.Stop()
+
+ client, conn := newCleanupServiceClient(t, serverSocketPath)
+ defer conn.Close()
+
+ testRepo, _, cleanupFn := testhelper.NewTestRepo(t)
+ defer cleanupFn()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ err := doRequest(ctx, t, testRepo, client, "invalid-data here as you can see")
+ testhelper.RequireGrpcError(t, err, codes.InvalidArgument)
+}
+
+func doRequest(ctx context.Context, t *testing.T, repo *gitalypb.Repository, client gitalypb.CleanupServiceClient, objectMap string) error {
+ // Split the data across multiple requests
+ parts := strings.SplitN(objectMap, " ", 2)
+ req1 := &gitalypb.ApplyBfgObjectMapRequest{
+ Repository: repo,
+ ObjectMap: []byte(parts[0] + " "),
+ }
+ req2 := &gitalypb.ApplyBfgObjectMapRequest{ObjectMap: []byte(parts[1])}
+
+ stream, err := client.ApplyBfgObjectMap(ctx)
+ require.NoError(t, err)
+ require.NoError(t, stream.Send(req1))
+ require.NoError(t, stream.Send(req2))
+
+ _, err = stream.CloseAndRecv()
+ return err
+}
diff --git a/internal/service/cleanup/internalrefs/cleaner.go b/internal/service/cleanup/internalrefs/cleaner.go
new file mode 100644
index 000000000..e5fe152a9
--- /dev/null
+++ b/internal/service/cleanup/internalrefs/cleaner.go
@@ -0,0 +1,140 @@
+package internalrefs
+
+import (
+ "bufio"
+ "context"
+ "fmt"
+ "io"
+ "strings"
+
+ grpc_logrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ log "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+
+ "gitlab.com/gitlab-org/gitaly/internal/git"
+ "gitlab.com/gitlab-org/gitaly/internal/git/updateref"
+)
+
+// Only references in these namespaces are cleaned up
+var internalRefs = []string{
+ "refs/environments/",
+ "refs/keep-around/",
+ "refs/merge-requests/",
+}
+
+// Cleaner is responsible for updating the internal references in a repository
+// as specified by a BFG object map. Currently, internal references pointing to
+// a commit that has been rewritten will simply be removed.
+type Cleaner struct {
+ ctx context.Context
+
+ // Map of SHA -> reference names
+ table map[string][]string
+ updater *updateref.Updater
+}
+
+// ErrInvalidObjectMap is returned with descriptive text if the supplied object
+// map file is in the wrong format
+type ErrInvalidObjectMap error
+
+// NewCleaner builds a new instance of Cleaner, which is used to apply a BFG
+// object map to a repository.
+func NewCleaner(ctx context.Context, repo *gitalypb.Repository) (*Cleaner, error) {
+ table, err := buildLookupTable(ctx, repo)
+ if err != nil {
+ return nil, err
+ }
+
+ updater, err := updateref.New(ctx, repo)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Cleaner{ctx: ctx, table: table, updater: updater}, nil
+}
+
+// ApplyObjectMap processes a BFG object map file, removing any internal
+// references that point to a rewritten commit.
+func (c *Cleaner) ApplyObjectMap(reader io.Reader) error {
+ scanner := bufio.NewScanner(reader)
+ for i := int64(0); scanner.Scan(); i++ {
+ line := scanner.Text()
+
+ // Each line consists of two SHAs: the SHA of the original object, and
+ // the SHA of a replacement object in the new history built by BFG. For
+ // now, the new SHA is ignored, but it may be used to rewrite (rather
+ // than remove) some references in the future.
+ shas := strings.SplitN(line, " ", 2)
+
+ if len(shas) != 2 || len(shas[0]) != 40 || len(shas[1]) != 40 {
+ return ErrInvalidObjectMap(fmt.Errorf("object map invalid at line %d", i))
+ }
+
+ if err := c.removeRefsFor(shas[0]); err != nil {
+ return err
+ }
+ }
+
+ return c.updater.Wait()
+}
+
+func (c *Cleaner) removeRefsFor(sha string) error {
+ refs, isPresent := c.table[sha]
+ if !isPresent {
+ return nil
+ }
+
+ grpc_logrus.Extract(c.ctx).WithFields(log.Fields{
+ "sha": sha,
+ "refs": refs,
+ }).Info("removing internal references")
+
+ // Remove the internal refs pointing to sha
+ for _, ref := range refs {
+ if err := c.updater.Delete(ref); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// buildLookupTable constructs an in-memory map of SHA -> refs. Multiple refs
+// may point to the same SHA.
+//
+// The lookup table is necessary to efficiently check which references point to
+// an object that has been rewritten by the BFG (and so require action). It is
+// consulted once per line in the object map. Git is optimized for ref -> SHA
+// lookups, but we want the opposite!
+func buildLookupTable(ctx context.Context, repo *gitalypb.Repository) (map[string][]string, error) {
+ args := append([]string{"for-each-ref", "--format", "%(objectname) %(refname)"}, internalRefs...)
+ cmd, err := git.Command(ctx, repo, args...)
+ if err != nil {
+ return nil, err
+ }
+
+ logger := grpc_logrus.Extract(ctx)
+ out := make(map[string][]string)
+ scanner := bufio.NewScanner(cmd)
+
+ for scanner.Scan() {
+ line := scanner.Text()
+ parts := strings.SplitN(line, " ", 2)
+ if len(parts) != 2 || len(parts[0]) != 40 {
+ logger.WithFields(log.Fields{"line": line}).Warn("failed to parse git refs")
+ return nil, fmt.Errorf("failed to parse git refs")
+ }
+
+ out[parts[0]] = append(out[parts[0]], parts[1])
+ }
+
+ if err := cmd.Wait(); err != nil {
+ return nil, err
+ }
+
+ if err := scanner.Err(); err != nil {
+ return nil, err
+ }
+
+ return out, nil
+}
diff --git a/internal/service/cleanup/testhelper_test.go b/internal/service/cleanup/testhelper_test.go
new file mode 100644
index 000000000..a43a304f1
--- /dev/null
+++ b/internal/service/cleanup/testhelper_test.go
@@ -0,0 +1,41 @@
+package cleanup
+
+import (
+ "net"
+ "testing"
+
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+func runCleanupServiceServer(t *testing.T) (*grpc.Server, string) {
+ serverSocketPath := testhelper.GetTemporaryGitalySocketFileName()
+ grpcServer := testhelper.NewTestGrpcServer(t, nil, nil)
+
+ listener, err := net.Listen("unix", serverSocketPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ gitalypb.RegisterCleanupServiceServer(grpcServer, NewServer())
+ reflection.Register(grpcServer)
+
+ go grpcServer.Serve(listener)
+
+ return grpcServer, "unix://" + serverSocketPath
+}
+
+func newCleanupServiceClient(t *testing.T, serverSocketPath string) (gitalypb.CleanupServiceClient, *grpc.ClientConn) {
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ }
+ conn, err := grpc.Dial(serverSocketPath, connOpts...)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ return gitalypb.NewCleanupServiceClient(conn), conn
+}