Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-03-31 10:05:41 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-04-04 13:54:32 +0300
commit27fe4f2f884f9cb8ba4ad931d115799259533cea (patch)
treec2274a2ab23f8c03916f38412a32b4e565a4d49b
parentb79eeebc6fb40eae817253bdd03cc1e237e708df (diff)
Share common code between DeleteObjectPool and RemoveRepository
Praefect contains two special handlers for handling repository removals. Both of these handlers do largely the same thing. This commit shares the common code between the two handlers.
-rw-r--r--internal/praefect/delete_object_pool.go92
-rw-r--r--internal/praefect/delete_object_pool_test.go2
-rw-r--r--internal/praefect/remove_repository.go76
3 files changed, 77 insertions, 93 deletions
diff --git a/internal/praefect/delete_object_pool.go b/internal/praefect/delete_object_pool.go
index 26cb7b696..b31ba40c3 100644
--- a/internal/praefect/delete_object_pool.go
+++ b/internal/praefect/delete_object_pool.go
@@ -2,18 +2,12 @@ package praefect
import (
"context"
- "errors"
"fmt"
- "sync"
- "time"
- "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
- "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool"
objectpoolsvc "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/objectpool"
"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"google.golang.org/grpc"
@@ -23,71 +17,33 @@ import (
// DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and
// deletes the object pool from every backing Gitaly node.
func DeleteObjectPoolHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
- return func(_ interface{}, stream grpc.ServerStream) error {
- var req gitalypb.DeleteObjectPoolRequest
- if err := stream.RecvMsg(&req); err != nil {
- return fmt.Errorf("receive request: %w", err)
- }
-
- ctx := stream.Context()
-
- repo, err := objectpoolsvc.ExtractPool(&req)
- if err != nil {
- return err
- }
-
- if !housekeeping.IsRailsPoolPath(repo.GetRelativePath()) {
- return helper.ErrInvalidArgument(objectpool.ErrInvalidPoolDir)
- }
-
- virtualStorage := repo.StorageName
- replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath)
- if err != nil {
- // Gitaly doesn't return an error if the object pool is not found, so Praefect follows the
- // same protocol.
- if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
- return stream.SendMsg(&gitalypb.DeleteObjectPoolResponse{})
+ return removeRepositoryHandler(rs, conns,
+ func(stream grpc.ServerStream) (*gitalypb.Repository, error) {
+ var req gitalypb.DeleteObjectPoolRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return nil, fmt.Errorf("receive request: %w", err)
}
- return fmt.Errorf("delete object pool: %w", err)
- }
-
- var wg sync.WaitGroup
-
- // It's not critical these deletions complete as the background crawler will identify these repos as deleted.
- // To rather return a successful code to the client, we limit the timeout here to 10s.
- ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
- defer cancel()
-
- for _, storage := range storages {
- conn, ok := conns[virtualStorage][storage]
- if !ok {
- // There may be database records for object pools which exist on storages that are not configured in the
- // local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled
- // by the background cleaner like any other stale repository if the storages are returned to the configuration.
- continue
+ repo, err := objectpoolsvc.ExtractPool(&req)
+ if err != nil {
+ return nil, err
}
- wg.Add(1)
- go func(rewrittenStorage string, conn *grpc.ClientConn) {
- defer wg.Done()
-
- req := proto.Clone(&req).(*gitalypb.DeleteObjectPoolRequest)
- req.ObjectPool.Repository.StorageName = rewrittenStorage
- req.ObjectPool.Repository.RelativePath = replicaPath
-
- if _, err := gitalypb.NewObjectPoolServiceClient(conn).DeleteObjectPool(ctx, req); err != nil {
- ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
- "virtual_storage": virtualStorage,
- "relative_path": repo.RelativePath,
- "storage": rewrittenStorage,
- }).WithError(err).Error("failed deleting object pool")
- }
- }(storage, conn)
- }
-
- wg.Wait()
+ if !housekeeping.IsRailsPoolPath(repo.GetRelativePath()) {
+ return nil, helper.ErrInvalidArgument(objectpool.ErrInvalidPoolDir)
+ }
- return stream.SendMsg(&gitalypb.DeleteObjectPoolResponse{})
- }
+ return repo, nil
+ },
+ func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error {
+ _, err := gitalypb.NewObjectPoolServiceClient(conn).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{
+ Repository: rewritten,
+ },
+ })
+ return err
+ },
+ func() proto.Message { return &gitalypb.DeleteObjectPoolResponse{} },
+ false,
+ )
}
diff --git a/internal/praefect/delete_object_pool_test.go b/internal/praefect/delete_object_pool_test.go
index 9de954c4b..2a8c46f64 100644
--- a/internal/praefect/delete_object_pool_test.go
+++ b/internal/praefect/delete_object_pool_test.go
@@ -107,7 +107,7 @@ func TestDeleteObjectPoolHandler(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(hook.Entries), "expected a log entry for failed deletion")
- require.Equal(t, "failed deleting object pool", hook.Entries[0].Message)
+ require.Equal(t, "failed deleting repository", hook.Entries[0].Message)
require.Equal(t, repo.StorageName, hook.Entries[0].Data["virtual_storage"])
require.Equal(t, repo.RelativePath, hook.Entries[0].Data["relative_path"])
require.Equal(t, "secondary", hook.Entries[0].Data["storage"])
diff --git a/internal/praefect/remove_repository.go b/internal/praefect/remove_repository.go
index 62d92e5a2..74a033ef8 100644
--- a/internal/praefect/remove_repository.go
+++ b/internal/praefect/remove_repository.go
@@ -20,25 +20,57 @@ import (
// RemoveRepositoryHandler intercepts RemoveRepository calls, deletes the database records and
// deletes the repository from every backing Gitaly node.
func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
- return func(srv interface{}, stream grpc.ServerStream) error {
- var req gitalypb.RemoveRepositoryRequest
- if err := stream.RecvMsg(&req); err != nil {
- return fmt.Errorf("receive request: %w", err)
+ return removeRepositoryHandler(rs, conns,
+ func(stream grpc.ServerStream) (*gitalypb.Repository, error) {
+ var req gitalypb.RemoveRepositoryRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return nil, fmt.Errorf("receive request: %w", err)
+ }
+
+ repo := req.GetRepository()
+ if repo == nil {
+ return nil, errMissingRepository
+ }
+
+ return repo, nil
+ },
+ func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error {
+ _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
+ Repository: rewritten,
+ })
+ return err
+ },
+ func() proto.Message { return &gitalypb.RemoveRepositoryResponse{} },
+ true,
+ )
+}
+
+type requestParser func(grpc.ServerStream) (*gitalypb.Repository, error)
+
+type requestProxier func(context.Context, *grpc.ClientConn, *gitalypb.Repository) error
+
+type responseFactory func() proto.Message
+
+func removeRepositoryHandler(rs datastore.RepositoryStore, conns Connections, parseRequest requestParser, proxyRequest requestProxier, buildResponse responseFactory, errorOnNotFound bool) grpc.StreamHandler {
+ return func(_ interface{}, stream grpc.ServerStream) error {
+ repo, err := parseRequest(stream)
+ if err != nil {
+ return err
}
ctx := stream.Context()
- repo := req.GetRepository()
- if repo == nil {
- return errMissingRepository
- }
virtualStorage := repo.StorageName
replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath)
if err != nil {
- // Gitaly doesn't return an error if the repository is not found, so Praefect follows the
- // same protocol.
if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
- return helper.ErrNotFoundf("repository does not exist")
+ if errorOnNotFound {
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return helper.ErrNotFoundf("repository does not exist")
+ }
+ }
+
+ return stream.SendMsg(buildResponse())
}
return fmt.Errorf("delete repository: %w", err)
@@ -46,19 +78,15 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr
var wg sync.WaitGroup
- storageSet := make(map[string]struct{}, len(storages))
- for _, storage := range storages {
- storageSet[storage] = struct{}{}
- }
-
// It's not critical these deletions complete as the background crawler will identify these repos as deleted.
// To rather return a successful code to the client, we limit the timeout here to 10s.
ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
defer cancel()
- for storage, conn := range conns[virtualStorage] {
- if _, ok := storageSet[storage]; !ok {
- // There may be database records for replicas which exist on storages that are not configured in the
+ for _, storage := range storages {
+ conn, ok := conns[virtualStorage][storage]
+ if !ok {
+ // There may be database records for object pools which exist on storages that are not configured in the
// local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled
// by the background cleaner like any other stale repository if the storages are returned to the configuration.
continue
@@ -68,11 +96,11 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr
go func(rewrittenStorage string, conn *grpc.ClientConn) {
defer wg.Done()
- req := proto.Clone(&req).(*gitalypb.RemoveRepositoryRequest)
- req.Repository.StorageName = rewrittenStorage
- req.Repository.RelativePath = replicaPath
+ rewritten := proto.Clone(repo).(*gitalypb.Repository)
+ rewritten.StorageName = rewrittenStorage
+ rewritten.RelativePath = replicaPath
- if _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, req); err != nil {
+ if err := proxyRequest(ctx, conn, rewritten); err != nil {
ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
"virtual_storage": virtualStorage,
"relative_path": repo.RelativePath,
@@ -84,6 +112,6 @@ func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) gr
wg.Wait()
- return stream.SendMsg(&gitalypb.RemoveRepositoryResponse{})
+ return stream.SendMsg(buildResponse())
}
}