1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
package praefect
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/commonerr"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
// RemoveRepositoryHandler returns a stream handler that intercepts RemoveRepository calls. The
// handler deletes the repository's database records and then deletes the repository from every
// backing Gitaly node. A repository that does not exist is reported as an error to the caller.
func RemoveRepositoryHandler(rs datastore.RepositoryStore, conns Connections) grpc.StreamHandler {
	// parse reads the single RemoveRepositoryRequest off the stream and extracts its repository.
	parse := func(stream grpc.ServerStream) (*gitalypb.Repository, error) {
		request := new(gitalypb.RemoveRepositoryRequest)
		if err := stream.RecvMsg(request); err != nil {
			return nil, fmt.Errorf("receive request: %w", err)
		}

		if repo := request.GetRepository(); repo != nil {
			return repo, nil
		}

		return nil, errMissingRepository
	}

	// proxy forwards the deletion to one Gitaly node, targeting the already rewritten repository.
	proxy := func(ctx context.Context, conn *grpc.ClientConn, rewritten *gitalypb.Repository) error {
		client := gitalypb.NewRepositoryServiceClient(conn)
		_, err := client.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
			Repository: rewritten,
		})
		return err
	}

	// newResponse builds the empty response sent back to the client.
	newResponse := func() proto.Message {
		return new(gitalypb.RemoveRepositoryResponse)
	}

	// The trailing `true` is errorOnNotFound: a missing repository yields a NotFound error.
	return removeRepositoryHandler(rs, conns, parse, proxy, newResponse, true)
}
type (
	// requestParser reads the incoming request from the server stream and returns the
	// repository the request targets.
	requestParser func(grpc.ServerStream) (*gitalypb.Repository, error)

	// requestProxier sends the request for the given (already rewritten) repository over the
	// given client connection.
	requestProxier func(context.Context, *grpc.ClientConn, *gitalypb.Repository) error

	// responseFactory builds the response message that is sent back to the client.
	responseFactory func() proto.Message
)
// removeRepositoryHandler builds a stream handler that removes a repository's records through
// rs.DeleteRepository and then asks every storage returned by that call to delete its replica.
//
// parseRequest reads the incoming request from the stream and returns the targeted repository.
// proxyRequest sends the deletion to a single Gitaly connection using the rewritten repository.
// buildResponse creates the message sent back to the client. When errorOnNotFound is true, a
// RepositoryNotFoundError from the store is reported to the client as a NotFound error; when it
// is false the handler answers with a regular response instead.
//
// Failures of the per-storage deletions are only logged; they never fail the RPC.
func removeRepositoryHandler(rs datastore.RepositoryStore, conns Connections, parseRequest requestParser, proxyRequest requestProxier, buildResponse responseFactory, errorOnNotFound bool) grpc.StreamHandler {
	return func(_ interface{}, stream grpc.ServerStream) error {
		repo, err := parseRequest(stream)
		if err != nil {
			return err
		}

		ctx := stream.Context()

		virtualStorage := repo.StorageName
		replicaPath, storages, err := rs.DeleteRepository(ctx, virtualStorage, repo.RelativePath)
		if err != nil {
			if !errors.As(err, new(commonerr.RepositoryNotFoundError)) {
				return fmt.Errorf("delete repository: %w", err)
			}

			// The repository has no records. Depending on the RPC this is either an error or
			// treated the same as a successful deletion.
			if errorOnNotFound {
				return structerr.NewNotFound("repository does not exist")
			}

			return stream.SendMsg(buildResponse())
		}

		var wg sync.WaitGroup

		// It's not critical that these deletions complete, as the background crawler will identify these
		// repositories as deleted. To return a successful code to the client rather than hang, we limit the
		// time spent on them to 10s.
		ctx, cancel := context.WithTimeout(stream.Context(), 10*time.Second)
		defer cancel()

		for _, storage := range storages {
			conn, ok := conns[virtualStorage][storage]
			if !ok {
				// There may be database records for object pools which exist on storages that are not configured in the
				// local Praefect. We'll just ignore them here and not explicitly attempt to delete them. They'll be handled
				// by the background cleaner like any other stale repository if the storages are returned to the configuration.
				continue
			}

			wg.Add(1)
			// storage and conn are passed as arguments so each goroutine works on its own copy.
			go func(rewrittenStorage string, conn *grpc.ClientConn) {
				defer wg.Done()

				// Point the request at the physical storage and the replica path instead of the
				// virtual storage and client-facing relative path.
				rewritten := proto.Clone(repo).(*gitalypb.Repository)
				rewritten.StorageName = rewrittenStorage
				rewritten.RelativePath = replicaPath

				if err := proxyRequest(ctx, conn, rewritten); err != nil {
					ctxlogrus.Extract(ctx).WithFields(logrus.Fields{
						"virtual_storage": virtualStorage,
						"relative_path":   repo.RelativePath,
						"storage":         rewrittenStorage,
					}).WithError(err).Error("failed deleting repository")
				}
			}(storage, conn)
		}

		// Wait for every deletion attempt to finish (or time out) before answering the client.
		wg.Wait()

		return stream.SendMsg(buildResponse())
	}
}
|