Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-04-07 13:54:25 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-04-07 13:54:25 +0300
commit7f7b3c14e36f799090baf48857dcd5ba019bbbc6 (patch)
treede19f25acb666d37b2b22d826d78391c1119fe83
parent23efb30243f15dd8c29e222ba0552a21af9b1cbe (diff)
parentf6d989353d8180abf2559c7212883980f8fad830 (diff)
Merge branch 'smh-delete-object-pool-type' into 'master'
Handle DeleteObjectPool calls in Praefect Closes #3742 and #4078 See merge request gitlab-org/gitaly!4395
-rw-r--r--internal/git/housekeeping/object_pool.go5
-rw-r--r--internal/gitaly/service/objectpool/create.go20
-rw-r--r--internal/gitaly/service/objectpool/create_test.go39
-rw-r--r--internal/gitaly/service/objectpool/testhelper_test.go14
-rw-r--r--internal/gitaly/service/repository/remove_test.go5
-rw-r--r--internal/praefect/delete_object_pool.go49
-rw-r--r--internal/praefect/delete_object_pool_test.go114
-rw-r--r--internal/praefect/remove_repository.go76
-rw-r--r--internal/praefect/server.go3
9 files changed, 277 insertions, 48 deletions
diff --git a/internal/git/housekeeping/object_pool.go b/internal/git/housekeeping/object_pool.go
index f970757de..1e4a935c8 100644
--- a/internal/git/housekeeping/object_pool.go
+++ b/internal/git/housekeeping/object_pool.go
@@ -8,7 +8,8 @@ import (
// railsPoolDirRegexp is used to validate object pool directory structure and name as generated by Rails.
var railsPoolDirRegexp = regexp.MustCompile(`^@pools/([0-9a-f]{2})/([0-9a-f]{2})/([0-9a-f]{64})\.git$`)
-func isRailsPoolPath(relativePath string) bool {
+// IsRailsPoolPath returns whether the relative path indicates this is a pool path generated by Rails.
+func IsRailsPoolPath(relativePath string) bool {
matches := railsPoolDirRegexp.FindStringSubmatch(relativePath)
if matches == nil || !strings.HasPrefix(matches[3], matches[1]+matches[2]) {
return false
@@ -20,5 +21,5 @@ func isRailsPoolPath(relativePath string) bool {
// IsPoolPath returns whether the relative path indicates the repository is an object
// pool.
func IsPoolPath(relativePath string) bool {
- return isRailsPoolPath(relativePath)
+ return IsRailsPoolPath(relativePath)
}
diff --git a/internal/gitaly/service/objectpool/create.go b/internal/gitaly/service/objectpool/create.go
index 1e97f8c88..2dfd35f0a 100644
--- a/internal/gitaly/service/objectpool/create.go
+++ b/internal/gitaly/service/objectpool/create.go
@@ -57,18 +57,28 @@ func (s *server) DeleteObjectPool(ctx context.Context, in *gitalypb.DeleteObject
return &gitalypb.DeleteObjectPoolResponse{}, nil
}
-type poolRequest interface {
+// PoolRequest is the interface of a gRPC request that carries an object pool.
+type PoolRequest interface {
GetObjectPool() *gitalypb.ObjectPool
}
-func (s *server) poolForRequest(req poolRequest) (*objectpool.ObjectPool, error) {
- reqPool := req.GetObjectPool()
-
- poolRepo := reqPool.GetRepository()
+// ExtractPool returns the pool repository from the request or an error if the
+// request did no contain a pool.
+func ExtractPool(req PoolRequest) (*gitalypb.Repository, error) {
+ poolRepo := req.GetObjectPool().GetRepository()
if poolRepo == nil {
return nil, errMissingPool
}
+ return poolRepo, nil
+}
+
+func (s *server) poolForRequest(req PoolRequest) (*objectpool.ObjectPool, error) {
+ poolRepo, err := ExtractPool(req)
+ if err != nil {
+ return nil, err
+ }
+
pool, err := objectpool.NewObjectPool(s.locator, s.gitCmdFactory, s.catfileCache, s.txManager, s.housekeepingManager, poolRepo.GetStorageName(), poolRepo.GetRelativePath())
if err != nil {
if err == objectpool.ErrInvalidPoolDir {
diff --git a/internal/gitaly/service/objectpool/create_test.go b/internal/gitaly/service/objectpool/create_test.go
index f7dc13371..c8b77063c 100644
--- a/internal/gitaly/service/objectpool/create_test.go
+++ b/internal/gitaly/service/objectpool/create_test.go
@@ -1,7 +1,6 @@
package objectpool
import (
- "fmt"
"path/filepath"
"strings"
"testing"
@@ -10,8 +9,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -143,6 +140,8 @@ func TestDelete(t *testing.T) {
cfg, repoProto, _, _, client := setup(ctx, t)
repo := localrepo.NewTestRepo(t, cfg, repoProto)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(extractConn(client))
+
pool := initObjectPool(t, cfg, cfg.Storages[0])
_, err := client.CreateObjectPool(ctx, &gitalypb.CreateObjectPoolRequest{
ObjectPool: pool.ToProto(),
@@ -154,10 +153,21 @@ func TestDelete(t *testing.T) {
for _, tc := range []struct {
desc string
+ noPool bool
relativePath string
error error
}{
{
+ desc: "no pool in request fails",
+ noPool: true,
+ error: errMissingPool,
+ },
+ {
+ desc: "deleting outside pools directory fails",
+ relativePath: ".",
+ error: errInvalidPoolDir,
+ },
+ {
desc: "deleting outside pools directory fails",
relativePath: ".",
error: errInvalidPoolDir,
@@ -197,22 +207,25 @@ func TestDelete(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
- _, err := client.DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{ObjectPool: &gitalypb.ObjectPool{
+ request := &gitalypb.DeleteObjectPoolRequest{ObjectPool: &gitalypb.ObjectPool{
Repository: &gitalypb.Repository{
StorageName: repo.GetStorageName(),
RelativePath: tc.relativePath,
},
- }})
-
- expectedErr := tc.error
- if tc.error == errInvalidPoolDir && testhelper.IsPraefectEnabled() {
- expectedErr = helper.ErrNotFound(fmt.Errorf(
- "mutator call: route repository mutator: get repository id: %w",
- commonerr.NewRepositoryNotFoundError(repo.GetStorageName(), tc.relativePath),
- ))
+ }}
+
+ if tc.noPool {
+ request.ObjectPool = nil
}
- testhelper.RequireGrpcError(t, expectedErr, err)
+ _, err := client.DeleteObjectPool(ctx, request)
+ testhelper.RequireGrpcError(t, tc.error, err)
+
+ response, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
+ Repository: pool.ToProto().GetRepository(),
+ })
+ require.NoError(t, err)
+ require.Equal(t, tc.error != nil, response.GetExists())
})
}
}
diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go
index f0ba73dd8..3ea7f981c 100644
--- a/internal/gitaly/service/objectpool/testhelper_test.go
+++ b/internal/gitaly/service/objectpool/testhelper_test.go
@@ -30,6 +30,18 @@ func TestMain(m *testing.M) {
testhelper.Run(m)
}
+// clientWithConn allows for passing through the ClientConn to tests which need
+// to access other services than ObjectPoolService.
+type clientWithConn struct {
+ gitalypb.ObjectPoolServiceClient
+ conn *grpc.ClientConn
+}
+
+// extractConn returns the underlying ClientConn from the client.
+func extractConn(client gitalypb.ObjectPoolServiceClient) *grpc.ClientConn {
+ return client.(clientWithConn).conn
+}
+
func setup(ctx context.Context, t *testing.T, opts ...testserver.GitalyServerOpt) (config.Cfg, *gitalypb.Repository, string, storage.Locator, gitalypb.ObjectPoolServiceClient) {
t.Helper()
@@ -48,7 +60,7 @@ func setup(ctx context.Context, t *testing.T, opts ...testserver.GitalyServerOpt
Seed: gittest.SeedGitLabTest,
})
- return cfg, repo, repoPath, locator, gitalypb.NewObjectPoolServiceClient(conn)
+ return cfg, repo, repoPath, locator, clientWithConn{ObjectPoolServiceClient: gitalypb.NewObjectPoolServiceClient(conn), conn: conn}
}
func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, logger *logrus.Logger, opts ...testserver.GitalyServerOpt) string {
diff --git a/internal/gitaly/service/repository/remove_test.go b/internal/gitaly/service/repository/remove_test.go
index 962b6e0b8..b6c922ecb 100644
--- a/internal/gitaly/service/repository/remove_test.go
+++ b/internal/gitaly/service/repository/remove_test.go
@@ -41,8 +41,7 @@ func TestRemoveRepository_doesNotExist(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
- // Praefect special-cases repository removals, so we disable Praefect here.
- cfg, client := setupRepositoryServiceWithoutRepo(t, testserver.WithDisablePraefect())
+ cfg, client := setupRepositoryServiceWithoutRepo(t)
_, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
Repository: &gitalypb.Repository{StorageName: cfg.Storages[0].Name, RelativePath: "/does/not/exist"},
@@ -54,7 +53,7 @@ func TestRemoveRepository_locking(t *testing.T) {
t.Parallel()
ctx := testhelper.Context(t)
- // Praefect special-cases repository removals, so we disable Praefect here.
+ // Praefect does not acquire a lock on repository deletion so disable the test case for Praefect.
_, repo, repoPath, client := setupRepositoryService(ctx, t, testserver.WithDisablePraefect())
// Simulate a concurrent RPC holding the repository lock.
diff --git a/internal/praefect/delete_object_pool.go b/internal/praefect/delete_object_pool.go
new file mode 100644
index 000000000..b31ba40c3
--- /dev/null
+++ b/internal/praefect/delete_object_pool.go
@@ -0,0 +1,49 @@
+package praefect
+
+import (
+ "context"
+ "fmt"
+
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool"
+ objectpoolsvc "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/objectpool"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
+)
+
+// DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and
+// deletes the object pool from every backing Gitaly node.
+func DeleteObjectPoolHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
+ return removeRepositoryHandler(rs, conns,
+ func(stream grpc.ServerStream) (*gitalypb.Repository, error) {
+ var req gitalypb.DeleteObjectPoolRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return nil, fmt.Errorf("receive request: %w", err)
+ }
+
+ repo, err := objectpoolsvc.ExtractPool(&req)
+ if err != nil {
+ return nil, err
+ }
+
+ if !housekeeping.IsRailsPoolPath(repo.GetRelativePath()) {
+ return nil, helper.ErrInvalidArgument(objectpool.ErrInvalidPoolDir)
+ }
+
+ return repo, nil
+ },
+ func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error {
+ _, err := gitalypb.NewObjectPoolServiceClient(conn).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: rewritten,
+ },
+ })
+ return err
+ },
+ func() proto.Message { return &gitalypb.DeleteObjectPoolResponse{} },
+ false,
+ )
+}
diff --git a/internal/praefect/delete_object_pool_test.go b/internal/praefect/delete_object_pool_test.go
new file mode 100644
index 000000000..2a8c46f64
--- /dev/null
+++ b/internal/praefect/delete_object_pool_test.go
@@ -0,0 +1,114 @@
+package praefect
+
+import (
+ "context"
+ "testing"
+
+ grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type mockObjectPoolService struct {
+ gitalypb.UnimplementedObjectPoolServiceServer
+ deleteObjectPoolFunc func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error)
+}
+
+func (m mockObjectPoolService) DeleteObjectPool(ctx context.Context, req *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) {
+ return m.deleteObjectPoolFunc(ctx, req)
+}
+
+func TestDeleteObjectPoolHandler(t *testing.T) {
+ // the primary returns a successful response
+ primarySrv := grpc.NewServer()
+ gitalypb.RegisterObjectPoolServiceServer(primarySrv, mockObjectPoolService{
+ deleteObjectPoolFunc: func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) {
+ return &gitalypb.DeleteObjectPoolResponse{}, nil
+ },
+ })
+
+ // the secondary fails as it doesn't have the service registered
+ secondarySrv := grpc.NewServer()
+
+ primaryLn, primaryAddr := testhelper.GetLocalhostListener(t)
+ secondaryLn, secondaryAddr := testhelper.GetLocalhostListener(t)
+
+ defer primarySrv.Stop()
+ go primarySrv.Serve(primaryLn)
+
+ defer secondarySrv.Stop()
+ go secondarySrv.Serve(secondaryLn)
+
+ db := testdb.New(t)
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+
+ ctx := testhelper.Context(t)
+
+ repo := &gitalypb.Repository{
+ StorageName: "virtual-storage",
+ RelativePath: gittest.NewObjectPoolName(t),
+ }
+
+ require.NoError(t,
+ rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, "replica-path", "primary", []string{"secondary", "unconfigured_storage"}, nil, true, true),
+ )
+
+ primaryConn, err := grpc.Dial(primaryAddr, grpc.WithInsecure())
+ require.NoError(t, err)
+ defer primaryConn.Close()
+
+ secondaryConn, err := grpc.Dial(secondaryAddr, grpc.WithInsecure())
+ require.NoError(t, err)
+ defer secondaryConn.Close()
+
+ praefectLn, praefectAddr := testhelper.GetLocalhostListener(t)
+ logger, hook := test.NewNullLogger()
+ praefectSrv := grpc.NewServer(grpc.ChainStreamInterceptor(
+ grpcmwlogrus.StreamServerInterceptor(logrus.NewEntry(logger), grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat)),
+ ))
+ praefectSrv.RegisterService(&grpc.ServiceDesc{
+ ServiceName: "gitaly.ObjectPoolService",
+ HandlerType: (*interface{})(nil),
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "DeleteObjectPool",
+ Handler: DeleteObjectPoolHandler(rs, Connections{
+ "virtual-storage": {
+ "primary": primaryConn,
+ "secondary": secondaryConn,
+ },
+ }),
+ ServerStreams: true,
+ ClientStreams: true,
+ },
+ },
+ }, struct{}{})
+
+ defer praefectSrv.Stop()
+ go praefectSrv.Serve(praefectLn)
+
+ praefectConn, err := grpc.Dial(praefectAddr, grpc.WithInsecure())
+ require.NoError(t, err)
+ defer praefectConn.Close()
+
+ _, err = gitalypb.NewObjectPoolServiceClient(
+ praefectConn,
+ ).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{Repository: repo},
+ })
+ require.NoError(t, err)
+
+ require.Equal(t, 2, len(hook.Entries), "expected a log entry for failed deletion")
+ require.Equal(t, "failed deleting repository", hook.Entries[0].Message)
+ require.Equal(t, repo.StorageName, hook.Entries[0].Data["virtual_storage"])
+ require.Equal(t, repo.RelativePath, hook.Entries[0].Data["relative_path"])
+ require.Equal(t, "secondary", hook.Entries[0].Data["storage"])
+}
diff --git a/internal/praefect/remove_repository.go b/internal/praefect/remove_repository.go
index 62d92e5a2..74a033ef8 100644
--- a/internal/praefect/remove_repository.go
+++ b/internal/praefect/remove_repository.go
@@ -20,25 +20,57 @@ import (
// RemoveRepositoryHandler intercepts RemoveRepository calls, deletes the database records and
// deletes the repository from every backing Gitaly node.
func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
- return func(srv interface{}, stream grpc.ServerStream) error {
- var req gitalypb.RemoveRepositoryRequest
- if err := stream.RecvMsg(&req); err != nil {
- return fmt.Errorf("receive request: %w", err)
+ return removeRepositoryHandler(rs, conns,
+ func(stream grpc.ServerStream) (*gitalypb.Repository, error) {
+ var req gitalypb.RemoveRepositoryRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return nil, fmt.Errorf("receive request: %w", err)
+ }
+
+ repo := req.GetRepository()
+ if repo == nil {
+ return nil, errMissingRepository
+ }
+
+ return repo, nil
+ },
+ func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error {
+ _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: rewritten,
+ })
+ return err
+ },
+ func() proto.Message { return &gitalypb.RemoveRepositoryResponse{} },
+ true,
+ )
+}
+
+type requestParser func(grpc.ServerStream) (*gitalypb.Repository, error)
+
+type requestProxier func(context.Context, *grpc.ClientConn, *gitalypb.Repository) error
+
+type responseFactory func() proto.Message
+
+func removeRepositoryHandler(rs datastore.RepositoryStore, conns Connections, parseRequest requestParser, proxyRequest requestProxier, buildResponse responseFactory, errorOnNotFound bool) grpc.StreamHandler {
+ return func(_ interface{}, stream grpc.ServerStream) error {
+ repo, err := parseRequest(stream)
+ if err != nil {
+ return err
}
ctx := stream.Context()
- repo := req.GetRepository()
- if repo == nil {
- return errMissingRepository
- }
virtualStorage := repo.StorageName
replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath)
if err != nil {
- // Gitaly doesn't return an error if the repository is not found, so Praefect follows the
- // same protocol.
if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
- return helper.ErrNotFoundf("repository does not exist")
+ if errorOnNotFound {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("repository does not exist")
+ }
+ }
+
+ return stream.SendMsg(buildResponse())
}
return fmt.Errorf("delete repository: %w", err)
@@ -46,19 +78,15 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr
var wg sync.WaitGroup
- storageSet := make(map[string]struct{}, len(storages))
- for _, storage := range storages {
- storageSet[storage] = struct{}{}
- }
-
// It's not critical these deletions complete as the background crawler will identify these repos as deleted.
// To rather return a successful code to the client, we limit the timeout here to 10s.
ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
defer cancel()
- for storage, conn := range conns[virtualStorage] {
- if _, ok := storageSet[storage]; !ok {
- // There may be database records for replicas which exist on storages that are not configured in the
+ for _, storage := range storages {
+ conn, ok := conns[virtualStorage][storage]
+ if !ok {
+ // There may be database records for object pools which exist on storages that are not configured in the
// local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled
// by the background cleaner like any other stale repository if the storages are returned to the configuration.
continue
@@ -68,11 +96,11 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr
go func(rewrittenStorage string, conn *grpc.ClientConn) {
defer wg.Done()
- req := proto.Clone(&req).(*gitalypb.RemoveRepositoryRequest)
- req.Repository.StorageName = rewrittenStorage
- req.Repository.RelativePath = replicaPath
+ rewritten := proto.Clone(repo).(*gitalypb.Repository)
+ rewritten.StorageName = rewrittenStorage
+ rewritten.RelativePath = replicaPath
- if _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, req); err != nil {
+ if err := proxyRequest(ctx, conn, rewritten); err != nil {
ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
"virtual_storage": virtualStorage,
"relative_path": repo.RelativePath,
@@ -84,6 +112,6 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr
wg.Wait()
- return stream.SendMsg(&gitalypb.RemoveRepositoryResponse{})
+ return stream.SendMsg(buildResponse())
}
}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c1087da28..d75810246 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -149,6 +149,9 @@ func NewGRPCServer(
"RepositoryExists": RepositoryExistsHandler(rs),
"RemoveRepository": RemoveRepositoryHandler(rs, conns),
})
+ proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
+ "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
+ })
}
return srv