Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-03-19 09:12:10 +0300
committerPaul Okstad <pokstad@gitlab.com>2019-03-19 09:12:10 +0300
commitd0febac0870e6aabb32c11be7694d5dbfd1563eb (patch)
treeab788f8a84f22680d43b3b1492041d55db67bbd2
parentabccf3c2de9c66edefcf4e32757702142c0b2bc5 (diff)
replication manager draft
-rw-r--r--internal/praefect/common.go15
-rw-r--r--internal/praefect/node.go20
-rw-r--r--internal/praefect/replication.go77
-rw-r--r--internal/praefect/server.go1
4 files changed, 113 insertions, 0 deletions
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
new file mode 100644
index 000000000..17667283e
--- /dev/null
+++ b/internal/praefect/common.go
@@ -0,0 +1,15 @@
+package praefect
+
+// Repository contains all the information necessary to address a specific
+// repository
+type Repository struct {
+ Project string // e.g. gitlab.com/gitaly-org/gitaly
+ Storage string // e.g. Default
+}
+
+// ReplJob indicates which repo replicas require syncing
+type ReplJob struct {
+ ID string // unique ID to track job progress in datastore
+ Primary Repository
+ Replica Node
+}
diff --git a/internal/praefect/node.go b/internal/praefect/node.go
new file mode 100644
index 000000000..7100e2559
--- /dev/null
+++ b/internal/praefect/node.go
@@ -0,0 +1,20 @@
+package praefect
+
+import (
+ "context"
+
+ "google.golang.org/grpc"
+)
+
+// Node is a backend Gitaly node that is responsible for hosting repositories
+// in a specific storage location
+type Node struct {
+ storage string // storage location ID (e.g. default)
+ cc *grpc.ClientConn
+}
+
+// PullReplication will attempt to replicate changes from a primary replica
+func (n Node) PullReplication(ctx context.Context, primary Repository) error {
+ // TODO: replication logic in #1484
+ return nil
+}
diff --git a/internal/praefect/replication.go b/internal/praefect/replication.go
new file mode 100644
index 000000000..e8f2dc496
--- /dev/null
+++ b/internal/praefect/replication.go
@@ -0,0 +1,77 @@
+package praefect
+
+import (
+ "context"
+)
+
+// ReplMan is a replication manager for handling replication jobs
+type ReplMan struct {
+ l Logger
+ // whitelist contains the project names of the repos we wish to replicate
+ whitelist map[string]struct{}
+}
+
+type ReplManOpt func(*ReplMan)
+
+func NewReplMan(opts ...ReplManOpt) *ReplMan {
+ m := &ReplMan{
+ whitelist: map[string]struct{}{},
+ }
+
+ for _, opt := range opts {
+ opt(m)
+ }
+
+ return m
+}
+
+// WithWhitelist will configure a whitelist for repos to allow replication
+func WithWhitelist(whitelistedRepos []string) ReplManOpt {
+ return func(m *ReplMan) {
+ for _, r := range whitelistedRepos {
+ m.whitelist[r] = struct{}{}
+ }
+ }
+}
+
+// ReplCoordinator represents all the coordinator functionality the replication
+// manager relies on
+type ReplCoordinator interface {
+ // ReplicationQueue returns a stream of jobs from
+ ReplicationQueue(context.Context) (<-chan ReplJob, error)
+
+ // CompleteJob reports if a job was completed. A non-nil jobErr indicates
+ // the job was not successful.
+ CompleteJob(ctx context.Context, ID string, jobErr error) error
+}
+
+func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error {
+ jobQ, err := rc.ReplicationQueue(ctx)
+
+ for {
+ var (
+ job ReplJob
+ ok bool
+ )
+
+ select {
+
+ // context cancelled
+ case <-ctx.Done():
+ return ctx.Err()
+
+ case job, ok = <-jobQ:
+ if !ok { // channel closed
+ return nil
+ }
+
+ }
+
+ jobErr := job.Replica.PullReplication(ctx, job.Primary)
+
+ err = rc.CompleteJob(ctx, job.ID, jobErr)
+ if err != nil {
+ rm.l.Errorf("unable to report replication job completion for %+v", job.ID)
+ }
+ }
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 04779af2c..6266c1599 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -26,6 +26,7 @@ import (
// into the praefect server
type Logger interface {
Debugf(format string, args ...interface{})
+ Errorf(format string, args ...interface{})
}
// Coordinator takes care of directing client requests to the appropriate