Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Fargher <jfargher@gitlab.com>2023-01-27 02:28:31 +0300
committerJames Fargher <jfargher@gitlab.com>2023-02-10 00:29:14 +0300
commita4acadc5e16ef6c37a807aac6466051a30ec6846 (patch)
treef7766f284553db04a274d5e045957ab2c18e8c22
parent4fe0039dbe010b329872fc05f7a949141c6136ee (diff)
Intercept RemoveAll RPC
This RPC is unusual in that it needs to erase a lot of repositories in bulk. We are not expecting to route and replicate the changes as we would for a typical repository operation. To accommodate this we intercept the call in order to bulk update the database.
-rw-r--r--internal/praefect/remove_all.go48
-rw-r--r--internal/praefect/remove_all_test.go106
-rw-r--r--internal/praefect/server.go3
3 files changed, 156 insertions, 1 deletions
diff --git a/internal/praefect/remove_all.go b/internal/praefect/remove_all.go
new file mode 100644
index 000000000..89dfff442
--- /dev/null
+++ b/internal/praefect/remove_all.go
@@ -0,0 +1,48 @@
+package praefect
+
+import (
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "golang.org/x/sync/errgroup"
+ "google.golang.org/grpc"
+)
+
+// RemoveAllHandler intercepts RemoveAll calls, deletes the database records
+// before calling each gitaly.
+func RemoveAllHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
+ return func(_ interface{}, stream grpc.ServerStream) error {
+ var req gitalypb.RemoveAllRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ ctx := stream.Context()
+ virtualStorage := req.StorageName
+
+ if err := rs.DeleteAllRepositories(ctx, virtualStorage); err != nil {
+ return fmt.Errorf("delete all db repositories: %w", err)
+ }
+
+ group, ctx := errgroup.WithContext(ctx)
+
+ for storage, conn := range conns[virtualStorage] {
+ rewrittenStorage := storage
+ conn := conn
+
+ group.Go(func() error {
+ _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveAll(ctx, &gitalypb.RemoveAllRequest{
+ StorageName: rewrittenStorage,
+ })
+ return err
+ })
+ }
+
+ if err := group.Wait(); err != nil {
+ return err
+ }
+
+ return stream.SendMsg(&gitalypb.RemoveAllResponse{})
+ }
+}
diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go
new file mode 100644
index 000000000..7b7d43586
--- /dev/null
+++ b/internal/praefect/remove_all_test.go
@@ -0,0 +1,106 @@
+//go:build !gitaly_test_sha256
+
+package praefect
+
+import (
+ "context"
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/service/setup"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testcfg"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper/testserver"
+ "gitlab.com/gitlab-org/gitaly/v15/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/status"
+)
+
+func TestRemoveAllHandler(t *testing.T) {
+ t.Parallel()
+ ctx := testhelper.Context(t)
+
+ const virtualStorage, relativePath = "virtual-storage", "relative-path"
+
+ errServedByGitaly := status.Error(codes.Unknown, "request passed to Gitaly")
+ db := testdb.New(t)
+
+ const gitaly1Storage = "gitaly-1"
+ gitaly1Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly1Storage))
+ gitaly1RepoPath := filepath.Join(gitaly1Cfg.Storages[0].Path, relativePath)
+ gitaly1Addr := testserver.RunGitalyServer(t, gitaly1Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ const gitaly2Storage = "gitaly-2"
+ gitaly2Cfg := testcfg.Build(t, testcfg.WithStorages(gitaly2Storage))
+ gitaly2RepoPath := filepath.Join(gitaly2Cfg.Storages[0].Path, relativePath)
+ gitaly2Addr := testserver.RunGitalyServer(t, gitaly2Cfg, nil, setup.RegisterAll, testserver.WithDisablePraefect())
+
+ cfg := config.Config{VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: virtualStorage,
+ Nodes: []*config.Node{
+ {Storage: gitaly1Storage, Address: gitaly1Addr},
+ {Storage: gitaly2Storage, Address: gitaly2Addr},
+ },
+ },
+ }}
+
+ for _, repoPath := range []string{gitaly1RepoPath, gitaly2RepoPath} {
+ gittest.Exec(t, gitaly1Cfg, "init", "--bare", repoPath)
+ }
+
+ rs := datastore.NewPostgresRepositoryStore(db, cfg.StorageNames())
+
+ tmp := testhelper.TempDir(t)
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "praefect"))
+ require.NoError(t, err)
+
+ nodeSet, err := DialNodes(ctx, cfg.VirtualStorages, nil, nil, nil, nil)
+ require.NoError(t, err)
+ defer nodeSet.Close()
+
+ srv := NewGRPCServer(
+ config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}},
+ testhelper.NewDiscardingLogEntry(t),
+ protoregistry.GitalyProtoPreregistered,
+ func(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (*proxy.StreamParameters, error) {
+ return nil, errServedByGitaly
+ },
+ nil,
+ rs,
+ nil,
+ nodeSet.Connections(),
+ nil,
+ nil,
+ nil,
+ )
+ defer srv.Stop()
+
+ go testhelper.MustServe(t, srv, ln)
+
+ clientConn, err := grpc.DialContext(ctx, "unix:"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+ require.NoError(t, err)
+ defer clientConn.Close()
+
+ client := gitalypb.NewRepositoryServiceClient(clientConn)
+ _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: &gitalypb.Repository{}})
+ testhelper.RequireGrpcError(t, errServedByGitaly, err)
+
+ resp, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: virtualStorage})
+ require.NoError(t, err)
+
+ testhelper.ProtoEqual(t, &gitalypb.RemoveAllResponse{}, resp)
+ require.NoDirExists(t, gitaly1RepoPath)
+ require.NoDirExists(t, gitaly2RepoPath)
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index f85c85bac..8c46f6d2d 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -153,9 +153,10 @@ func NewGRPCServer(
if conf.Failover.ElectionStrategy == config.ElectionStrategyPerRepository {
proxy.RegisterStreamHandlers(srv, "gitaly.RepositoryService", map[string]grpc.StreamHandler{
- "RepositoryExists": RepositoryExistsHandler(rs),
+ "RemoveAll": RemoveAllHandler(rs, conns),
"RemoveRepository": RemoveRepositoryHandler(rs, conns),
"RenameRepository": RenameRepositoryHandler(conf.VirtualStorageNames(), rs),
+ "RepositoryExists": RepositoryExistsHandler(rs),
})
proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
"DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),