diff options
author | Paul Okstad <pokstad@gitlab.com> | 2019-03-19 09:12:10 +0300 |
---|---|---|
committer | Paul Okstad <pokstad@gitlab.com> | 2019-03-19 09:12:10 +0300 |
commit | d0febac0870e6aabb32c11be7694d5dbfd1563eb (patch) | |
tree | ab788f8a84f22680d43b3b1492041d55db67bbd2 | |
parent | abccf3c2de9c66edefcf4e32757702142c0b2bc5 (diff) |
replication manager draft
-rw-r--r-- | internal/praefect/common.go | 15 | ||||
-rw-r--r-- | internal/praefect/node.go | 20 | ||||
-rw-r--r-- | internal/praefect/replication.go | 77 | ||||
-rw-r--r-- | internal/praefect/server.go | 1 |
4 files changed, 113 insertions, 0 deletions
diff --git a/internal/praefect/common.go b/internal/praefect/common.go new file mode 100644 index 000000000..17667283e --- /dev/null +++ b/internal/praefect/common.go @@ -0,0 +1,15 @@ +package praefect + +// Repository contains all the information necessary to address a specific +// repository +type Repository struct { + Project string // e.g. gitlab.com/gitaly-org/gitaly + Storage string // e.g. Default +} + +// ReplJob indicates which repo replicas require syncing +type ReplJob struct { + ID string // unique ID to track job progress in datastore + Primary Repository + Replica Node +} diff --git a/internal/praefect/node.go b/internal/praefect/node.go new file mode 100644 index 000000000..7100e2559 --- /dev/null +++ b/internal/praefect/node.go @@ -0,0 +1,20 @@ +package praefect + +import ( + "context" + + "google.golang.org/grpc" +) + +// Node is a backend Gitaly node that is responsible for hosting repositories +// in a specific storage location +type Node struct { + storage string // storage location ID (e.g. default) + cc *grpc.ClientConn +} + +// PullReplication will attempt to replicate changes from a primary replica +func (n Node) PullReplication(ctx context.Context, primary Repository) error { + // TODO: replication logic in #1484 + return nil +} diff --git a/internal/praefect/replication.go b/internal/praefect/replication.go new file mode 100644 index 000000000..e8f2dc496 --- /dev/null +++ b/internal/praefect/replication.go @@ -0,0 +1,77 @@ +package praefect + +import ( + "context" +) + +// ReplMan is a replication manager for handling replication jobs +type ReplMan struct { + l Logger + // whitelist contains the project names of the repos we wish to replicate + whitelist map[string]struct{} +} + +type ReplManOpt func(*ReplMan) + +func NewReplMan(opts ...ReplManOpt) *ReplMan { + m := &ReplMan{ + whitelist: map[string]struct{}{}, + } + + for _, opt := range opts { + opt(m) + } + + return m +} + +// WithWhitelist will configure a whitelist for repos to allow replication +func WithWhitelist(whitelistedRepos []string) ReplManOpt { + return func(m *ReplMan) { + for _, r := range whitelistedRepos { + m.whitelist[r] = struct{}{} + } + } +} + +// ReplCoordinator represents all the coordinator functionality the replication +// manager relies on +type ReplCoordinator interface { + // ReplicationQueue returns a stream of jobs from + ReplicationQueue(context.Context) (<-chan ReplJob, error) + + // CompleteJob reports if a job was completed. A non-nil jobErr indicates + // the job was not successful. + CompleteJob(ctx context.Context, ID string, jobErr error) error +} + +func (rm *ReplMan) ProcessJobs(ctx context.Context, rc ReplCoordinator) error { + jobQ, err := rc.ReplicationQueue(ctx) + + for { + var ( + job ReplJob + ok bool + ) + + select { + + // context cancelled + case <-ctx.Done(): + return ctx.Err() + + case job, ok = <-jobQ: + if !ok { // channel closed + return nil + } + + } + + jobErr := job.Replica.PullReplication(ctx, job.Primary) + + err = rc.CompleteJob(ctx, job.ID, jobErr) + if err != nil { + rm.l.Errorf("unable to report replication job completion for %+v", job.ID) + } + } +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 04779af2c..6266c1599 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -26,6 +26,7 @@ import ( // into the praefect server type Logger interface { Debugf(format string, args ...interface{}) + Errorf(format string, args ...interface{}) } // Coordinator takes care of directing client requests to the appropriate |