Welcome to mirror list, hosted at ThFree Co, Russian Federation.

remove_repository.go « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 2d8c9a17cc5cbe172ff605983eca7009a96991ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package praefect

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/commonerr"
	"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)

// RemoveRepositoryHandler intercepts RemoveRepository calls, deletes the database records and
// deletes the repository from every backing Gitaly node.
func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
	return removeRepositoryHandler(rs, conns,
		func(stream grpc.ServerStream) (*gitalypb.Repository, error) {
			var req gitalypb.RemoveRepositoryRequest
			if err := stream.RecvMsg(&req); err != nil {
				return nil, fmt.Errorf("receive request: %w", err)
			}

			repo := req.GetRepository()
			if repo == nil {
				return nil, errMissingRepository
			}

			return repo, nil
		},
		func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error {
			_, err := gitalypb.NewRepositoryServiceClient(conn).RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
				Repository: rewritten,
			})
			return err
		},
		func() proto.Message { return &gitalypb.RemoveRepositoryResponse{} },
		true,
	)
}

type requestParser func(grpc.ServerStream) (*gitalypb.Repository, error)

type requestProxier func(context.Context, *grpc.ClientConn, *gitalypb.Repository) error

type responseFactory func() proto.Message

func removeRepositoryHandler(rs datastore.RepositoryStore, conns Connections, parseRequest requestParser, proxyRequest requestProxier, buildResponse responseFactory, errorOnNotFound bool) grpc.StreamHandler {
	return func(_ interface{}, stream grpc.ServerStream) error {
		repo, err := parseRequest(stream)
		if err != nil {
			return err
		}

		ctx := stream.Context()

		virtualStorage := repo.StorageName
		replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath)
		if err != nil {
			if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
				if errorOnNotFound {
					if errors.As(err, new(commonerr.RepositoryNotFoundError)) {
						return structerr.NewNotFound("repository does not exist")
					}
				}

				return stream.SendMsg(buildResponse())
			}

			return fmt.Errorf("delete repository: %w", err)
		}

		var wg sync.WaitGroup

		// It's not critical these deletions complete as the background crawler will identify these repos as deleted.
		// To rather return a successful code to the client, we limit the timeout here to 10s.
		ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
		defer cancel()

		for _, storage := range storages {
			conn, ok := conns[virtualStorage][storage]
			if !ok {
				// There may be database records for object pools which exist on storages that are not configured in the
				// local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled
				// by the background cleaner like any other stale repository if the storages are returned to the configuration.
				continue
			}

			wg.Add(1)
			go func(rewrittenStorage string, conn *grpc.ClientConn) {
				defer wg.Done()

				rewritten := proto.Clone(repo).(*gitalypb.Repository)
				rewritten.StorageName = rewrittenStorage
				rewritten.RelativePath = replicaPath

				if err := proxyRequest(ctx, conn, rewritten); err != nil {
					ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
						"virtual_storage": virtualStorage,
						"relative_path":   repo.RelativePath,
						"storage":         rewrittenStorage,
					}).WithError(err).Error("failed deleting repository")
				}
			}(storage, conn)
		}

		wg.Wait()

		return stream.SendMsg(buildResponse())
	}
}