Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2019-04-09 20:26:20 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-04-09 20:26:20 +0300
commitddec2b518c5233b7f35403383b8e88bbfa965f92 (patch)
tree3c8cc4eecbaf832212c7f0ff5698f8f4f007c092
parent522bec3a08710897834a9ce0a9d00dd282c7fd1f (diff)
Datastore pattern for replication jobs
-rw-r--r--changelogs/unreleased/datastore-pattern-for-replication-jobs.yml5
-rw-r--r--cmd/praefect/main.go29
-rw-r--r--internal/praefect/common.go21
-rw-r--r--internal/praefect/config/config.go24
-rw-r--r--internal/praefect/config/config_test.go60
-rw-r--r--internal/praefect/config/testdata/config.toml22
-rw-r--r--internal/praefect/coordinator.go101
-rw-r--r--internal/praefect/datastore.go220
-rw-r--r--internal/praefect/datastore_memory_test.go96
-rw-r--r--internal/praefect/datastore_test.go117
-rw-r--r--internal/praefect/replicator.go128
-rw-r--r--internal/praefect/replicator_test.go128
-rw-r--r--internal/praefect/server.go99
-rw-r--r--internal/praefect/server_test.go50
14 files changed, 961 insertions, 139 deletions
diff --git a/changelogs/unreleased/datastore-pattern-for-replication-jobs.yml b/changelogs/unreleased/datastore-pattern-for-replication-jobs.yml
new file mode 100644
index 000000000..f81236920
--- /dev/null
+++ b/changelogs/unreleased/datastore-pattern-for-replication-jobs.yml
@@ -0,0 +1,5 @@
+---
+title: Datastore pattern for replication jobs
+merge_request: 1147
+author:
+type: other
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 0661081e7..941486861 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -75,20 +75,34 @@ func configure() (config.Config, error) {
}
func run(listeners []net.Listener, conf config.Config) error {
- srv := praefect.NewServer(nil, logger)
+ var (
+ // top level server dependencies
+ coordinator = praefect.NewCoordinator(logger, conf.PrimaryServer.Name)
+ datastore = praefect.NewMemoryDatastore(conf, time.Now())
+ repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
+ srv = praefect.NewServer(coordinator, repl, nil, logger)
+
+ // signal related
+ signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
+ termCh = make(chan os.Signal, len(signals))
+ serverErrors = make(chan error, 1)
+ )
- signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
- termCh := make(chan os.Signal, len(signals))
signal.Notify(termCh, signals...)
- serverErrors := make(chan error, 1)
-
for _, l := range listeners {
go func(lis net.Listener) { serverErrors <- srv.Start(lis) }(l)
}
- for _, gitaly := range conf.GitalyServers {
- srv.RegisterNode(gitaly.Name, gitaly.ListenAddr)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
+
+ allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer)
+
+ for _, gitaly := range allBackendServers {
+ coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr)
logger.WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
}
@@ -97,6 +111,7 @@ func run(listeners []net.Listener, conf config.Config) error {
select {
case s := <-termCh:
logger.WithField("signal", s).Warn("received signal, shutting down gracefully")
+ cancel() // cancels the replicator job processing
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
if shutdownErr := srv.Shutdown(ctx); shutdownErr != nil {
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
new file mode 100644
index 000000000..a63d309bd
--- /dev/null
+++ b/internal/praefect/common.go
@@ -0,0 +1,21 @@
+package praefect
+
+import "google.golang.org/grpc"
+
+// Repository identifies one repository as it is stored on a particular Gitaly
+// replica.
+type Repository struct {
+	// RelativePath is the relative path of the repository.
+	RelativePath string
+	// Storage is the storage location, e.g. default.
+	Storage string
+}
+
+// Node pairs a storage location with the gRPC client connection used to reach
+// the backend Gitaly node serving it.
+type Node struct {
+	// Storage is the storage location this node was looked up by.
+	Storage string
+	// cc is the client connection to the backend node.
+	cc *grpc.ClientConn
+}
+
+// logKeyProjectPath is the field key passed to logrus WithField when a log
+// entry refers to a project path.
+const logKeyProjectPath = "ProjectPath"
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 096c946c0..768104ed1 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -10,12 +10,18 @@ import (
// Config is a container for everything found in the TOML config file
type Config struct {
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- SocketPath string `toml:"socket_path" split_words:"true"`
- GitalyServers []*GitalyServer `toml:"gitaly_server", split_words:"true"`
+ ListenAddr string `toml:"listen_addr"`
+ SocketPath string `toml:"socket_path"`
+
+ PrimaryServer *GitalyServer `toml:"primary_server"`
+ SecondaryServers []*GitalyServer `toml:"secondary_server"`
+
+ // Whitelist is a list of relative project paths (paths comprised of project
+ // hashes) that are permitted to use high availability features
+ Whitelist []string `toml:"whitelist"`
Logging config.Logging `toml:"logging"`
- PrometheusListenAddr string `toml:"prometheus_listen_addr", split_words:"true"`
+ PrometheusListenAddr string `toml:"prometheus_listen_addr"`
}
// GitalyServer allows configuring the servers that RPCs are proxied to
@@ -39,23 +45,25 @@ func FromFile(filePath string) (Config, error) {
var (
errNoListener = errors.New("no listen address or socket path configured")
- errNoGitalyServers = errors.New("no gitaly backends configured")
+ errNoGitalyServers = errors.New("no primary gitaly backends configured")
errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique")
errGitalyWithoutName = errors.New("all gitaly servers must have a name")
)
+var emptyServer = &GitalyServer{}
+
// Validate establishes if the config is valid
func (c Config) Validate() error {
if c.ListenAddr == "" && c.SocketPath == "" {
return errNoListener
}
- if len(c.GitalyServers) == 0 {
+ if c.PrimaryServer == nil || c.PrimaryServer == emptyServer {
return errNoGitalyServers
}
- listenAddrs := make(map[string]bool, len(c.GitalyServers))
- for _, gitaly := range c.GitalyServers {
+ listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1)
+ for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) {
if gitaly.Name == "" {
return errGitalyWithoutName
}
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 306650771..33731b17d 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -4,10 +4,15 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestConfigValidation(t *testing.T) {
- gitalySrvs := []*GitalyServer{&GitalyServer{"test", "localhost:23456"}}
+ primarySrv := &GitalyServer{"test", "localhost:23456"}
+ secondarySrvs := []*GitalyServer{
+ {"test1", "localhost:23457"},
+ {"test2", "localhost:23458"},
+ }
testCases := []struct {
desc string
@@ -16,35 +21,74 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", GitalyServers: gitalySrvs},
+ config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", GitalyServers: gitalySrvs},
+ config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
err: nil,
},
{
desc: "No servers",
- config: Config{ListenAddr: "localhost:1234", GitalyServers: nil},
+ config: Config{ListenAddr: "localhost:1234"},
err: errNoGitalyServers,
},
{
desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", GitalyServers: []*GitalyServer{gitalySrvs[0], gitalySrvs[0]}},
+ config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*GitalyServer{primarySrv}},
err: errDuplicateGitalyAddr,
},
{
desc: "Valid config",
- config: Config{ListenAddr: "localhost:1234", GitalyServers: gitalySrvs},
+ config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
err: nil,
},
}
for _, tc := range testCases {
- t.Run(tc.desc, func(t1 *testing.T) {
+ t.Run(tc.desc, func(t *testing.T) {
err := tc.config.Validate()
- assert.Equal(t, err, tc.err)
+ assert.Equal(t, tc.err, err)
+ })
+ }
+}
+
+func TestConfigParsing(t *testing.T) {
+ testCases := []struct {
+ filePath string
+ expected Config
+ }{
+ {
+ filePath: "testdata/config.toml",
+ expected: Config{
+ PrimaryServer: &GitalyServer{
+ Name: "default",
+ ListenAddr: "tcp://gitaly-primary.example.com",
+ },
+ SecondaryServers: []*GitalyServer{
+ {
+ Name: "default",
+ ListenAddr: "tcp://gitaly-backup1.example.com",
+ },
+ {
+ Name: "backup",
+ ListenAddr: "tcp://gitaly-backup2.example.com",
+ },
+ },
+ Whitelist: []string{
+ "abcd1234",
+ "edfg5678",
+ },
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.filePath, func(t *testing.T) {
+ cfg, err := FromFile(tc.filePath)
+ require.NoError(t, err)
+ require.Equal(t, tc.expected, cfg)
})
}
}
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
new file mode 100644
index 000000000..24f64add9
--- /dev/null
+++ b/internal/praefect/config/testdata/config.toml
@@ -0,0 +1,22 @@
+listen_addr = ""
+socket_path = ""
+whitelist = ["abcd1234", "edfg5678"]
+prometheus_listen_addr = ""
+
+[primary_server]
+ name = "default"
+ listen_addr = "tcp://gitaly-primary.example.com"
+
+[[secondary_server]]
+ name = "default"
+ listen_addr = "tcp://gitaly-backup1.example.com"
+
+[[secondary_server]]
+ name = "backup"
+ listen_addr = "tcp://gitaly-backup2.example.com"
+
+[logging]
+ Format = ""
+ sentry_dsn = ""
+ ruby_sentry_dsn = ""
+ level = ""
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
new file mode 100644
index 000000000..2229019e3
--- /dev/null
+++ b/internal/praefect/coordinator.go
@@ -0,0 +1,101 @@
+package praefect
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/mwitkow/grpc-proxy/proxy"
+ "github.com/sirupsen/logrus"
+ "gitlab.com/gitlab-org/gitaly/client"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// Coordinator directs client requests to the appropriate downstream server.
+// Nodes may be registered from several goroutines at once: every access to
+// the node map goes through the read/write lock.
+type Coordinator struct {
+	log  *logrus.Logger
+	lock sync.RWMutex // guards nodes
+
+	// storageLoc is the storage location streamDirector routes requests to
+	storageLoc string
+
+	// nodes maps a storage location to its client connection
+	nodes map[string]*grpc.ClientConn
+}
+
+// NewCoordinator builds a Coordinator that logs through l and routes requests
+// to the storage location storageLoc. The node map starts out empty.
+func NewCoordinator(l *logrus.Logger, storageLoc string) *Coordinator {
+	c := new(Coordinator)
+	c.log = l
+	c.storageLoc = storageLoc
+	c.nodes = map[string]*grpc.ClientConn{}
+
+	return c
+}
+
+// GetStorageNode looks up the node registered for the given storage location.
+// An error is returned when nothing has been registered under that name.
+func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
+	if conn, found := c.getConn(storage); found {
+		return Node{Storage: storage, cc: conn}, nil
+	}
+
+	return Node{}, fmt.Errorf("no node registered for storage location %q", storage)
+}
+
+// streamDirector picks the downstream connection that receives a request.
+//
+// For phase 1 every request is routed to the node registered under the
+// coordinator's configured storage location, as there is only one primary
+// storage location per praefect at this time.
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
+	c.log.Debugf("Stream director received method %s", fullMethodName)
+
+	primary := c.storageLoc
+	if primary == "" {
+		return nil, nil, status.Error(codes.FailedPrecondition, "no downstream node registered")
+	}
+
+	conn, found := c.getConn(primary)
+	if !found {
+		return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary)
+	}
+
+	return ctx, conn, nil
+}
+
+// RegisterNode dials listenAddr and, on success, stores the connection under
+// storageLoc so that traffic for that storage location is directed to it. A
+// dial failure is returned unchanged and nothing is registered.
+func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error {
+	dialOpts := []grpc.DialOption{
+		grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
+	}
+
+	conn, err := client.Dial(listenAddr, dialOpts)
+	if err == nil {
+		c.setConn(storageLoc, conn)
+	}
+
+	return err
+}
+
+// setConn stores conn under storageLoc while holding the write lock.
+func (c *Coordinator) setConn(storageLoc string, conn *grpc.ClientConn) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	c.nodes[storageLoc] = conn
+}
+
+// getConn returns the connection stored under storageLoc and whether one was
+// present, reading the node map under the read lock.
+func (c *Coordinator) getConn(storageLoc string) (*grpc.ClientConn, bool) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	conn, found := c.nodes[storageLoc]
+	return conn, found
+}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
new file mode 100644
index 000000000..1dfaa2dca
--- /dev/null
+++ b/internal/praefect/datastore.go
@@ -0,0 +1,220 @@
+/*Package praefect provides data models and datastore persistence abstractions
+for tracking the state of repository replicas.
+
+See original design discussion:
+https://gitlab.com/gitlab-org/gitaly/issues/1495
+
+
+*/
+package praefect
+
+import (
+ "errors"
+ "sync"
+ "time"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+)
+
+// ReplJob describes one queued replication job: bringing the copy of a
+// repository on the Target storage in sync with the primary copy named by
+// Source.
+type ReplJob struct {
+	// Target is the storage location to replicate to.
+	Target string
+	// Source is the repository to replicate from.
+	Source Repository
+	// Scheduled indicates when the replication job should be performed.
+	Scheduled time.Time
+}
+
+// Datastore is the persistence abstraction covering all of Praefect's storage
+// needs. It combines the replication job queue with the replica mappings.
+type Datastore interface {
+	ReplJobsDatastore
+	ReplicasDatastore
+}
+
+// ReplicasDatastore reads and writes the mapping from a repository to the
+// secondary replicas that back it up.
+type ReplicasDatastore interface {
+	// GetSecondaries retrieves the storage locations of all secondary
+	// replicas of the given primary replica.
+	GetSecondaries(primary Repository) ([]string, error)
+
+	// SetSecondaries sets the secondary storage locations for a repository
+	// held in a primary replica.
+	SetSecondaries(primary Repository, secondaries []string) error
+}
+
+// ReplJobsDatastore is the behavior needed to fetch and update replication
+// jobs held in the datastore.
+type ReplJobsDatastore interface {
+	// GetReplJobs fetches a chronologically ordered list of at most count
+	// replication jobs for the given storage replica.
+	GetReplJobs(storage string, since time.Time, count int) ([]ReplJob, error)
+
+	// PutReplJob creates or updates the replication job for the specified
+	// repo on a specific storage node, scheduling it at when.
+	PutReplJob(repo Repository, when time.Time) error
+}
+
+// shard groups the primary storage replica of a project with its secondary
+// storage replicas.
+type shard struct {
+	primary     string   // storage location of the primary replica
+	secondaries []string // storage locations of the secondary replicas
+}
+
+// MemoryDatastore keeps all state in process memory and never persists it to
+// disk. It exists for early beta requirements and as a reference
+// implementation for the eventual SQL implementation.
+type MemoryDatastore struct {
+	// mu locks the entire datastore
+	mu sync.RWMutex
+	// replicas maps a project hash to its shard
+	replicas map[string]shard
+	// storageJobs holds scheduled times keyed by storage, then by project
+	storageJobs map[string]map[string]time.Time
+}
+
+// NewMemoryDatastore creates an in-memory datastore seeded from cfg. Every
+// whitelisted project gets a shard made of the configured primary and
+// secondary servers, plus one replication job per secondary server scheduled
+// at immediate.
+func NewMemoryDatastore(cfg config.Config, immediate time.Time) *MemoryDatastore {
+	ds := &MemoryDatastore{
+		replicas:    make(map[string]shard),
+		storageJobs: make(map[string]map[string]time.Time),
+	}
+
+	// secondaryNames builds a fresh slice on every call so that no two shards
+	// share a backing array
+	secondaryNames := func() []string {
+		names := make([]string, 0, len(cfg.SecondaryServers))
+		for _, srv := range cfg.SecondaryServers {
+			names = append(names, srv.Name)
+		}
+		return names
+	}
+
+	for _, project := range cfg.Whitelist {
+		// record the shard specified by the configuration file
+		ds.replicas[project] = shard{
+			primary:     cfg.PrimaryServer.Name,
+			secondaries: secondaryNames(),
+		}
+
+		// queue a job replicating this project to every secondary server
+		for _, srv := range cfg.SecondaryServers {
+			if _, known := ds.storageJobs[srv.Name]; !known {
+				ds.storageJobs[srv.Name] = make(map[string]time.Time)
+			}
+			ds.storageJobs[srv.Name][project] = immediate
+		}
+	}
+
+	return ds
+}
+
+// GetSecondaries returns the secondary storage locations recorded for the
+// given repository. A repository without a shard yields a nil slice; the
+// returned error is always nil.
+func (md *MemoryDatastore) GetSecondaries(primary Repository) ([]string, error) {
+	// the lookup result is deliberately ignored: the zero shard has no
+	// secondaries
+	s, _ := md.getShard(primary.RelativePath)
+	secondaries := s.secondaries
+
+	return secondaries, nil
+}
+
+// SetSecondaries replaces the shard stored for the repository: its primary
+// becomes primary.Storage and its secondaries become the given slice. The
+// returned error is always nil.
+func (md *MemoryDatastore) SetSecondaries(primary Repository, secondaries []string) error {
+	updated := shard{
+		primary:     primary.Storage,
+		secondaries: secondaries,
+	}
+
+	md.mu.Lock()
+	defer md.mu.Unlock()
+
+	md.replicas[primary.RelativePath] = updated
+	return nil
+}
+
+// getShard returns the shard stored for project and whether one exists,
+// reading the replica map under the read lock.
+func (md *MemoryDatastore) getShard(project string) (shard, bool) {
+	md.mu.RLock()
+	defer md.mu.RUnlock()
+
+	s, found := md.replicas[project]
+	return s, found
+}
+
+// ErrSecondariesMissing indicates the repository does not have any backup
+// replicas
+var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
+
+// GetReplJobs returns the replication jobs queued for the specified storage
+// whose scheduled time is not before since. The jobs are ordered
+// chronologically (ties broken by project path) and at most count of them are
+// returned; a non-positive count yields no jobs.
+//
+// ErrSecondariesMissing is returned when a matching job refers to a project
+// that has no shard.
+func (md *MemoryDatastore) GetReplJobs(storage string, since time.Time, count int) ([]ReplJob, error) {
+	if count <= 0 {
+		return nil, nil
+	}
+
+	// The read lock is held for the whole scan: PutReplJob writes to the same
+	// per-storage map, so iterating it unlocked would be a data race. The
+	// replica map is read directly because the lock is already held.
+	md.mu.RLock()
+	defer md.mu.RUnlock()
+
+	before := func(a, b ReplJob) bool {
+		if !a.Scheduled.Equal(b.Scheduled) {
+			return a.Scheduled.Before(b.Scheduled)
+		}
+		return a.Source.RelativePath < b.Source.RelativePath
+	}
+
+	var results []ReplJob
+
+	for project, scheduled := range md.storageJobs[storage] {
+		if scheduled.Before(since) {
+			continue
+		}
+
+		s, ok := md.replicas[project]
+		if !ok {
+			return nil, ErrSecondariesMissing
+		}
+
+		results = append(results, ReplJob{
+			Source: Repository{
+				RelativePath: project,
+				Storage:      s.primary,
+			},
+			Target:    storage,
+			Scheduled: scheduled,
+		})
+
+		// map iteration order is random, so move the new job backwards until
+		// the slice is chronologically ordered again
+		for i := len(results) - 1; i > 0 && before(results[i], results[i-1]); i-- {
+			results[i], results[i-1] = results[i-1], results[i]
+		}
+	}
+
+	// truncate only after ordering so the earliest jobs are the ones kept
+	if len(results) > count {
+		results = results[:count]
+	}
+
+	return results, nil
+}
+
+// ErrInvalidReplTarget indicates a target repository cannot be chosen because
+// it fails preconditions for being replicatable
+var ErrInvalidReplTarget = errors.New("target repository fails preconditions for replication")
+
+// PutReplJob creates the replication job for target, or updates an existing
+// one, so that it is scheduled at the specified time.
+//
+// The target must be a secondary replica: ErrInvalidReplTarget is returned
+// when the project has no shard or when target.Storage is not one of the
+// shard's secondaries. In that case nothing is stored.
+func (md *MemoryDatastore) PutReplJob(target Repository, scheduled time.Time) error {
+	// Validation and the write happen under one write lock. The per-storage
+	// map is shared with readers, and creating it outside the lock would let
+	// two concurrent callers overwrite each other's freshly made map.
+	md.mu.Lock()
+	defer md.mu.Unlock()
+
+	// by definition, a secondary replica must have a corresponding primary to
+	// replicate from
+	s, ok := md.replicas[target.RelativePath]
+	if !ok {
+		return ErrInvalidReplTarget
+	}
+
+	isSecondary := false
+	for _, secondary := range s.secondaries {
+		if secondary == target.Storage {
+			isSecondary = true
+			break
+		}
+	}
+
+	if !isSecondary {
+		return ErrInvalidReplTarget
+	}
+
+	jobs, ok := md.storageJobs[target.Storage]
+	if !ok {
+		jobs = map[string]time.Time{}
+		md.storageJobs[target.Storage] = jobs
+	}
+
+	jobs[target.RelativePath] = scheduled
+
+	return nil
+}
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
new file mode 100644
index 000000000..6fb60cb51
--- /dev/null
+++ b/internal/praefect/datastore_memory_test.go
@@ -0,0 +1,96 @@
+package praefect_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+)
+
+// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will
+// populate itself with the correct replication jobs and shards when initialized
+// with a configuration specifying the shard and whitelisted repositories.
+func TestMemoryDatastoreWhitelist(t *testing.T) {
+	immediate := time.Unix(1300000000, 0)
+	cfg := config.Config{
+		PrimaryServer: &config.GitalyServer{Name: "default"},
+		SecondaryServers: []*config.GitalyServer{
+			{Name: "backup-1"},
+			{Name: "backup-2"},
+		},
+		Whitelist: []string{
+			"abcd1234",
+			"5678efgh",
+		},
+	}
+
+	// declared with the interface type so the test only uses the Datastore API
+	var mds praefect.Datastore = praefect.NewMemoryDatastore(cfg, immediate)
+
+	// one primary repository per whitelisted project
+	repos := make([]praefect.Repository, 0, len(cfg.Whitelist))
+	for _, project := range cfg.Whitelist {
+		repos = append(repos, praefect.Repository{
+			RelativePath: project,
+			Storage:      cfg.PrimaryServer.Name,
+		})
+	}
+
+	expectSecondaries := []string{
+		cfg.SecondaryServers[0].Name,
+		cfg.SecondaryServers[1].Name,
+	}
+
+	for _, repo := range repos {
+		actualSecondaries, err := mds.GetSecondaries(repo)
+		require.NoError(t, err)
+		// expected value goes first so failure output is labelled correctly
+		require.ElementsMatch(t, expectSecondaries, actualSecondaries)
+	}
+
+	// every secondary must have one immediate job per whitelisted repository
+	for _, backup := range cfg.SecondaryServers {
+		expectedJobs := make([]praefect.ReplJob, 0, len(repos))
+		for _, repo := range repos {
+			expectedJobs = append(expectedJobs, praefect.ReplJob{
+				Target:    backup.Name,
+				Source:    repo,
+				Scheduled: immediate,
+			})
+		}
+
+		actualJobs, err := mds.GetReplJobs(backup.Name, time.Time{}, 10)
+		require.NoError(t, err)
+		require.ElementsMatch(t, expectedJobs, actualJobs)
+	}
+}
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
new file mode 100644
index 000000000..608d0c4c9
--- /dev/null
+++ b/internal/praefect/datastore_test.go
@@ -0,0 +1,117 @@
+package praefect_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+)
+
+const (
+ stor1 = "default" // usually the primary storage location
+ stor2 = "backup-1" // usually the secondary storage location
+ proj1 = "abcd1234" // imagine this is a legit project hash
+)
+
+var (
+ time0 = time.Time{}
+ time1 = time.Unix(1, 0)
+ time2 = time.Unix(2, 0)
+
+ repo1Primary = praefect.Repository{
+ RelativePath: proj1,
+ Storage: stor1,
+ }
+ repo1Backup = praefect.Repository{
+ RelativePath: proj1,
+ Storage: stor2,
+ }
+)
+
+// operations is the ordered script of datastore calls that every datastore
+// flavor must handle identically. The steps share one datastore and build on
+// each other, so their order matters.
+var operations = []struct {
+	desc string
+	opFn func(*testing.T, praefect.Datastore)
+}{
+	{
+		desc: "query an empty datastore",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			jobs, err := ds.GetReplJobs(stor1, time1, 1)
+			require.NoError(t, err)
+			require.Len(t, jobs, 0)
+		},
+	},
+	{
+		desc: "insert first replication job before secondary mapped to primary",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			err := ds.PutReplJob(repo1Backup, time2)
+			// compare against the sentinel: require.Error would only use it
+			// as a failure message
+			require.Equal(t, praefect.ErrInvalidReplTarget, err)
+		},
+	},
+	{
+		desc: "associate the replication job target with a primary",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			err := ds.SetSecondaries(repo1Primary, []string{stor2})
+			require.NoError(t, err)
+		},
+	},
+	{
+		desc: "insert first replication job after secondary mapped to primary",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			err := ds.PutReplJob(repo1Backup, time2)
+			require.NoError(t, err)
+		},
+	},
+	{
+		desc: "fetch inserted replication job after primary mapped",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			jobs, err := ds.GetReplJobs(stor2, time1, 10)
+
+			require.NoError(t, err)
+			require.Len(t, jobs, 1)
+
+			expectedJob := praefect.ReplJob{
+				Source:    repo1Primary,
+				Target:    stor2,
+				Scheduled: time2,
+			}
+			require.Equal(t, expectedJob, jobs[0])
+		},
+	},
+	{
+		desc: "mark replication job done",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			// rescheduling at the zero time puts the job before any later
+			// "since" cut-off
+			err := ds.PutReplJob(repo1Backup, time0)
+			require.NoError(t, err)
+		},
+	},
+	{
+		desc: "try fetching completed replication job",
+		opFn: func(t *testing.T, ds praefect.Datastore) {
+			// the job lives on stor2; querying stor1 would pass vacuously
+			jobs, err := ds.GetReplJobs(stor2, time1, 1)
+			require.NoError(t, err)
+			require.Len(t, jobs, 0)
+		},
+	},
+}
+
+// flavors maps the name of each datastore implementation to a factory
+// producing a fresh instance of it.
+// TODO: add SQL datastore flavor
+var flavors = map[string]func() praefect.Datastore{
+	"in-memory-datastore": func() praefect.Datastore {
+		return praefect.NewMemoryDatastore(config.Config{}, time.Now())
+	},
+}
+
+// TestDatastoreInterface runs the same series of operations against every
+// implementation or "flavor" of the datastore interface (in-memory or SQL) to
+// verify they behave consistently.
+func TestDatastoreInterface(t *testing.T) {
+	for flavor, newDatastore := range flavors {
+		t.Run(flavor, func(t *testing.T) {
+			// all operations run in order against one shared datastore
+			store := newDatastore()
+			for idx := range operations {
+				step := operations[idx]
+				t.Logf("operation %d: %s", idx+1, step.desc)
+				step.opFn(t, store)
+			}
+		})
+	}
+}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
new file mode 100644
index 000000000..91871c759
--- /dev/null
+++ b/internal/praefect/replicator.go
@@ -0,0 +1,128 @@
+package praefect
+
+import (
+ "context"
+ "time"
+
+ "github.com/sirupsen/logrus"
+)
+
+// Replicator carries out the actual replication of a source repository onto a
+// target node.
+type Replicator interface {
+	Replicate(ctx context.Context, source Repository, target Node) error
+}
+
+// defaultReplicator is the Replicator a ReplMgr starts with unless it is
+// overridden; it only logs the replication request.
+type defaultReplicator struct {
+	log *logrus.Logger
+}
+
+// Replicate logs the requested replication and reports success without
+// transferring any data.
+func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, target Node) error {
+	logger := dr.log
+	logger.Infof("replicating from %v to target %q", source, target.Storage)
+
+	return nil
+}
+
+// ReplMgr is the replication manager: it schedules replication jobs and
+// processes the queued ones.
+type ReplMgr struct {
+	log         *logrus.Logger
+	jobsStore   ReplJobsDatastore
+	coordinator *Coordinator
+
+	// storage names the replica this manager is responsible for
+	storage string
+	// replicator performs the actual replication logic
+	replicator Replicator
+
+	// whitelist contains the project names of the repos we wish to replicate
+	whitelist map[string]struct{}
+}
+
+// ReplMgrOpt is a functional option that configures a ReplMgr while it is
+// being constructed.
+type ReplMgrOpt func(*ReplMgr)
+
+// NewReplMgr builds a replication manager from the provided dependencies. It
+// starts with an empty whitelist and the default replicator, then applies the
+// options in the order given.
+func NewReplMgr(storage string, log *logrus.Logger, ds ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+	var mgr ReplMgr
+	mgr.storage = storage
+	mgr.log = log
+	mgr.jobsStore = ds
+	mgr.coordinator = c
+	mgr.whitelist = make(map[string]struct{})
+	mgr.replicator = defaultReplicator{log}
+
+	for i := range opts {
+		opts[i](&mgr)
+	}
+
+	return mgr
+}
+
+// WithWhitelist returns an option that adds every given repo to the set of
+// repos allowed to be replicated.
+func WithWhitelist(whitelistedRepos []string) ReplMgrOpt {
+	return func(r *ReplMgr) {
+		for i := range whitelistedRepos {
+			r.whitelist[whitelistedRepos[i]] = struct{}{}
+		}
+	}
+}
+
+// WithReplicator returns an option that replaces the default replicator with
+// r.
+func WithReplicator(r Replicator) ReplMgrOpt {
+	override := func(rm *ReplMgr) { rm.replicator = r }
+	return override
+}
+
+// ScheduleReplication stores a replication job for repo in the datastore,
+// scheduled at the current time, for later execution. Projects that are not
+// whitelisted are logged and skipped without an error.
+// TODO: add a parameter to delay replication
+func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error {
+	if _, whitelisted := r.whitelist[repo.RelativePath]; whitelisted {
+		return r.jobsStore.PutReplJob(repo, time.Now())
+	}
+
+	r.log.WithField(logKeyProjectPath, repo.RelativePath).
+		Infof("project %q is not whitelisted for replication", repo.RelativePath)
+
+	return nil
+}
+
+// ProcessBacklog processes queued jobs for the manager's storage and blocks
+// while doing so. It fetches jobs in batches of up to 10 and hands each one to
+// the replicator, polling every 10ms while the queue is empty.
+//
+// It returns ctx.Err() once ctx is done, or the first error reported by the
+// datastore, the coordinator or the replicator.
+func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
+	since := time.Time{}
+	for {
+		// Check for cancellation on every pass. The select below only runs
+		// when the backlog is empty, so without this check a non-empty
+		// backlog and a replicator that ignores ctx would keep the loop
+		// running after cancellation.
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		r.log.Debugf("fetching replication jobs since %s", since)
+		jobs, err := r.jobsStore.GetReplJobs(r.storage, since, 10)
+		if err != nil {
+			return err
+		}
+
+		if len(jobs) == 0 {
+			select {
+
+			// TODO: exponential backoff when no queries are returned
+			case <-time.After(10 * time.Millisecond):
+				continue
+
+			case <-ctx.Done():
+				return ctx.Err()
+
+			}
+		}
+
+		r.log.Debugf("fetched replication jobs: %#v", jobs)
+
+		for _, job := range jobs {
+			// do not start another job once cancellation was requested
+			if err := ctx.Err(); err != nil {
+				return err
+			}
+
+			r.log.Infof("processing replication job %#v", job)
+			node, err := r.coordinator.GetStorageNode(job.Target)
+			if err != nil {
+				return err
+			}
+
+			err = r.replicator.Replicate(ctx, job.Source, node)
+			if err != nil {
+				return err
+			}
+
+			// the next fetch starts from the last processed job's time
+			since = job.Scheduled
+		}
+	}
+}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
new file mode 100644
index 000000000..8e156f0c4
--- /dev/null
+++ b/internal/praefect/replicator_test.go
@@ -0,0 +1,128 @@
+package praefect_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+)
+
+// TestReplicatorProcessJobsWhitelist runs a replication manager against an
+// in-memory datastore seeded from a whitelist and checks that every job handed
+// to the (mock) replicator names a whitelisted repo and a configured secondary
+// server.
+func TestReplicatorProcessJobsWhitelist(t *testing.T) {
+ var (
+ cfg = config.Config{
+ PrimaryServer: &config.GitalyServer{
+ Name: "default",
+ ListenAddr: "tcp://gitaly-primary.example.com",
+ },
+ SecondaryServers: []*config.GitalyServer{
+ {
+ Name: "backup1",
+ ListenAddr: "tcp://gitaly-backup1.example.com",
+ },
+ {
+ Name: "backup2",
+ ListenAddr: "tcp://gitaly-backup2.example.com",
+ },
+ },
+ Whitelist: []string{
+ "abcd1234",
+ "edfg5678",
+ },
+ }
+ datastore = praefect.NewMemoryDatastore(cfg, time.Now())
+ coordinator = praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name)
+ // unbuffered: the mock replicator blocks until the checker goroutine
+ // below receives each result
+ resultsCh = make(chan result)
+ // the manager under test is bound to the second secondary's storage
+ replman = praefect.NewReplMgr(
+ cfg.SecondaryServers[1].Name,
+ logrus.New(),
+ datastore,
+ coordinator,
+ praefect.WithWhitelist(cfg.Whitelist),
+ praefect.WithReplicator(&mockReplicator{resultsCh}),
+ )
+ )
+
+ // register every backend so the coordinator can resolve job targets
+ for _, node := range []*config.GitalyServer{
+ cfg.PrimaryServer,
+ cfg.SecondaryServers[0],
+ cfg.SecondaryServers[1],
+ } {
+ err := coordinator.RegisterNode(node.Name, node.ListenAddr)
+ require.NoError(t, err)
+ }
+
+ ctx, cancel := testhelper.Context()
+
+ errQ := make(chan error)
+
+ // ProcessBacklog blocks, so it runs in its own goroutine and reports its
+ // final error on errQ
+ go func() {
+ errQ <- replman.ProcessBacklog(ctx)
+ }()
+
+ success := make(chan struct{})
+ // NOTE(review): this counts jobs for every secondary although the manager
+ // only serves SecondaryServers[1]; presumably the total is reached because
+ // jobs are fetched more than once - confirm
+ expectJobs := len(cfg.Whitelist) * len(cfg.SecondaryServers)
+
+ go func() {
+ // we expect one job per whitelisted repo with each backend server
+ for i := 0; i < expectJobs; i++ {
+ result := <-resultsCh
+
+ assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
+ assert.Contains(t,
+ []string{
+ cfg.SecondaryServers[0].Name,
+ cfg.SecondaryServers[1].Name,
+ },
+ result.target.Storage,
+ )
+ }
+
+ // cancelling the context is what makes ProcessBacklog return
+ cancel()
+ success <- struct{}{}
+ }()
+
+ // blocks until ProcessBacklog exits; it must exit because of the cancel
+ require.EqualError(t, <-errQ, context.Canceled.Error())
+
+ select {
+
+ case <-success:
+ return
+
+ case <-time.After(time.Second):
+ t.Fatalf("unable to iterate over expected jobs")
+
+ }
+
+}
+
+// result records the arguments of one Replicate call made on the mock.
+type result struct {
+	source praefect.Repository
+	target praefect.Node
+}
+
+// mockReplicator is a Replicator that reports each call on resultsCh instead
+// of replicating anything.
+type mockReplicator struct {
+	// resultsCh is send-only: the test owns the receiving end
+	resultsCh chan<- result
+}
+
+// Replicate publishes the call's arguments on resultsCh and returns nil once
+// they have been sent. If ctx is done before the send succeeds, ctx.Err() is
+// returned instead.
+func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Repository, target praefect.Node) error {
+	// both cases return, so no statement is needed after the select
+	select {
+	case mr.resultsCh <- result{source, target}:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 04779af2c..29414b5a1 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -4,93 +4,30 @@ package praefect
import (
"context"
- "fmt"
"net"
- "sync"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/mwitkow/grpc-proxy/proxy"
- "gitlab.com/gitlab-org/gitaly/client"
+ "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler"
"gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler"
grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc"
grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
-// Logger is a simple interface that allows loggers to be dependency injected
-// into the praefect server
-type Logger interface {
- Debugf(format string, args ...interface{})
-}
-
-// Coordinator takes care of directing client requests to the appropriate
-// downstream server. The coordinator is thread safe; concurrent calls to
-// register nodes are safe.
-type Coordinator struct {
- log Logger
- lock sync.RWMutex
-
- // Nodes will in the first interations have only one key, which limits
- // the praefect to serve only 1 distinct set of Gitaly nodes.
- // One limitation is that each server needs the same amount of disk
- // space in case of full replication.
- nodes map[string]*grpc.ClientConn
-}
-
-// newCoordinator returns a new Coordinator that utilizes the provided logger
-func newCoordinator(l Logger) *Coordinator {
- return &Coordinator{
- log: l,
- nodes: make(map[string]*grpc.ClientConn),
- }
-}
-
-// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
- // For phase 1, we need to route messages based on the storage location
- // to the appropriate Gitaly node.
- c.log.Debugf("Stream director received method %s", fullMethodName)
-
- c.lock.RLock()
-
- var cc *grpc.ClientConn
- storageLoc := ""
- // We only need the first node, as there's only one storage location per
- // praefect at this time
- for k, v := range c.nodes {
- storageLoc = k
- cc = v
- break
- }
- c.lock.RUnlock()
-
- if storageLoc == "" {
- err := status.Error(
- codes.FailedPrecondition,
- "no downstream node registered",
- )
- return nil, nil, err
- }
-
- return ctx, cc, nil
-}
-
// Server is a praefect server
type Server struct {
- *Coordinator
- s *grpc.Server
+ coordinator *Coordinator
+ repl ReplMgr
+ s *grpc.Server
}
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
-func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server {
- c := newCoordinator(l)
-
+func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Logger) *Server {
grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
@@ -116,31 +53,9 @@ func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server {
return &Server{
s: grpc.NewServer(grpcOpts...),
- Coordinator: c,
- }
-}
-
-// RegisterNode will direct traffic to the supplied downstream connection when the storage location
-// is encountered.
-func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error {
- conn, err := client.Dial(listenAddr,
- []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))},
- )
- if err != nil {
- return err
- }
-
- c.lock.Lock()
- defer c.lock.Unlock()
-
- if _, ok := c.nodes[storageLoc]; !ok && len(c.nodes) > 0 {
- conn.Close()
- return fmt.Errorf("error: registering %s failed, only one storage location per server is supported", storageLoc)
+ coordinator: c,
+ repl: repl,
}
-
- c.nodes[storageLoc] = conn
-
- return nil
}
func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index e0866ebdb..94f5dd289 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -8,10 +8,11 @@ import (
"time"
"github.com/mwitkow/grpc-proxy/proxy"
- "github.com/stretchr/testify/assert"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"google.golang.org/grpc"
)
@@ -44,7 +45,25 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
- prf := praefect.NewServer(nil, testLogger{t})
+ const (
+ storagePrimary = "default"
+ storageBackup = "backup"
+ )
+
+ coordinator := praefect.NewCoordinator(logrus.New(), storagePrimary)
+ datastore := praefect.NewMemoryDatastore(config.Config{}, time.Now())
+ replmgr := praefect.NewReplMgr(
+ storagePrimary,
+ logrus.New(),
+ datastore,
+ coordinator,
+ )
+ prf := praefect.NewServer(
+ coordinator,
+ replmgr,
+ nil,
+ logrus.New(),
+ )
listener, port := listenAvailPort(t)
t.Logf("proxy listening on port %d", port)
@@ -61,10 +80,12 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
defer cc.Close()
cli := mock.NewSimpleServiceClient(cc)
- backend, cleanup := newMockDownstream(t, tt.callback)
- defer cleanup() // clean up mock downstream server resources
+ for _, replica := range []string{storagePrimary, storageBackup} {
+ backend, cleanup := newMockDownstream(t, tt.callback)
+ defer cleanup() // clean up mock downstream server resources
- prf.RegisterNode("test", backend)
+ coordinator.RegisterNode(replica, backend)
+ }
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -88,17 +109,6 @@ func callbackIncrement(_ context.Context, req *mock.SimpleRequest) (*mock.Simple
}, nil
}
-func TestRegisteringSecondStorageLocation(t *testing.T) {
- prf := praefect.NewServer(nil, testLogger{t})
-
- mCli, cleanup := newMockDownstream(t, nil)
- defer cleanup() // clean up mock downstream server resources
-
- assert.NoError(t, prf.RegisterNode("1", mCli))
- assert.Error(t, prf.RegisterNode("2", mCli))
-
-}
-
func listenAvailPort(tb testing.TB) (net.Listener, int) {
listener, err := net.Listen("tcp", ":0")
require.NoError(tb, err)
@@ -126,14 +136,6 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn {
return cc
}
-type testLogger struct {
- testing.TB
-}
-
-func (tl testLogger) Debugf(format string, args ...interface{}) {
- tl.TB.Logf(format, args...)
-}
-
// initializes and returns a client to downstream server, downstream server, and cleanup function
func newMockDownstream(tb testing.TB, callback simpleUnaryUnaryCallback) (string, func()) {
// setup mock server