Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSami Hiltunen <shiltunen@gitlab.com>2022-03-07 18:49:04 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2022-04-04 13:54:32 +0300
commitb79eeebc6fb40eae817253bdd03cc1e237e708df (patch)
tree9c1fc645ac4f05c261cd710ba3b344c685d301a3
parent1ad960e9d6c88125c8b64ca8ada057d8cf79589d (diff)
Handle DeleteObjectPool calls in Praefect
Praefect currently proxies DeleteObjectPool calls to Gitalys like any other mutating RPC. This has the same problem as RemoveRepository previously had; a pool can be deleted from the disk and it's metadata record still left in the database. This can then cause problems as Praefect believes a pool replica still exists where one doesn't exist. Praefect doesn't even treat DeleteObjectPool as a repository removing RPC. This means the deletions have never been replicated to the secondaries and the pool metadata records have been left in place. This then causes the reconciler to repeatedly attempt to reconcile from a non-existing primary pool replica to the secondaries. This commit fixes both issues by handling pool deletions in Praefect. Similarly to RemoveRepository, the metadata record of the pool is first deleted and only then the pool is attempted to remove from the Gitalys that have it. This ensures atomicity of the removals when Praefect is rewriting the replica paths. With the behavior fixed, the Praefect specific test asserations can be removed as the behavior now matches what how a plain Gitaly would behave. The handler in Praefect is tested via the same tests that test the handler in Gitaly. Adding separate tests doesn't make sense as external behavior of the the handlers should match and the handler shouldn't exists in Praefect if it is removed from Gitaly. Changelog: fixed
-rw-r--r--internal/gitaly/service/objectpool/create_test.go15
-rw-r--r--internal/praefect/delete_object_pool.go93
-rw-r--r--internal/praefect/delete_object_pool_test.go114
-rw-r--r--internal/praefect/server.go3
4 files changed, 211 insertions, 14 deletions
diff --git a/internal/gitaly/service/objectpool/create_test.go b/internal/gitaly/service/objectpool/create_test.go
index 35afaaf0a..c8b77063c 100644
--- a/internal/gitaly/service/objectpool/create_test.go
+++ b/internal/gitaly/service/objectpool/create_test.go
@@ -1,7 +1,6 @@
package objectpool
import (
- "fmt"
"path/filepath"
"strings"
"testing"
@@ -10,8 +9,6 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
"gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo"
- "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
- "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
@@ -221,18 +218,8 @@ func TestDelete(t *testing.T) {
request.ObjectPool = nil
}
- expectedErr := tc.error
- if tc.error == errInvalidPoolDir && testhelper.IsPraefectEnabled() {
- expectedErr = helper.ErrNotFound(fmt.Errorf(
- "mutator call: route repository mutator: get repository id: %w",
- commonerr.NewRepositoryNotFoundError(repo.GetStorageName(), tc.relativePath),
- ))
- } else if tc.error == errMissingPool && testhelper.IsPraefectEnabled() {
- expectedErr = helper.ErrInvalidArgumentf("repo scoped: unable to descend OID [1 1] into message gitaly.DeleteObjectPoolRequest: proto field is empty")
- }
-
_, err := client.DeleteObjectPool(ctx, request)
- testhelper.RequireGrpcError(t, expectedErr, err)
+ testhelper.RequireGrpcError(t, tc.error, err)
response, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{
Repository: pool.ToProto().GetRepository(),
diff --git a/internal/praefect/delete_object_pool.go b/internal/praefect/delete_object_pool.go
new file mode 100644
index 000000000..26cb7b696
--- /dev/null
+++ b/internal/praefect/delete_object_pool.go
@@ -0,0 +1,93 @@
+package praefect
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool"
+ objectpoolsvc "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/objectpool"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/protobuf/proto"
+)
+
+// DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and
+// deletes the object pool from every backing Gitaly node.
+func DeleteObjectPoolHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
+ return func(_ interface{}, stream grpc.ServerStream) error {
+ var req gitalypb.DeleteObjectPoolRequest
+ if err := stream.RecvMsg(&req); err != nil {
+ return fmt.Errorf("receive request: %w", err)
+ }
+
+ ctx := stream.Context()
+
+ repo, err := objectpoolsvc.ExtractPool(&req)
+ if err != nil {
+ return err
+ }
+
+ if !housekeeping.IsRailsPoolPath(repo.GetRelativePath()) {
+ return helper.ErrInvalidArgument(objectpool.ErrInvalidPoolDir)
+ }
+
+ virtualStorage := repo.StorageName
+ replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath)
+ if err != nil {
+ // Gitaly doesn't return an error if the object pool is not found, so Praefect follows the
+ // same protocol.
+ if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
+ return stream.SendMsg(&gitalypb.DeleteObjectPoolResponse{})
+ }
+
+ return fmt.Errorf("delete object pool: %w", err)
+ }
+
+ var wg sync.WaitGroup
+
+ // It's not critical these deletions complete as the background crawler will identify these repos as deleted.
+ // To rather return a successful code to the client, we limit the timeout here to 10s.
+ ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
+ defer cancel()
+
+ for _, storage := range storages {
+ conn, ok := conns[virtualStorage][storage]
+ if !ok {
+ // There may be database records for object pools which exist on storages that are not configured in the
+ // local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled
+ // by the background cleaner like any other stale repository if the storages are returned to the configuration.
+ continue
+ }
+
+ wg.Add(1)
+ go func(rewrittenStorage string, conn *grpc.ClientConn) {
+ defer wg.Done()
+
+ req := proto.Clone(&req).(*gitalypb.DeleteObjectPoolRequest)
+ req.ObjectPool.Repository.StorageName = rewrittenStorage
+ req.ObjectPool.Repository.RelativePath = replicaPath
+
+ if _, err := gitalypb.NewObjectPoolServiceClient(conn).DeleteObjectPool(ctx, req); err != nil {
+ ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
+ "virtual_storage": virtualStorage,
+ "relative_path": repo.RelativePath,
+ "storage": rewrittenStorage,
+ }).WithError(err).Error("failed deleting object pool")
+ }
+ }(storage, conn)
+ }
+
+ wg.Wait()
+
+ return stream.SendMsg(&gitalypb.DeleteObjectPoolResponse{})
+ }
+}
diff --git a/internal/praefect/delete_object_pool_test.go b/internal/praefect/delete_object_pool_test.go
new file mode 100644
index 000000000..9de954c4b
--- /dev/null
+++ b/internal/praefect/delete_object_pool_test.go
@@ -0,0 +1,114 @@
+package praefect
+
+import (
+ "context"
+ "testing"
+
+ grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/log"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+)
+
+type mockObjectPoolService struct {
+ gitalypb.UnimplementedObjectPoolServiceServer
+ deleteObjectPoolFunc func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error)
+}
+
+func (m mockObjectPoolService) DeleteObjectPool(ctx context.Context, req *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) {
+ return m.deleteObjectPoolFunc(ctx, req)
+}
+
+func TestDeleteObjectPoolHandler(t *testing.T) {
+ // the primary returns a successful response
+ primarySrv := grpc.NewServer()
+ gitalypb.RegisterObjectPoolServiceServer(primarySrv, mockObjectPoolService{
+ deleteObjectPoolFunc: func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) {
+ return &gitalypb.DeleteObjectPoolResponse{}, nil
+ },
+ })
+
+ // the secondary fails as it doesn't have the service registered
+ secondarySrv := grpc.NewServer()
+
+ primaryLn, primaryAddr := testhelper.GetLocalhostListener(t)
+ secondaryLn, secondaryAddr := testhelper.GetLocalhostListener(t)
+
+ defer primarySrv.Stop()
+ go primarySrv.Serve(primaryLn)
+
+ defer secondarySrv.Stop()
+ go secondarySrv.Serve(secondaryLn)
+
+ db := testdb.New(t)
+ rs := datastore.NewPostgresRepositoryStore(db, nil)
+
+ ctx := testhelper.Context(t)
+
+ repo := &gitalypb.Repository{
+ StorageName: "virtual-storage",
+ RelativePath: gittest.NewObjectPoolName(t),
+ }
+
+ require.NoError(t,
+ rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, "replica-path", "primary", []string{"secondary", "unconfigured_storage"}, nil, true, true),
+ )
+
+ primaryConn, err := grpc.Dial(primaryAddr, grpc.WithInsecure())
+ require.NoError(t, err)
+ defer primaryConn.Close()
+
+ secondaryConn, err := grpc.Dial(secondaryAddr, grpc.WithInsecure())
+ require.NoError(t, err)
+ defer secondaryConn.Close()
+
+ praefectLn, praefectAddr := testhelper.GetLocalhostListener(t)
+ logger, hook := test.NewNullLogger()
+ praefectSrv := grpc.NewServer(grpc.ChainStreamInterceptor(
+ grpcmwlogrus.StreamServerInterceptor(logrus.NewEntry(logger), grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat)),
+ ))
+ praefectSrv.RegisterService(&grpc.ServiceDesc{
+ ServiceName: "gitaly.ObjectPoolService",
+ HandlerType: (*interface{})(nil),
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "DeleteObjectPool",
+ Handler: DeleteObjectPoolHandler(rs, Connections{
+ "virtual-storage": {
+ "primary": primaryConn,
+ "secondary": secondaryConn,
+ },
+ }),
+ ServerStreams: true,
+ ClientStreams: true,
+ },
+ },
+ }, struct{}{})
+
+ defer praefectSrv.Stop()
+ go praefectSrv.Serve(praefectLn)
+
+ praefectConn, err := grpc.Dial(praefectAddr, grpc.WithInsecure())
+ require.NoError(t, err)
+ defer praefectConn.Close()
+
+ _, err = gitalypb.NewObjectPoolServiceClient(
+ praefectConn,
+ ).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{
+ ObjectPool: &gitalypb.ObjectPool{Repository: repo},
+ })
+ require.NoError(t, err)
+
+ require.Equal(t, 2, len(hook.Entries), "expected a log entry for failed deletion")
+ require.Equal(t, "failed deleting object pool", hook.Entries[0].Message)
+ require.Equal(t, repo.StorageName, hook.Entries[0].Data["virtual_storage"])
+ require.Equal(t, repo.RelativePath, hook.Entries[0].Data["relative_path"])
+ require.Equal(t, "secondary", hook.Entries[0].Data["storage"])
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index c1087da28..d75810246 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -149,6 +149,9 @@ func NewGRPCServer(
"RepositoryExists": RepositoryExistsHandler(rs),
"RemoveRepository": RemoveRepositoryHandler(rs, conns),
})
+ proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{
+ "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns),
+ })
}
return srv