1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package praefect
import (
"fmt"
"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"google.golang.org/grpc"
)
// ReplicateRepositoryHandler intercepts ReplicateRepository RPC calls.
func ReplicateRepositoryHandler(coordinator *Coordinator) grpc.StreamHandler {
return func(srv interface{}, stream grpc.ServerStream) error {
ctx := stream.Context()
if featureflag.InterceptReplicateRepository.IsDisabled(ctx) {
// Fallback to the default transparent handler to proxy the RPC.
return proxy.TransparentHandler(coordinator.StreamDirector)(srv, stream)
}
// Peek the stream to get first request.
var req gitalypb.ReplicateRepositoryRequest
if err := stream.RecvMsg(&req); err != nil {
return fmt.Errorf("receive request: %w", err)
}
// Validate required target repository information is present in first request.
if err := coordinator.validateTargetRepo(req.GetRepository()); err != nil {
return structerr.NewInvalidArgument("%w", err)
}
const fullMethodName = "/gitaly.RepositoryService/ReplicateRepository"
mi, err := coordinator.registry.LookupMethod(fullMethodName)
if err != nil {
return err
}
// Object pool replication is not yet supported by Praefect. Rewrite the request to always
// disable object pool replication.
req.ReplicateObjectDeduplicationNetworkMembership = false
// Generate stream parameters used to configure the stream proxy.
parameters, err := coordinator.mutatorStreamParameters(ctx, grpcCall{
fullMethodName: fullMethodName,
methodInfo: mi,
msg: &req,
targetRepo: req.GetRepository(),
})
if err != nil {
return err
}
// Proxy stream to destination storages.
return proxy.HandleStream(stream, fullMethodName, parameters)
}
}
|