diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-07 13:54:25 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-07 13:54:25 +0300 |
commit | 7f7b3c14e36f799090baf48857dcd5ba019bbbc6 (patch) | |
tree | de19f25acb666d37b2b22d826d78391c1119fe83 | |
parent | 23efb30243f15dd8c29e222ba0552a21af9b1cbe (diff) | |
parent | f6d989353d8180abf2559c7212883980f8fad830 (diff) |
Merge branch 'smh-delete-object-pool-type' into 'master'
Handle DeleteObjectPool calls in Praefect
Closes #3742 and #4078
See merge request gitlab-org/gitaly!4395
-rw-r--r-- | internal/git/housekeeping/object_pool.go | 5 | ||||
-rw-r--r-- | internal/gitaly/service/objectpool/create.go | 20 | ||||
-rw-r--r-- | internal/gitaly/service/objectpool/create_test.go | 39 | ||||
-rw-r--r-- | internal/gitaly/service/objectpool/testhelper_test.go | 14 | ||||
-rw-r--r-- | internal/gitaly/service/repository/remove_test.go | 5 | ||||
-rw-r--r-- | internal/praefect/delete_object_pool.go | 49 | ||||
-rw-r--r-- | internal/praefect/delete_object_pool_test.go | 114 | ||||
-rw-r--r-- | internal/praefect/remove_repository.go | 76 | ||||
-rw-r--r-- | internal/praefect/server.go | 3 |
9 files changed, 277 insertions, 48 deletions
diff --git a/internal/git/housekeeping/object_pool.go b/internal/git/housekeeping/object_pool.go index f970757de..1e4a935c8 100644 --- a/internal/git/housekeeping/object_pool.go +++ b/internal/git/housekeeping/object_pool.go @@ -8,7 +8,8 @@ import ( // railsPoolDirRegexp is used to validate object pool directory structure and name as generated by Rails. var railsPoolDirRegexp = regexp.MustCompile(`^@pools/([0-9a-f]{2})/([0-9a-f]{2})/([0-9a-f]{64})\.git$`) -func isRailsPoolPath(relativePath string) bool { +// IsRailsPoolPath returns whether the relative path indicates this is a pool path generated by Rails. +func IsRailsPoolPath(relativePath string) bool { matches := railsPoolDirRegexp.FindStringSubmatch(relativePath) if matches == nil || !strings.HasPrefix(matches[3], matches[1]+matches[2]) { return false @@ -20,5 +21,5 @@ func isRailsPoolPath(relativePath string) bool { // IsPoolPath returns whether the relative path indicates the repository is an object // pool. func IsPoolPath(relativePath string) bool { - return isRailsPoolPath(relativePath) + return IsRailsPoolPath(relativePath) } diff --git a/internal/gitaly/service/objectpool/create.go b/internal/gitaly/service/objectpool/create.go index 1e97f8c88..2dfd35f0a 100644 --- a/internal/gitaly/service/objectpool/create.go +++ b/internal/gitaly/service/objectpool/create.go @@ -57,18 +57,28 @@ func (s *server) DeleteObjectPool(ctx context.Context, in *gitalypb.DeleteObject return &gitalypb.DeleteObjectPoolResponse{}, nil } -type poolRequest interface { +// PoolRequest is the interface of a gRPC request that carries an object pool. +type PoolRequest interface { GetObjectPool() *gitalypb.ObjectPool } -func (s *server) poolForRequest(req poolRequest) (*objectpool.ObjectPool, error) { - reqPool := req.GetObjectPool() - - poolRepo := reqPool.GetRepository() +// ExtractPool returns the pool repository from the request or an error if the +// request did no contain a pool. +func ExtractPool(req PoolRequest) (*gitalypb.Repository, error) { + poolRepo := req.GetObjectPool().GetRepository() if poolRepo == nil { return nil, errMissingPool } + return poolRepo, nil +} + +func (s *server) poolForRequest(req PoolRequest) (*objectpool.ObjectPool, error) { + poolRepo, err := ExtractPool(req) + if err != nil { + return nil, err + } + pool, err := objectpool.NewObjectPool(s.locator, s.gitCmdFactory, s.catfileCache, s.txManager, s.housekeepingManager, poolRepo.GetStorageName(), poolRepo.GetRelativePath()) if err != nil { if err == objectpool.ErrInvalidPoolDir { diff --git a/internal/gitaly/service/objectpool/create_test.go b/internal/gitaly/service/objectpool/create_test.go index f7dc13371..c8b77063c 100644 --- a/internal/gitaly/service/objectpool/create_test.go +++ b/internal/gitaly/service/objectpool/create_test.go @@ -1,7 +1,6 @@ package objectpool import ( - "fmt" "path/filepath" "strings" "testing" @@ -10,8 +9,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -143,6 +140,8 @@ func TestDelete(t *testing.T) { cfg, repoProto, _, _, client := setup(ctx, t) repo := localrepo.NewTestRepo(t, cfg, repoProto) + repositoryClient := gitalypb.NewRepositoryServiceClient(extractConn(client)) + pool := initObjectPool(t, cfg, cfg.Storages[0]) _, err := client.CreateObjectPool(ctx, &gitalypb.CreateObjectPoolRequest{ ObjectPool: pool.ToProto(), @@ -154,10 +153,21 @@ func TestDelete(t *testing.T) { for _, tc := range []struct { desc string + noPool bool relativePath string error error }{ { + desc: "no pool in request fails", + noPool: true, + error: errMissingPool, + }, + { + desc: "deleting outside pools directory fails", + relativePath: ".", + error: errInvalidPoolDir, + }, + { desc: "deleting outside pools directory fails", relativePath: ".", error: errInvalidPoolDir, @@ -197,22 +207,25 @@ func TestDelete(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - _, err := client.DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{ObjectPool: &gitalypb.ObjectPool{ + request := &gitalypb.DeleteObjectPoolRequest{ObjectPool: &gitalypb.ObjectPool{ Repository: &gitalypb.Repository{ StorageName: repo.GetStorageName(), RelativePath: tc.relativePath, }, - }}) - - expectedErr := tc.error - if tc.error == errInvalidPoolDir && testhelper.IsPraefectEnabled() { - expectedErr = helper.ErrNotFound(fmt.Errorf( - "mutator call: route repository mutator: get repository id: %w", - commonerr.NewRepositoryNotFoundError(repo.GetStorageName(), tc.relativePath), - )) + }} + + if tc.noPool { + request.ObjectPool = nil } - testhelper.RequireGrpcError(t, expectedErr, err) + _, err := client.DeleteObjectPool(ctx, request) + testhelper.RequireGrpcError(t, tc.error, err) + + response, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: pool.ToProto().GetRepository(), + }) + require.NoError(t, err) + require.Equal(t, tc.error != nil, response.GetExists()) }) } } diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index f0ba73dd8..3ea7f981c 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -30,6 +30,18 @@ func TestMain(m *testing.M) { testhelper.Run(m) } +// clientWithConn allows for passing through the ClientConn to tests which need +// to access other services than ObjectPoolService. +type clientWithConn struct { + gitalypb.ObjectPoolServiceClient + conn *grpc.ClientConn +} + +// extractConn returns the underlying ClientConn from the client. +func extractConn(client gitalypb.ObjectPoolServiceClient) *grpc.ClientConn { + return client.(clientWithConn).conn +} + func setup(ctx context.Context, t *testing.T, opts ...testserver.GitalyServerOpt) (config.Cfg, *gitalypb.Repository, string, storage.Locator, gitalypb.ObjectPoolServiceClient) { t.Helper() @@ -48,7 +60,7 @@ func setup(ctx context.Context, t *testing.T, opts ...testserver.GitalyServerOpt Seed: gittest.SeedGitLabTest, }) - return cfg, repo, repoPath, locator, gitalypb.NewObjectPoolServiceClient(conn) + return cfg, repo, repoPath, locator, clientWithConn{ObjectPoolServiceClient: gitalypb.NewObjectPoolServiceClient(conn), conn: conn} } func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, logger *logrus.Logger, opts ...testserver.GitalyServerOpt) string { diff --git a/internal/gitaly/service/repository/remove_test.go b/internal/gitaly/service/repository/remove_test.go index 962b6e0b8..b6c922ecb 100644 --- a/internal/gitaly/service/repository/remove_test.go +++ b/internal/gitaly/service/repository/remove_test.go @@ -41,8 +41,7 @@ func TestRemoveRepository_doesNotExist(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - // Praefect special-cases repository removals, so we disable Praefect here. - cfg, client := setupRepositoryServiceWithoutRepo(t, testserver.WithDisablePraefect()) + cfg, client := setupRepositoryServiceWithoutRepo(t) _, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ Repository: &gitalypb.Repository{StorageName: cfg.Storages[0].Name, RelativePath: "/does/not/exist"}, @@ -54,7 +53,7 @@ func TestRemoveRepository_locking(t *testing.T) { t.Parallel() ctx := testhelper.Context(t) - // Praefect special-cases repository removals, so we disable Praefect here. + // Praefect does not acquire a lock on repository deletion so disable the test case for Praefect. _, repo, repoPath, client := setupRepositoryService(ctx, t, testserver.WithDisablePraefect()) // Simulate a concurrent RPC holding the repository lock. diff --git a/internal/praefect/delete_object_pool.go b/internal/praefect/delete_object_pool.go new file mode 100644 index 000000000..b31ba40c3 --- /dev/null +++ b/internal/praefect/delete_object_pool.go @@ -0,0 +1,49 @@ +package praefect + +import ( + "context" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool" + objectpoolsvc "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/objectpool" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +// DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and +// deletes the object pool from every backing Gitaly node. +func DeleteObjectPoolHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler { + return removeRepositoryHandler(rs, conns, + func(stream grpc.ServerStream) (*gitalypb.Repository, error) { + var req gitalypb.DeleteObjectPoolRequest + if err := stream.RecvMsg(&req); err != nil { + return nil, fmt.Errorf("receive request: %w", err) + } + + repo, err := objectpoolsvc.ExtractPool(&req) + if err != nil { + return nil, err + } + + if !housekeeping.IsRailsPoolPath(repo.GetRelativePath()) { + return nil, helper.ErrInvalidArgument(objectpool.ErrInvalidPoolDir) + } + + return repo, nil + }, + func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error { + _, err := gitalypb.NewObjectPoolServiceClient(conn).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{ + ObjectPool: &gitalypb.ObjectPool{ + Repository: rewritten, + }, + }) + return err + }, + func() proto.Message { return &gitalypb.DeleteObjectPoolResponse{} }, + false, + ) +} diff --git a/internal/praefect/delete_object_pool_test.go b/internal/praefect/delete_object_pool_test.go new file mode 100644 index 000000000..2a8c46f64 --- /dev/null +++ b/internal/praefect/delete_object_pool_test.go @@ -0,0 +1,114 @@ +package praefect + +import ( + "context" + "testing" + + grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/log" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type mockObjectPoolService struct { + gitalypb.UnimplementedObjectPoolServiceServer + deleteObjectPoolFunc func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) +} + +func (m mockObjectPoolService) DeleteObjectPool(ctx context.Context, req *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) { + return m.deleteObjectPoolFunc(ctx, req) +} + +func TestDeleteObjectPoolHandler(t *testing.T) { + // the primary returns a successful response + primarySrv := grpc.NewServer() + gitalypb.RegisterObjectPoolServiceServer(primarySrv, mockObjectPoolService{ + deleteObjectPoolFunc: func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) { + return &gitalypb.DeleteObjectPoolResponse{}, nil + }, + }) + + // the secondary fails as it doesn't have the service registered + secondarySrv := grpc.NewServer() + + primaryLn, primaryAddr := testhelper.GetLocalhostListener(t) + secondaryLn, secondaryAddr := testhelper.GetLocalhostListener(t) + + defer primarySrv.Stop() + go primarySrv.Serve(primaryLn) + + defer secondarySrv.Stop() + go secondarySrv.Serve(secondaryLn) + + db := testdb.New(t) + rs := datastore.NewPostgresRepositoryStore(db, nil) + + ctx := testhelper.Context(t) + + repo := &gitalypb.Repository{ + StorageName: "virtual-storage", + RelativePath: gittest.NewObjectPoolName(t), + } + + require.NoError(t, + rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, "replica-path", "primary", []string{"secondary", "unconfigured_storage"}, nil, true, true), + ) + + primaryConn, err := grpc.Dial(primaryAddr, grpc.WithInsecure()) + require.NoError(t, err) + defer primaryConn.Close() + + secondaryConn, err := grpc.Dial(secondaryAddr, grpc.WithInsecure()) + require.NoError(t, err) + defer secondaryConn.Close() + + praefectLn, praefectAddr := testhelper.GetLocalhostListener(t) + logger, hook := test.NewNullLogger() + praefectSrv := grpc.NewServer(grpc.ChainStreamInterceptor( + grpcmwlogrus.StreamServerInterceptor(logrus.NewEntry(logger), grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat)), + )) + praefectSrv.RegisterService(&grpc.ServiceDesc{ + ServiceName: "gitaly.ObjectPoolService", + HandlerType: (*interface{})(nil), + Streams: []grpc.StreamDesc{ + { + StreamName: "DeleteObjectPool", + Handler: DeleteObjectPoolHandler(rs, Connections{ + "virtual-storage": { + "primary": primaryConn, + "secondary": secondaryConn, + }, + }), + ServerStreams: true, + ClientStreams: true, + }, + }, + }, struct{}{}) + + defer praefectSrv.Stop() + go praefectSrv.Serve(praefectLn) + + praefectConn, err := grpc.Dial(praefectAddr, grpc.WithInsecure()) + require.NoError(t, err) + defer praefectConn.Close() + + _, err = gitalypb.NewObjectPoolServiceClient( + praefectConn, + ).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{ + ObjectPool: &gitalypb.ObjectPool{Repository: repo}, + }) + require.NoError(t, err) + + require.Equal(t, 2, len(hook.Entries), "expected a log entry for failed deletion") + require.Equal(t, "failed deleting repository", hook.Entries[0].Message) + require.Equal(t, repo.StorageName, hook.Entries[0].Data["virtual_storage"]) + require.Equal(t, repo.RelativePath, hook.Entries[0].Data["relative_path"]) + require.Equal(t, "secondary", hook.Entries[0].Data["storage"]) +} diff --git a/internal/praefect/remove_repository.go b/internal/praefect/remove_repository.go index 62d92e5a2..74a033ef8 100644 --- a/internal/praefect/remove_repository.go +++ b/internal/praefect/remove_repository.go @@ -20,25 +20,57 @@ import ( // RemoveRepositoryHandler intercepts RemoveRepository calls, deletes the database records and // deletes the repository from every backing Gitaly node. func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler { - return func(srv interface{}, stream grpc.ServerStream) error { - var req gitalypb.RemoveRepositoryRequest - if err := stream.RecvMsg(&req); err != nil { - return fmt.Errorf("receive request: %w", err) + return removeRepositoryHandler(rs, conns, + func(stream grpc.ServerStream) (*gitalypb.Repository, error) { + var req gitalypb.RemoveRepositoryRequest + if err := stream.RecvMsg(&req); err != nil { + return nil, fmt.Errorf("receive request: %w", err) + } + + repo := req.GetRepository() + if repo == nil { + return nil, errMissingRepository + } + + return repo, nil + }, + func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error { + _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{ + Repository: rewritten, + }) + return err + }, + func() proto.Message { return &gitalypb.RemoveRepositoryResponse{} }, + true, + ) +} + +type requestParser func(grpc.ServerStream) (*gitalypb.Repository, error) + +type requestProxier func(context.Context, *grpc.ClientConn, *gitalypb.Repository) error + +type responseFactory func() proto.Message + +func removeRepositoryHandler(rs datastore.RepositoryStore, conns Connections, parseRequest requestParser, proxyRequest requestProxier, buildResponse responseFactory, errorOnNotFound bool) grpc.StreamHandler { + return func(_ interface{}, stream grpc.ServerStream) error { + repo, err := parseRequest(stream) + if err != nil { + return err } ctx := stream.Context() - repo := req.GetRepository() - if repo == nil { - return errMissingRepository - } virtualStorage := repo.StorageName replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath) if err != nil { - // Gitaly doesn't return an error if the repository is not found, so Praefect follows the - // same protocol. if errors.As(err, new(commonerr.RepositoryNotFoundError)) { - return helper.ErrNotFoundf("repository does not exist") + if errorOnNotFound { + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return helper.ErrNotFoundf("repository does not exist") + } + } + + return stream.SendMsg(buildResponse()) } return fmt.Errorf("delete repository: %w", err) @@ -46,19 +78,15 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr var wg sync.WaitGroup - storageSet := make(map[string]struct{}, len(storages)) - for _, storage := range storages { - storageSet[storage] = struct{}{} - } - // It's not critical these deletions complete as the background crawler will identify these repos as deleted. // To rather return a successful code to the client, we limit the timeout here to 10s. ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second) defer cancel() - for storage, conn := range conns[virtualStorage] { - if _, ok := storageSet[storage]; !ok { - // There may be database records for replicas which exist on storages that are not configured in the + for _, storage := range storages { + conn, ok := conns[virtualStorage][storage] + if !ok { + // There may be database records for object pools which exist on storages that are not configured in the // local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled // by the background cleaner like any other stale repository if the storages are returned to the configuration. continue @@ -68,11 +96,11 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr go func(rewrittenStorage string, conn *grpc.ClientConn) { defer wg.Done() - req := proto.Clone(&req).(*gitalypb.RemoveRepositoryRequest) - req.Repository.StorageName = rewrittenStorage - req.Repository.RelativePath = replicaPath + rewritten := proto.Clone(repo).(*gitalypb.Repository) + rewritten.StorageName = rewrittenStorage + rewritten.RelativePath = replicaPath - if _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, req); err != nil { + if err := proxyRequest(ctx, conn, rewritten); err != nil { ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ "virtual_storage": virtualStorage, "relative_path": repo.RelativePath, @@ -84,6 +112,6 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr wg.Wait() - return stream.SendMsg(&gitalypb.RemoveRepositoryResponse{}) + return stream.SendMsg(buildResponse()) } } diff --git a/internal/praefect/server.go b/internal/praefect/server.go index c1087da28..d75810246 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -149,6 +149,9 @@ func NewGRPCServer( "RepositoryExists": RepositoryExistsHandler(rs), "RemoveRepository": RemoveRepositoryHandler(rs, conns), }) + proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{ + "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns), + }) } return srv |