1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
package praefect
import (
"context"
)
// ReplMan is a replication manager for handling replication jobs
type ReplMan struct {
// l is the logger ProcessJobs uses to report a failure to mark a job as
// completed.
// NOTE(review): NewReplMan does not assign l and no option in this file sets
// it; confirm it is populated before ProcessJobs needs to log.
l Logger
// whitelist contains the project names of the repos we wish to replicate
whitelist map[string]struct{}
}
// ReplManOpt is a functional option: NewReplMan calls each one with the
// ReplMan under construction so the option can configure it.
type ReplManOpt func(*ReplMan)
// NewReplMan builds a replication manager that starts with an empty whitelist
// and then applies every supplied option to it, in the order given.
func NewReplMan(opts ...ReplManOpt) *ReplMan {
	rm := new(ReplMan)
	rm.whitelist = make(map[string]struct{})

	// Options run after the defaults are in place so they can rely on the
	// whitelist map already being allocated.
	for i := range opts {
		opts[i](rm)
	}

	return rm
}
// WithWhitelist returns an option that adds every name in whitelistedRepos to
// the manager's whitelist of repos allowed to replicate.
func WithWhitelist(whitelistedRepos []string) ReplManOpt {
	addRepos := func(m *ReplMan) {
		for i := range whitelistedRepos {
			name := whitelistedRepos[i]
			// The map is used as a set; only key presence matters.
			m.whitelist[name] = struct{}{}
		}
	}

	return addRepos
}
// ReplCoordinator represents all the coordinator functionality the replication
// manager relies on
type ReplCoordinator interface {
// ReplicationQueue returns a receive-only channel that delivers replication
// jobs, or an error if the queue cannot be provided. ProcessJobs stops
// with a nil error once this channel is closed.
ReplicationQueue(context.Context) (<-chan ReplJob, error)
// CompleteJob reports if a job was completed. A non-nil jobErr indicates
// the job was not successful.
CompleteJob(ctx context.Context, ID string, jobErr error) error
}
// ProcessJobs obtains the replication job queue from rc and handles jobs one
// at a time until ctx is done or the queue channel is closed.
//
// For each job it calls PullReplication on the job's replica with the job's
// primary, then reports the outcome to rc through CompleteJob. A failure to
// report completion is logged and does not stop processing.
//
// It returns the error from ReplicationQueue when the queue cannot be
// obtained, ctx.Err() once ctx is done, and nil once the queue channel has
// been closed.
func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error {
	jobQ, err := rc.ReplicationQueue(ctx)
	if err != nil {
		// Bail out instead of looping on a queue we failed to obtain: a nil
		// channel never delivers, so the loop would only end on cancellation
		// and the real cause would be lost.
		return err
	}

	for {
		var (
			job ReplJob
			ok  bool
		)

		select {
		case <-ctx.Done():
			// context cancelled
			return ctx.Err()
		case job, ok = <-jobQ:
			if !ok {
				// channel closed: no more jobs will arrive
				return nil
			}
		}

		// The replication result, success or failure, is handed to the
		// coordinator rather than returned, so one bad job does not stop the
		// remaining ones from being processed.
		jobErr := job.Replica.PullReplication(ctx, job.Primary)

		if err := rc.CompleteJob(ctx, job.ID, jobErr); err != nil {
			rm.l.Errorf("unable to report replication job completion for %v: %v", job.ID, err)
		}
	}
}
|