Welcome to mirror list, hosted at ThFree Co, Russian Federation.

replication.go « praefect « internal - gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: b0d289040a7f80ab02facbc7f12a4a7250a13557 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package praefect

import (
	"context"
)

// ReplMan is a replication manager for handling replication jobs.
type ReplMan struct {
	// l is the logger used to report errors, e.g. when a job's completion
	// cannot be reported back to the coordinator.
	// NOTE(review): nothing visible in this file assigns l — confirm it is
	// set before any logging happens.
	l Logger
	// whitelist contains the project names of the repos we wish to replicate.
	// It is used as a set: only the keys matter, the empty-struct values
	// carry no data.
	whitelist map[string]struct{}
}

// ReplManOpt is a functional option: a function that receives a *ReplMan and
// adjusts its configuration.
type ReplManOpt func(*ReplMan)

// NewReplMan builds a replication manager with an empty whitelist and then
// applies the supplied options to it, in the order they were given.
func NewReplMan(opts ...ReplManOpt) *ReplMan {
	rm := new(ReplMan)
	// Options may write into the whitelist, so it must exist before they run.
	rm.whitelist = make(map[string]struct{})

	for i := range opts {
		opts[i](rm)
	}

	return rm
}

// WithWhitelist returns an option that adds every name in whitelistedRepos to
// the manager's whitelist of repos allowed to replicate. The slice is read
// when the option is applied, not when WithWhitelist is called.
func WithWhitelist(whitelistedRepos []string) ReplManOpt {
	addAll := func(rm *ReplMan) {
		for i := 0; i < len(whitelistedRepos); i++ {
			rm.whitelist[whitelistedRepos[i]] = struct{}{}
		}
	}

	return addAll
}

// ReplCoordinator represents all the coordinator functionality the replication
// manager relies on.
type ReplCoordinator interface {
	// ReplicationQueue returns a receive-only channel that delivers
	// replication jobs. ProcessJobs treats a closed channel as the end of
	// the queue. The error presumably reports that the queue could not be
	// provided — confirm against the implementation.
	ReplicationQueue(context.Context) (<-chan ReplJob, error)

	// CompleteJob reports if a job was completed. ID identifies the job and
	// a non-nil jobErr indicates the job was not successful. The returned
	// error presumably means the report itself could not be delivered —
	// confirm against the implementation.
	CompleteJob(ctx context.Context, ID string, jobErr error) error
}

// ProcessJobs obtains the replication queue from rc and runs the jobs it
// delivers one at a time: each job's replica pulls from the job's primary, and
// the outcome is reported back through rc.CompleteJob.
//
// It returns:
//   - the error from rc.ReplicationQueue if the queue cannot be obtained,
//   - ctx.Err() once ctx is done,
//   - nil when the job channel is closed.
//
// A failure to report a job's completion is logged and does not stop
// processing of subsequent jobs.
func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error {
	jobQ, err := rc.ReplicationQueue(ctx)
	if err != nil {
		// Bail out instead of looping: with no usable queue (a nil channel
		// blocks forever on receive) the loop below could only ever exit
		// through ctx cancellation, and the real cause would be lost.
		return err
	}

	for {
		var job ReplJob

		select {
		// context cancelled
		case <-ctx.Done():
			return ctx.Err()

		case next, ok := <-jobQ:
			if !ok { // channel closed
				return nil
			}
			job = next
		}

		// The pull result is passed to the coordinator untouched; a non-nil
		// jobErr marks the job as unsuccessful.
		jobErr := job.Replica.PullReplication(ctx, job.Primary)

		if err := rc.CompleteJob(ctx, job.ID, jobErr); err != nil {
			// Best effort: keep serving the queue, but record why the
			// report failed so it can be diagnosed.
			rm.l.Errorf("unable to report replication job completion for %v: %v", job.ID, err)
		}
	}
}