diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2022-03-07 18:49:04 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2022-04-04 13:54:32 +0300 |
commit | b79eeebc6fb40eae817253bdd03cc1e237e708df (patch) | |
tree | 9c1fc645ac4f05c261cd710ba3b344c685d301a3 | |
parent | 1ad960e9d6c88125c8b64ca8ada057d8cf79589d (diff) |
Handle DeleteObjectPool calls in Praefect
Praefect currently proxies DeleteObjectPool calls to Gitalys like
any other mutating RPC. This has the same problem as RemoveRepository
previously had; a pool can be deleted from the disk and it's metadata
record still left in the database. This can then cause problems as
Praefect believes a pool replica still exists where one doesn't exist.
Praefect doesn't even treat DeleteObjectPool as a repository removing
RPC. This means the deletions have never been replicated to the
secondaries and the pool metadata records have been left in place. This
then causes the reconciler to repeatedly attempt to reconcile from a
non-existing primary pool replica to the secondaries.
This commit fixes both issues by handling pool deletions in Praefect.
Similarly to RemoveRepository, the metadata record of the pool is first
deleted and only then the pool is attempted to remove from the Gitalys
that have it. This ensures atomicity of the removals when Praefect is
rewriting the replica paths.
With the behavior fixed, the Praefect specific test asserations can be
removed as the behavior now matches what how a plain Gitaly would
behave.
The handler in Praefect is tested via the same tests that test the
handler in Gitaly. Adding separate tests doesn't make sense as external
behavior of the the handlers should match and the handler shouldn't
exists in Praefect if it is removed from Gitaly.
Changelog: fixed
-rw-r--r-- | internal/gitaly/service/objectpool/create_test.go | 15 | ||||
-rw-r--r-- | internal/praefect/delete_object_pool.go | 93 | ||||
-rw-r--r-- | internal/praefect/delete_object_pool_test.go | 114 | ||||
-rw-r--r-- | internal/praefect/server.go | 3 |
4 files changed, 211 insertions, 14 deletions
diff --git a/internal/gitaly/service/objectpool/create_test.go b/internal/gitaly/service/objectpool/create_test.go index 35afaaf0a..c8b77063c 100644 --- a/internal/gitaly/service/objectpool/create_test.go +++ b/internal/gitaly/service/objectpool/create_test.go @@ -1,7 +1,6 @@ package objectpool import ( - "fmt" "path/filepath" "strings" "testing" @@ -10,8 +9,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v14/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v14/internal/helper" - "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" @@ -221,18 +218,8 @@ func TestDelete(t *testing.T) { request.ObjectPool = nil } - expectedErr := tc.error - if tc.error == errInvalidPoolDir && testhelper.IsPraefectEnabled() { - expectedErr = helper.ErrNotFound(fmt.Errorf( - "mutator call: route repository mutator: get repository id: %w", - commonerr.NewRepositoryNotFoundError(repo.GetStorageName(), tc.relativePath), - )) - } else if tc.error == errMissingPool && testhelper.IsPraefectEnabled() { - expectedErr = helper.ErrInvalidArgumentf("repo scoped: unable to descend OID [1 1] into message gitaly.DeleteObjectPoolRequest: proto field is empty") - } - _, err := client.DeleteObjectPool(ctx, request) - testhelper.RequireGrpcError(t, expectedErr, err) + testhelper.RequireGrpcError(t, tc.error, err) response, err := repositoryClient.RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ Repository: pool.ToProto().GetRepository(), diff --git a/internal/praefect/delete_object_pool.go b/internal/praefect/delete_object_pool.go new file mode 100644 index 000000000..26cb7b696 --- /dev/null +++ b/internal/praefect/delete_object_pool.go @@ -0,0 +1,93 @@ +package praefect + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/objectpool" + objectpoolsvc "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service/objectpool" + "gitlab.com/gitlab-org/gitaly/v14/internal/helper" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +// DeleteObjectPoolHandler intercepts DeleteObjectPool calls, deletes the database records and +// deletes the object pool from every backing Gitaly node. +func DeleteObjectPoolHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler { + return func(_ interface{}, stream grpc.ServerStream) error { + var req gitalypb.DeleteObjectPoolRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + ctx := stream.Context() + + repo, err := objectpoolsvc.ExtractPool(&req) + if err != nil { + return err + } + + if !housekeeping.IsRailsPoolPath(repo.GetRelativePath()) { + return helper.ErrInvalidArgument(objectpool.ErrInvalidPoolDir) + } + + virtualStorage := repo.StorageName + replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath) + if err != nil { + // Gitaly doesn't return an error if the object pool is not found, so Praefect follows the + // same protocol. + if errors.As(err, new(commonerr.RepositoryNotFoundError)) { + return stream.SendMsg(&gitalypb.DeleteObjectPoolResponse{}) + } + + return fmt.Errorf("delete object pool: %w", err) + } + + var wg sync.WaitGroup + + // It's not critical these deletions complete as the background crawler will identify these repos as deleted. + // To rather return a successful code to the client, we limit the timeout here to 10s. + ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second) + defer cancel() + + for _, storage := range storages { + conn, ok := conns[virtualStorage][storage] + if !ok { + // There may be database records for object pools which exist on storages that are not configured in the + // local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled + // by the background cleaner like any other stale repository if the storages are returned to the configuration. + continue + } + + wg.Add(1) + go func(rewrittenStorage string, conn *grpc.ClientConn) { + defer wg.Done() + + req := proto.Clone(&req).(*gitalypb.DeleteObjectPoolRequest) + req.ObjectPool.Repository.StorageName = rewrittenStorage + req.ObjectPool.Repository.RelativePath = replicaPath + + if _, err := gitalypb.NewObjectPoolServiceClient(conn).DeleteObjectPool(ctx, req); err != nil { + ctxlogrus.Extract(ctx).WithFields(logrus.Fields{ + "virtual_storage": virtualStorage, + "relative_path": repo.RelativePath, + "storage": rewrittenStorage, + }).WithError(err).Error("failed deleting object pool") + } + }(storage, conn) + } + + wg.Wait() + + return stream.SendMsg(&gitalypb.DeleteObjectPoolResponse{}) + } +} diff --git a/internal/praefect/delete_object_pool_test.go b/internal/praefect/delete_object_pool_test.go new file mode 100644 index 000000000..9de954c4b --- /dev/null +++ b/internal/praefect/delete_object_pool_test.go @@ -0,0 +1,114 @@ +package praefect + +import ( + "context" + "testing" + + grpcmwlogrus "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v14/internal/log" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type mockObjectPoolService struct { + gitalypb.UnimplementedObjectPoolServiceServer + deleteObjectPoolFunc func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) +} + +func (m mockObjectPoolService) DeleteObjectPool(ctx context.Context, req *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) { + return m.deleteObjectPoolFunc(ctx, req) +} + +func TestDeleteObjectPoolHandler(t *testing.T) { + // the primary returns a successful response + primarySrv := grpc.NewServer() + gitalypb.RegisterObjectPoolServiceServer(primarySrv, mockObjectPoolService{ + deleteObjectPoolFunc: func(context.Context, *gitalypb.DeleteObjectPoolRequest) (*gitalypb.DeleteObjectPoolResponse, error) { + return &gitalypb.DeleteObjectPoolResponse{}, nil + }, + }) + + // the secondary fails as it doesn't have the service registered + secondarySrv := grpc.NewServer() + + primaryLn, primaryAddr := testhelper.GetLocalhostListener(t) + secondaryLn, secondaryAddr := testhelper.GetLocalhostListener(t) + + defer primarySrv.Stop() + go primarySrv.Serve(primaryLn) + + defer secondarySrv.Stop() + go secondarySrv.Serve(secondaryLn) + + db := testdb.New(t) + rs := datastore.NewPostgresRepositoryStore(db, nil) + + ctx := testhelper.Context(t) + + repo := &gitalypb.Repository{ + StorageName: "virtual-storage", + RelativePath: gittest.NewObjectPoolName(t), + } + + require.NoError(t, + rs.CreateRepository(ctx, 1, repo.StorageName, repo.RelativePath, "replica-path", "primary", []string{"secondary", "unconfigured_storage"}, nil, true, true), + ) + + primaryConn, err := grpc.Dial(primaryAddr, grpc.WithInsecure()) + require.NoError(t, err) + defer primaryConn.Close() + + secondaryConn, err := grpc.Dial(secondaryAddr, grpc.WithInsecure()) + require.NoError(t, err) + defer secondaryConn.Close() + + praefectLn, praefectAddr := testhelper.GetLocalhostListener(t) + logger, hook := test.NewNullLogger() + praefectSrv := grpc.NewServer(grpc.ChainStreamInterceptor( + grpcmwlogrus.StreamServerInterceptor(logrus.NewEntry(logger), grpcmwlogrus.WithTimestampFormat(log.LogTimestampFormat)), + )) + praefectSrv.RegisterService(&grpc.ServiceDesc{ + ServiceName: "gitaly.ObjectPoolService", + HandlerType: (*interface{})(nil), + Streams: []grpc.StreamDesc{ + { + StreamName: "DeleteObjectPool", + Handler: DeleteObjectPoolHandler(rs, Connections{ + "virtual-storage": { + "primary": primaryConn, + "secondary": secondaryConn, + }, + }), + ServerStreams: true, + ClientStreams: true, + }, + }, + }, struct{}{}) + + defer praefectSrv.Stop() + go praefectSrv.Serve(praefectLn) + + praefectConn, err := grpc.Dial(praefectAddr, grpc.WithInsecure()) + require.NoError(t, err) + defer praefectConn.Close() + + _, err = gitalypb.NewObjectPoolServiceClient( + praefectConn, + ).DeleteObjectPool(ctx, &gitalypb.DeleteObjectPoolRequest{ + ObjectPool: &gitalypb.ObjectPool{Repository: repo}, + }) + require.NoError(t, err) + + require.Equal(t, 2, len(hook.Entries), "expected a log entry for failed deletion") + require.Equal(t, "failed deleting object pool", hook.Entries[0].Message) + require.Equal(t, repo.StorageName, hook.Entries[0].Data["virtual_storage"]) + require.Equal(t, repo.RelativePath, hook.Entries[0].Data["relative_path"]) + require.Equal(t, "secondary", hook.Entries[0].Data["storage"]) +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index c1087da28..d75810246 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -149,6 +149,9 @@ func NewGRPCServer( "RepositoryExists": RepositoryExistsHandler(rs), "RemoveRepository": RemoveRepositoryHandler(rs, conns), }) + proxy.RegisterStreamHandlers(srv, "gitaly.ObjectPoolService", map[string]grpc.StreamHandler{ + "DeleteObjectPool": DeleteObjectPoolHandler(rs, conns), + }) } return srv |