diff options
author | James Fargher <jfargher@gitlab.com> | 2023-01-27 02:28:31 +0300 |
---|---|---|
committer | James Fargher <jfargher@gitlab.com> | 2023-02-10 00:29:14 +0300 |
commit | a4acadc5e16ef6c37a807aac6466051a30ec6846 (patch) | |
tree | f7766f284553db04a274d5e045957ab2c18e8c22 | |
parent | 4fe0039dbe010b329872fc05f7a949141c6136ee (diff) |
Intercept RemoveAll RPC
This RPC is unusual in that it needs to erase a lot of repositories in
bulk. We are not expecting to route and replicate the changes as we
would for a typical repository operation. To accommodate this we
intercept the call in order to bulk update the database.
-rw-r--r-- | internal/praefect/remove_all.go | 48 | ||||
-rw-r--r-- | internal/praefect/remove_all_test.go | 106 | ||||
-rw-r--r-- | internal/praefect/server.go | 3 |
3 files changed, 156 insertions, 1 deletions
diff --git a/internal/praefect/remove_all.go b/internal/praefect/remove_all.go new file mode 100644 index 000000000..89dfff442 --- /dev/null +++ b/internal/praefect/remove_all.go @@ -0,0 +1,48 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +// RemoveAllHandler intercepts RemoveAll calls, deletes the database records +// before calling each gitaly. +func RemoveAllHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler { + return func(_ interface{}, stream grpc.ServerStream) error { + var req gitalypb.RemoveAllRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + ctx := stream.Context() + virtualStorage := req.StorageName + + if err := rs.DeleteAllRepositories(ctx, virtualStorage); err != nil { + return fmt.Errorf("delete all db repositories: %w", err) + } + + group, ctx := errgroup.WithContext(ctx) + + for storage, conn := range conns[virtualStorage] { + rewrittenStorage := storage + conn := conn + + group.Go(func() error { + _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveAll(ctx, &gitalypb.RemoveAllRequest{ + StorageName: rewrittenStorage, + }) + return err + }) + } + + if err := group.Wait(); err != nil { + return err + } + + return stream.SendMsg(&gitalypb.RemoveAllResponse{}) + } +} diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go new file mode 100644 index 000000000..7b7d43586 --- /dev/null +++ b/internal/praefect/remove_all_test.go @@ -0,0 +1,106 @@ +//go:build !gitaly_test_sha256 + +package praefect + +import ( + "context" + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/setup" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy" + "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +func TestRemoveAllHandler(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + + const virtualStorage, relativePath = "virtual-storage", "relative-path" + + errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly") + db := testdb.New(t) + + const gitaly1Storage = "gitaly-1" + gitaly1Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly1Storage)) + gitaly1RepoPath := filepath.Join(gitaly1Cfg.Storages[0].Path, relativePath) + gitaly1Addr := testserver.RunGitalyServer(t, gitaly1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + const gitaly2Storage = "gitaly-2" + gitaly2Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly2Storage)) + gitaly2RepoPath := filepath.Join(gitaly2Cfg.Storages[0].Path, relativePath) + gitaly2Addr := testserver.RunGitalyServer(t, gitaly2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect()) + + cfg := config.Config{VirtualStorages: []*config.VirtualStorage{ + { + Name: virtualStorage, + Nodes: []*config.Node{ + {Storage: gitaly1Storage, Address: gitaly1Addr}, + {Storage: gitaly2Storage, Address: gitaly2Addr}, + }, + }, + }} + + for _, repoPath := range []string{gitaly1RepoPath, gitaly2RepoPath} { + gittest.Exec(t, gitaly1Cfg, "init", "--bare", repoPath) + } + + rs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames()) + + tmp := testhelper.TempDir(t) + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, nil, nil, nil, nil) + require.NoError(t, err) + defer nodeSet.Close() + + srv := NewGRPCServer( + config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, + testhelper.NewDiscardingLogEntry(t), + protoregistry.GitalyProtoPreregistered, + func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) { + return nil, errServedByGitaly + }, + nil, + rs, + nil, + nodeSet.Connections(), + nil, + nil, + nil, + ) + defer srv.Stop() + + go testhelper.MustServe(t, srv, ln) + + clientConn, err := grpc.DialContext(ctx, "unix:"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer clientConn.Close() + + client := gitalypb.NewRepositoryServiceClient(clientConn) + _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: &gitalypb.Repository{}}) + testhelper.RequireGrpcError(t, errServedByGitaly, err) + + resp, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: virtualStorage}) + require.NoError(t, err) + + testhelper.ProtoEqual(t, &gitalypb.RemoveAllResponse{}, resp) + require.NoDirExists(t, gitaly1RepoPath) + require.NoDirExists(t, gitaly2RepoPath) +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index f85c85bac..8c46f6d2d 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -153,9 +153,10 @@ func NewGRPCServer( if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository { proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{ - "RepositoryExists": RepositoryExistsHandler(rs), + "RemoveAll": RemoveAllHandler(rs, conns), "RemoveRepository": RemoveRepositoryHandler(rs, conns), "RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs), + "RepositoryExists": RepositoryExistsHandler(rs), }) proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{ "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns), |