Welcome to mirror list, hosted at ThFree Co, Russian Federation.

replicate_repository.go « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 513e1060cc67eaf97b89a75e6cde508c819f488c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package praefect

import (
	"fmt"

	"gitlab.com/gitlab-org/gitaly/v16/internal/featureflag"
	"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"google.golang.org/grpc"
)

// ReplicateRepositoryHandler returns a stream handler that intercepts
// ReplicateRepository RPC calls. While the InterceptReplicateRepository feature
// flag is disabled the call is handed to the transparent proxy handler instead.
// Otherwise the first request is read from the stream, its target repository is
// validated, object pool replication is switched off in the request, and the
// stream is proxied using parameters produced by the coordinator.
func ReplicateRepositoryHandler(coordinator *Coordinator) grpc.StreamHandler {
	const fullMethodName = "/gitaly.RepositoryService/ReplicateRepository"

	return func(srv interface{}, stream grpc.ServerStream) error {
		ctx := stream.Context()

		// Interception is gated behind a feature flag; when it is off, fall back to the
		// default transparent handler to proxy the RPC.
		if featureflag.InterceptReplicateRepository.IsDisabled(ctx) {
			passthrough := proxy.TransparentHandler(coordinator.StreamDirector)
			return passthrough(srv, stream)
		}

		// Peek at the stream by consuming the first request message.
		request := &gitalypb.ReplicateRepositoryRequest{}
		if err := stream.RecvMsg(request); err != nil {
			return fmt.Errorf("receive request: %w", err)
		}

		// The first request has to carry the required target repository information;
		// a validation failure is reported as an invalid argument.
		targetRepo := request.GetRepository()
		if err := coordinator.validateTargetRepo(targetRepo); err != nil {
			return structerr.NewInvalidArgument("%w", err)
		}

		methodInfo, err := coordinator.registry.LookupMethod(fullMethodName)
		if err != nil {
			return err
		}

		// Praefect does not support object pool replication yet, so the request is
		// rewritten to always disable it before being forwarded.
		request.ReplicateObjectDeduplicationNetworkMembership = false

		// Describe the call so the coordinator can generate the parameters that
		// configure the stream proxy.
		call := grpcCall{
			fullMethodName: fullMethodName,
			methodInfo:     methodInfo,
			msg:            request,
			targetRepo:     targetRepo,
		}
		streamParams, err := coordinator.mutatorStreamParameters(ctx, call)
		if err != nil {
			return err
		}

		// Proxy the stream to the destination storages.
		return proxy.HandleStream(stream, fullMethodName, streamParams)
	}
}