author | Paul Okstad <pokstad@gitlab.com> | 2019-04-09 20:26:20 +0300
committer | Jacob Vosmaer <jacob@gitlab.com> | 2019-04-09 20:26:20 +0300
commit | ddec2b518c5233b7f35403383b8e88bbfa965f92 (patch)
tree | 3c8cc4eecbaf832212c7f0ff5698f8f4f007c092
parent | 522bec3a08710897834a9ce0a9d00dd282c7fd1f (diff)
Datastore pattern for replication jobs
-rw-r--r-- | changelogs/unreleased/datastore-pattern-for-replication-jobs.yml | 5
-rw-r--r-- | cmd/praefect/main.go | 29
-rw-r--r-- | internal/praefect/common.go | 21
-rw-r--r-- | internal/praefect/config/config.go | 24
-rw-r--r-- | internal/praefect/config/config_test.go | 60
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 22
-rw-r--r-- | internal/praefect/coordinator.go | 101
-rw-r--r-- | internal/praefect/datastore.go | 220
-rw-r--r-- | internal/praefect/datastore_memory_test.go | 96
-rw-r--r-- | internal/praefect/datastore_test.go | 117
-rw-r--r-- | internal/praefect/replicator.go | 128
-rw-r--r-- | internal/praefect/replicator_test.go | 128
-rw-r--r-- | internal/praefect/server.go | 99
-rw-r--r-- | internal/praefect/server_test.go | 50
14 files changed, 961 insertions, 139 deletions
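Taken together, the changes below split the old monolithic server into three pieces: a Coordinator that proxies RPCs to the primary Gitaly node, a MemoryDatastore seeded with replication jobs for every whitelisted repository, and a ReplMgr that drains that job queue in the background. The following is a minimal sketch of the new wiring, condensed from the updated cmd/praefect/main.go; the wireUp helper name and the simplified error handling are illustrative, not part of the commit, and the snippet only compiles inside the gitaly module since it imports internal packages.

```go
package main

import (
	"context"
	"net"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
)

// wireUp (hypothetical helper) condenses the run function from
// cmd/praefect/main.go: the coordinator proxies RPCs to the primary, the
// in-memory datastore seeds replication jobs from the whitelist, and the
// replication manager drains that queue until the context is cancelled.
func wireUp(conf config.Config, logger *logrus.Logger, listeners []net.Listener) error {
	coordinator := praefect.NewCoordinator(logger, conf.PrimaryServer.Name)
	datastore := praefect.NewMemoryDatastore(conf, time.Now())
	repl := praefect.NewReplMgr("default", logger, datastore, coordinator,
		praefect.WithWhitelist(conf.Whitelist))
	srv := praefect.NewServer(coordinator, repl, nil, logger)

	// Every backend, primary and secondaries alike, is registered with the
	// coordinator so replication jobs can resolve a connection by storage name.
	for _, gitaly := range append(conf.SecondaryServers, conf.PrimaryServer) {
		if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil {
			return err
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, len(listeners)+1)

	// The replication manager processes the job backlog in the background.
	go func() { errCh <- repl.ProcessBacklog(ctx) }()

	// The proxy server serves on every configured listener.
	for _, l := range listeners {
		go func(lis net.Listener) { errCh <- srv.Start(lis) }(l)
	}

	return <-errCh
}
```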
diff --git a/changelogs/unreleased/datastore-pattern-for-replication-jobs.yml b/changelogs/unreleased/datastore-pattern-for-replication-jobs.yml new file mode 100644 index 000000000..f81236920 --- /dev/null +++ b/changelogs/unreleased/datastore-pattern-for-replication-jobs.yml @@ -0,0 +1,5 @@ +--- +title: Datastore pattern for replication jobs +merge_request: 1147 +author: +type: other diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 0661081e7..941486861 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -75,20 +75,34 @@ func configure() (config.Config, error) { } func run(listeners []net.Listener, conf config.Config) error { - srv := praefect.NewServer(nil, logger) + var ( + // top level server dependencies + coordinator = praefect.NewCoordinator(logger, conf.PrimaryServer.Name) + datastore = praefect.NewMemoryDatastore(conf, time.Now()) + repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) + srv = praefect.NewServer(coordinator, repl, nil, logger) + + // signal related + signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} + termCh = make(chan os.Signal, len(signals)) + serverErrors = make(chan error, 1) + ) - signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT} - termCh := make(chan os.Signal, len(signals)) signal.Notify(termCh, signals...) - serverErrors := make(chan error, 1) - for _, l := range listeners { go func(lis net.Listener) { serverErrors <- srv.Start(lis) }(l) } - for _, gitaly := range conf.GitalyServers { - srv.RegisterNode(gitaly.Name, gitaly.ListenAddr) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { serverErrors <- repl.ProcessBacklog(ctx) }() + + allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer) + + for _, gitaly := range allBackendServers { + coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr) logger.WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node") } @@ -97,6 +111,7 @@ func run(listeners []net.Listener, conf config.Config) error { select { case s := <-termCh: logger.WithField("signal", s).Warn("received signal, shutting down gracefully") + cancel() // cancels the replicator job processing ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) if shutdownErr := srv.Shutdown(ctx); shutdownErr != nil { diff --git a/internal/praefect/common.go b/internal/praefect/common.go new file mode 100644 index 000000000..a63d309bd --- /dev/null +++ b/internal/praefect/common.go @@ -0,0 +1,21 @@ +package praefect + +import "google.golang.org/grpc" + +// Repository provides all necessary information to address a repository hosted +// in a specific Gitaly replica +type Repository struct { + RelativePath string // relative path of repository + Storage string // storage location, e.g. 
default +} + +// Node is a wrapper around the grpc client connection for a backend Gitaly node +type Node struct { + Storage string + cc *grpc.ClientConn +} + +// logging keys to use with logrus WithField +const ( + logKeyProjectPath = "ProjectPath" +) diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 096c946c0..768104ed1 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -10,12 +10,18 @@ import ( // Config is a container for everything found in the TOML config file type Config struct { - ListenAddr string `toml:"listen_addr" split_words:"true"` - SocketPath string `toml:"socket_path" split_words:"true"` - GitalyServers []*GitalyServer `toml:"gitaly_server", split_words:"true"` + ListenAddr string `toml:"listen_addr"` + SocketPath string `toml:"socket_path"` + + PrimaryServer *GitalyServer `toml:"primary_server"` + SecondaryServers []*GitalyServer `toml:"secondary_server"` + + // Whitelist is a list of relative project paths (paths comprised of project + // hashes) that are permitted to use high availability features + Whitelist []string `toml:"whitelist"` Logging config.Logging `toml:"logging"` - PrometheusListenAddr string `toml:"prometheus_listen_addr", split_words:"true"` + PrometheusListenAddr string `toml:"prometheus_listen_addr"` } // GitalyServer allows configuring the servers that RPCs are proxied to @@ -39,23 +45,25 @@ func FromFile(filePath string) (Config, error) { var ( errNoListener = errors.New("no listen address or socket path configured") - errNoGitalyServers = errors.New("no gitaly backends configured") + errNoGitalyServers = errors.New("no primary gitaly backends configured") errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique") errGitalyWithoutName = errors.New("all gitaly servers must have a name") ) +var emptyServer = &GitalyServer{} + // Validate establishes if the config is valid func (c Config) Validate() error { if c.ListenAddr == "" && c.SocketPath == "" { return errNoListener } - if len(c.GitalyServers) == 0 { + if c.PrimaryServer == nil || c.PrimaryServer == emptyServer { return errNoGitalyServers } - listenAddrs := make(map[string]bool, len(c.GitalyServers)) - for _, gitaly := range c.GitalyServers { + listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1) + for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) { if gitaly.Name == "" { return errGitalyWithoutName } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 306650771..33731b17d 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -4,10 +4,15 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestConfigValidation(t *testing.T) { - gitalySrvs := []*GitalyServer{&GitalyServer{"test", "localhost:23456"}} + primarySrv := &GitalyServer{"test", "localhost:23456"} + secondarySrvs := []*GitalyServer{ + {"test1", "localhost:23457"}, + {"test2", "localhost:23458"}, + } testCases := []struct { desc string @@ -16,35 +21,74 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", GitalyServers: gitalySrvs}, + config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", GitalyServers: gitalySrvs}, + config: Config{SocketPath: 
"/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, err: nil, }, { desc: "No servers", - config: Config{ListenAddr: "localhost:1234", GitalyServers: nil}, + config: Config{ListenAddr: "localhost:1234"}, err: errNoGitalyServers, }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", GitalyServers: []*GitalyServer{gitalySrvs[0], gitalySrvs[0]}}, + config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*GitalyServer{primarySrv}}, err: errDuplicateGitalyAddr, }, { desc: "Valid config", - config: Config{ListenAddr: "localhost:1234", GitalyServers: gitalySrvs}, + config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, err: nil, }, } for _, tc := range testCases { - t.Run(tc.desc, func(t1 *testing.T) { + t.Run(tc.desc, func(t *testing.T) { err := tc.config.Validate() - assert.Equal(t, err, tc.err) + assert.Equal(t, tc.err, err) + }) + } +} + +func TestConfigParsing(t *testing.T) { + testCases := []struct { + filePath string + expected Config + }{ + { + filePath: "testdata/config.toml", + expected: Config{ + PrimaryServer: &GitalyServer{ + Name: "default", + ListenAddr: "tcp://gitaly-primary.example.com", + }, + SecondaryServers: []*GitalyServer{ + { + Name: "default", + ListenAddr: "tcp://gitaly-backup1.example.com", + }, + { + Name: "backup", + ListenAddr: "tcp://gitaly-backup2.example.com", + }, + }, + Whitelist: []string{ + "abcd1234", + "edfg5678", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.filePath, func(t *testing.T) { + cfg, err := FromFile(tc.filePath) + require.NoError(t, err) + require.Equal(t, tc.expected, cfg) }) } } diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml new file mode 100644 index 000000000..24f64add9 --- /dev/null +++ b/internal/praefect/config/testdata/config.toml @@ -0,0 +1,22 @@ +listen_addr = "" +socket_path = "" +whitelist = ["abcd1234", "edfg5678"] +prometheus_listen_addr = "" + +[primary_server] + name = "default" + listen_addr = "tcp://gitaly-primary.example.com" + +[[secondary_server]] + name = "default" + listen_addr = "tcp://gitaly-backup1.example.com" + +[[secondary_server]] + name = "backup" + listen_addr = "tcp://gitaly-backup2.example.com" + +[logging] + Format = "" + sentry_dsn = "" + ruby_sentry_dsn = "" + level = "" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go new file mode 100644 index 000000000..2229019e3 --- /dev/null +++ b/internal/praefect/coordinator.go @@ -0,0 +1,101 @@ +package praefect + +import ( + "context" + "fmt" + "sync" + + "github.com/mwitkow/grpc-proxy/proxy" + "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/client" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Coordinator takes care of directing client requests to the appropriate +// downstream server. The coordinator is thread safe; concurrent calls to +// register nodes are safe. 
+type Coordinator struct { + log *logrus.Logger + lock sync.RWMutex + + storageLoc string + + nodes map[string]*grpc.ClientConn +} + +// NewCoordinator returns a new Coordinator that utilizes the provided logger +func NewCoordinator(l *logrus.Logger, storageLoc string) *Coordinator { + return &Coordinator{ + log: l, + storageLoc: storageLoc, + nodes: make(map[string]*grpc.ClientConn), + } +} + +// GetStorageNode returns the registered node for the given storage location +func (c *Coordinator) GetStorageNode(storage string) (Node, error) { + cc, ok := c.getConn(storage) + if !ok { + return Node{}, fmt.Errorf("no node registered for storage location %q", storage) + } + + return Node{ + Storage: storage, + cc: cc, + }, nil +} + +// streamDirector determines which downstream servers receive requests +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + // For phase 1, we need to route messages based on the storage location + // to the appropriate Gitaly node. + c.log.Debugf("Stream director received method %s", fullMethodName) + + if c.storageLoc == "" { + err := status.Error( + codes.FailedPrecondition, + "no downstream node registered", + ) + return nil, nil, err + } + + // We only need the primary node, as there's only one primary storage + // location per praefect at this time + cc, ok := c.getConn(c.storageLoc) + if !ok { + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", c.storageLoc) + } + + return ctx, cc, nil +} + +// RegisterNode will direct traffic to the supplied downstream connection when the storage location +// is encountered. +func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error { + conn, err := client.Dial(listenAddr, + []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))}, + ) + if err != nil { + return err + } + + c.setConn(storageLoc, conn) + + return nil +} + +func (c *Coordinator) setConn(storageLoc string, conn *grpc.ClientConn) { + c.lock.Lock() + c.nodes[storageLoc] = conn + c.lock.Unlock() +} + +func (c *Coordinator) getConn(storageLoc string) (*grpc.ClientConn, bool) { + c.lock.RLock() + cc, ok := c.nodes[storageLoc] + c.lock.RUnlock() + + return cc, ok +} diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go new file mode 100644 index 000000000..1dfaa2dca --- /dev/null +++ b/internal/praefect/datastore.go @@ -0,0 +1,220 @@ +/*Package praefect provides data models and datastore persistence abstractions +for tracking the state of repository replicas. + +See original design discussion: +https://gitlab.com/gitlab-org/gitaly/issues/1495 + + +*/ +package praefect + +import ( + "errors" + "sync" + "time" + + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" +) + +// ReplJob is an instance of a queued replication job. A replication job is +// meant for updating the repository so that it is synced with the primary +// copy. Scheduled indicates when a replication job should be performed. +type ReplJob struct { + Target string // which storage location to replicate to? 
+ Source Repository // source for replication + Scheduled time.Time +} + +// Datastore is a data persistence abstraction for all of Praefect's +// persistence needs +type Datastore interface { + ReplJobsDatastore + ReplicasDatastore +} + +// ReplicasDatastore manages accessing and setting which secondary replicas +// backup a repository +type ReplicasDatastore interface { + // GetSecondaries will retrieve all secondary replica storage locations for + // a primary replica + GetSecondaries(primary Repository) ([]string, error) + + // SetSecondaries will set the secondary storage locations for a repository + // in a primary replica. + SetSecondaries(primary Repository, secondaries []string) error +} + +// ReplJobsDatastore represents the behavior needed for fetching and updating +// replication jobs from the datastore +type ReplJobsDatastore interface { + // GetReplJobs fetches a list of chronologically ordered replication + // jobs for the given storage replica + GetReplJobs(storage string, since time.Time, count int) ([]ReplJob, error) + + // PutReplJob will update or create a replication job for the specified repo + // on a specific storage node + PutReplJob(repo Repository, when time.Time) error +} + +// shard is a set of primary and secondary storage replicas for a project +type shard struct { + primary string + secondaries []string +} + +// MemoryDatastore is a simple datastore that isn't persisted to disk. It is +// only intended for early beta requirements and as a reference implementation +// for the eventual SQL implementation +type MemoryDatastore struct { + mu sync.RWMutex // locks entire datastore + replicas map[string]shard // projectHash keyed to shards + storageJobs map[string]map[string]time.Time // keyed by storage then project +} + +// NewMemoryDatastore returns an initialized in-memory datastore +func NewMemoryDatastore(cfg config.Config, immediate time.Time) *MemoryDatastore { + m := &MemoryDatastore{ + replicas: map[string]shard{}, + storageJobs: map[string]map[string]time.Time{}, + } + + for _, project := range cfg.Whitelist { + // store the configuration file specified shard + m.replicas[project] = shard{ + primary: cfg.PrimaryServer.Name, + secondaries: func() []string { + servers := make([]string, len(cfg.SecondaryServers)) + for i, server := range cfg.SecondaryServers { + servers[i] = server.Name + } + return servers + }(), + } + + // initialize replication job queue to replicate all whitelisted repos + // to every secondary server + for _, secondary := range cfg.SecondaryServers { + projectJobs, ok := m.storageJobs[secondary.Name] + if !ok { + projectJobs = map[string]time.Time{} + m.storageJobs[secondary.Name] = projectJobs + } + + projectJobs[project] = immediate + } + + } + + return m +} + +// GetSecondaries will return the set of secondary storage locations for a +// given repository if they exist +func (md *MemoryDatastore) GetSecondaries(primary Repository) ([]string, error) { + shard, _ := md.getShard(primary.RelativePath) + + return shard.secondaries, nil +} + +// SetSecondaries will replace the set of replicas for a repository +func (md *MemoryDatastore) SetSecondaries(primary Repository, secondaries []string) error { + md.mu.Lock() + md.replicas[primary.RelativePath] = shard{ + primary: primary.Storage, + secondaries: secondaries, + } + md.mu.Unlock() + + return nil +} + +func (md *MemoryDatastore) getShard(project string) (shard, bool) { + md.mu.RLock() + replicas, ok := md.replicas[project] + md.mu.RUnlock() + + return replicas, ok +} + +// 
ErrSecondariesMissing indicates the repository does not have any backup +// replicas +var ErrSecondariesMissing = errors.New("repository missing secondary replicas") + +// GetReplJobs will return any replications jobs for the specified storage +// since the specified scheduled time up to the specified result limit. +func (md *MemoryDatastore) GetReplJobs(storage string, since time.Time, count int) ([]ReplJob, error) { + md.mu.RLock() + jobs := md.storageJobs[storage] + md.mu.RUnlock() + + var results []ReplJob + + for project, scheduled := range jobs { + if len(results) >= count { + break + } + + if scheduled.Before(since) { + continue + } + + shard, ok := md.getShard(project) + if !ok { + return nil, ErrSecondariesMissing + } + + results = append(results, ReplJob{ + Source: Repository{ + RelativePath: project, + Storage: shard.primary, + }, + Target: storage, + Scheduled: scheduled, + }) + } + + return results, nil +} + +// ErrInvalidReplTarget indicates a target repository cannot be chosen because +// it fails preconditions for being replicatable +var ErrInvalidReplTarget = errors.New("target repository fails preconditions for replication") + +// PutReplJob will create or update an existing replication job by scheduling +// it at the specified time +func (md *MemoryDatastore) PutReplJob(target Repository, scheduled time.Time) error { + md.mu.RLock() + storageProjectJobs, ok := md.storageJobs[target.Storage] + md.mu.RUnlock() + + if !ok { + storageProjectJobs = map[string]time.Time{} + } + + // target must be a secondary replica. By definition, a secondary replica + // must have a corresponding primary to replicate from + shard, ok := md.getShard(target.RelativePath) + if !ok { + return ErrInvalidReplTarget + } + + found := false + for _, secondary := range shard.secondaries { + if secondary == target.Storage { + found = true + break + } + } + + if !found { + return ErrInvalidReplTarget + } + + storageProjectJobs[target.RelativePath] = scheduled + + md.mu.Lock() + md.storageJobs[target.Storage] = storageProjectJobs + md.mu.Unlock() + + return nil +} diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go new file mode 100644 index 000000000..6fb60cb51 --- /dev/null +++ b/internal/praefect/datastore_memory_test.go @@ -0,0 +1,96 @@ +package praefect_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" +) + +// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will +// populate itself with the correct replication jobs and shards when initialized +// with a configuration file specifying the shard and whitelisted repositories. 
+func TestMemoryDatastoreWhitelist(t *testing.T) { + var ( + immediate = time.Unix(1300000000, 0) + cfg = config.Config{ + PrimaryServer: &config.GitalyServer{ + Name: "default", + }, + SecondaryServers: []*config.GitalyServer{ + { + Name: "backup-1", + }, + { + Name: "backup-2", + }, + }, + Whitelist: []string{ + "abcd1234", + "5678efgh", + }, + } + mds praefect.Datastore = praefect.NewMemoryDatastore(cfg, immediate) + + repo1 = praefect.Repository{ + RelativePath: cfg.Whitelist[0], + Storage: cfg.PrimaryServer.Name, + } + repo2 = praefect.Repository{ + RelativePath: cfg.Whitelist[1], + Storage: cfg.PrimaryServer.Name, + } + + expectSecondaries = []string{ + cfg.SecondaryServers[0].Name, + cfg.SecondaryServers[1].Name, + } + ) + + for _, repo := range []praefect.Repository{repo1, repo2} { + actualSecondaries, err := mds.GetSecondaries(repo) + require.NoError(t, err) + require.ElementsMatch(t, actualSecondaries, expectSecondaries) + } + + var ( + backup1 = cfg.SecondaryServers[0] + backup2 = cfg.SecondaryServers[1] + + backup1ExpectedJobs = []praefect.ReplJob{ + praefect.ReplJob{ + Target: backup1.Name, + Source: repo1, + Scheduled: immediate, + }, + praefect.ReplJob{ + Target: backup1.Name, + Source: repo2, + Scheduled: immediate, + }, + } + backup2ExpectedJobs = []praefect.ReplJob{ + praefect.ReplJob{ + Target: backup2.Name, + Source: repo1, + Scheduled: immediate, + }, + praefect.ReplJob{ + Target: backup2.Name, + Source: repo2, + Scheduled: immediate, + }, + } + ) + + backup1ActualJobs, err := mds.GetReplJobs(backup1.Name, time.Time{}, 10) + require.NoError(t, err) + require.ElementsMatch(t, backup1ActualJobs, backup1ExpectedJobs) + + backup2ActualJobs, err := mds.GetReplJobs(backup2.Name, time.Time{}, 10) + require.NoError(t, err) + require.ElementsMatch(t, backup2ActualJobs, backup2ExpectedJobs) + +} diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go new file mode 100644 index 000000000..608d0c4c9 --- /dev/null +++ b/internal/praefect/datastore_test.go @@ -0,0 +1,117 @@ +package praefect_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" +) + +const ( + stor1 = "default" // usually the primary storage location + stor2 = "backup-1" // usually the seoncary storage location + proj1 = "abcd1234" // imagine this is a legit project hash +) + +var ( + time0 = time.Time{} + time1 = time.Unix(1, 0) + time2 = time.Unix(2, 0) + + repo1Primary = praefect.Repository{ + RelativePath: proj1, + Storage: stor1, + } + repo1Backup = praefect.Repository{ + RelativePath: proj1, + Storage: stor2, + } +) + +var operations = []struct { + desc string + opFn func(*testing.T, praefect.Datastore) +}{ + { + desc: "query an empty datastore", + opFn: func(t *testing.T, ds praefect.Datastore) { + jobs, err := ds.GetReplJobs(stor1, time1, 1) + require.NoError(t, err) + require.Len(t, jobs, 0) + }, + }, + { + desc: "insert first replication job before secondary mapped to primary", + opFn: func(t *testing.T, ds praefect.Datastore) { + err := ds.PutReplJob(repo1Backup, time2) + require.Error(t, err, praefect.ErrInvalidReplTarget) + }, + }, + { + desc: "associate the replication job target with a primary", + opFn: func(t *testing.T, ds praefect.Datastore) { + err := ds.SetSecondaries(repo1Primary, []string{stor2}) + require.NoError(t, err) + }, + }, + { + desc: "insert first replication job after secondary mapped to primary", + opFn: func(t 
*testing.T, ds praefect.Datastore) { + err := ds.PutReplJob(repo1Backup, time2) + require.NoError(t, err) + }, + }, + { + desc: "fetch inserted replication job after primary mapped", + opFn: func(t *testing.T, ds praefect.Datastore) { + jobs, err := ds.GetReplJobs(stor2, time1, 10) + + require.NoError(t, err) + require.Len(t, jobs, 1) + + expectedJob := praefect.ReplJob{ + Source: repo1Primary, + Target: stor2, + Scheduled: time2, + } + require.Equal(t, jobs[0], expectedJob) + }, + }, + { + desc: "mark replication job done", + opFn: func(t *testing.T, ds praefect.Datastore) { + err := ds.PutReplJob(repo1Backup, time0) + require.NoError(t, err) + }, + }, + { + desc: "try fetching completed replication job", + opFn: func(t *testing.T, ds praefect.Datastore) { + jobs, err := ds.GetReplJobs(stor1, time1, 1) + require.NoError(t, err) + require.Len(t, jobs, 0) + }, + }, +} + +// TODO: add SQL datastore flavor +var flavors = map[string]func() praefect.Datastore{ + "in-memory-datastore": func() praefect.Datastore { return praefect.NewMemoryDatastore(config.Config{}, time.Now()) }, +} + +// TestDatastoreInterface will verify that every implementation or "flavor" of +// datastore interface (in-Memory or SQL) behaves consistently given the same +// series of operations +func TestDatastoreInterface(t *testing.T) { + for name, dsFactory := range flavors { + t.Run(name, func(t *testing.T) { + ds := dsFactory() + for i, op := range operations { + t.Logf("operation %d: %s", i+1, op.desc) + op.opFn(t, ds) + } + }) + } +} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go new file mode 100644 index 000000000..91871c759 --- /dev/null +++ b/internal/praefect/replicator.go @@ -0,0 +1,128 @@ +package praefect + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" +) + +// Replicator performs the actual replication logic between two nodes +type Replicator interface { + Replicate(ctx context.Context, source Repository, target Node) error +} + +type defaultReplicator struct { + log *logrus.Logger +} + +func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, target Node) error { + dr.log.Infof("replicating from %v to target %q", source, target.Storage) + return nil +} + +// ReplMgr is a replication manager for handling replication jobs +type ReplMgr struct { + log *logrus.Logger + jobsStore ReplJobsDatastore + coordinator *Coordinator + storage string // which replica is this replicator responsible for? 
+ replicator Replicator // does the actual replication logic + + // whitelist contains the project names of the repos we wish to replicate + whitelist map[string]struct{} +} + +// ReplMgrOpt allows a replicator to be configured with additional options +type ReplMgrOpt func(*ReplMgr) + +// NewReplMgr initializes a replication manager with the provided dependencies +// and options +func NewReplMgr(storage string, log *logrus.Logger, ds ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { + r := ReplMgr{ + log: log, + jobsStore: ds, + whitelist: map[string]struct{}{}, + replicator: defaultReplicator{log}, + storage: storage, + coordinator: c, + } + + for _, opt := range opts { + opt(&r) + } + + return r +} + +// WithWhitelist will configure a whitelist for repos to allow replication +func WithWhitelist(whitelistedRepos []string) ReplMgrOpt { + return func(r *ReplMgr) { + for _, repo := range whitelistedRepos { + r.whitelist[repo] = struct{}{} + } + } +} + +// WithReplicator overrides the default replicator +func WithReplicator(r Replicator) ReplMgrOpt { + return func(rm *ReplMgr) { + rm.replicator = r + } +} + +// ScheduleReplication will store a replication job in the datastore for later +// execution. It filters out projects that are not whitelisted. +// TODO: add a parameter to delay replication +func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error { + _, ok := r.whitelist[repo.RelativePath] + if !ok { + r.log.WithField(logKeyProjectPath, repo.RelativePath). + Infof("project %q is not whitelisted for replication", repo.RelativePath) + return nil + } + + return r.jobsStore.PutReplJob(repo, time.Now()) +} + +// ProcessBacklog will process queued jobs. It will block while processing jobs. +func (r ReplMgr) ProcessBacklog(ctx context.Context) error { + since := time.Time{} + for { + r.log.Debugf("fetching replication jobs since %s", since) + jobs, err := r.jobsStore.GetReplJobs(r.storage, since, 10) + if err != nil { + return err + } + + if len(jobs) == 0 { + select { + + // TODO: exponential backoff when no queries are returned + case <-time.After(10 * time.Millisecond): + continue + + case <-ctx.Done(): + return ctx.Err() + + } + } + + r.log.Debugf("fetched replication jobs: %#v", jobs) + + for _, job := range jobs { + r.log.Infof("processing replication job %#v", job) + node, err := r.coordinator.GetStorageNode(job.Target) + if err != nil { + return err + } + + err = r.replicator.Replicate(ctx, job.Source, node) + if err != nil { + return err + } + + since = job.Scheduled + } + } +} diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go new file mode 100644 index 000000000..8e156f0c4 --- /dev/null +++ b/internal/praefect/replicator_test.go @@ -0,0 +1,128 @@ +package praefect_test + +import ( + "context" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" +) + +// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for +// all whitelisted repos +func TestReplicatorProcessJobsWhitelist(t *testing.T) { + var ( + cfg = config.Config{ + PrimaryServer: &config.GitalyServer{ + Name: "default", + ListenAddr: "tcp://gitaly-primary.example.com", + }, + SecondaryServers: []*config.GitalyServer{ + { + Name: "backup1", + ListenAddr: "tcp://gitaly-backup1.example.com", + 
}, + { + Name: "backup2", + ListenAddr: "tcp://gitaly-backup2.example.com", + }, + }, + Whitelist: []string{ + "abcd1234", + "edfg5678", + }, + } + datastore = praefect.NewMemoryDatastore(cfg, time.Now()) + coordinator = praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name) + resultsCh = make(chan result) + replman = praefect.NewReplMgr( + cfg.SecondaryServers[1].Name, + logrus.New(), + datastore, + coordinator, + praefect.WithWhitelist(cfg.Whitelist), + praefect.WithReplicator(&mockReplicator{resultsCh}), + ) + ) + + for _, node := range []*config.GitalyServer{ + cfg.PrimaryServer, + cfg.SecondaryServers[0], + cfg.SecondaryServers[1], + } { + err := coordinator.RegisterNode(node.Name, node.ListenAddr) + require.NoError(t, err) + } + + ctx, cancel := testhelper.Context() + + errQ := make(chan error) + + go func() { + errQ <- replman.ProcessBacklog(ctx) + }() + + success := make(chan struct{}) + expectJobs := len(cfg.Whitelist) * len(cfg.SecondaryServers) + + go func() { + // we expect one job per whitelisted repo with each backend server + for i := 0; i < expectJobs; i++ { + result := <-resultsCh + + assert.Contains(t, cfg.Whitelist, result.source.RelativePath) + assert.Contains(t, + []string{ + cfg.SecondaryServers[0].Name, + cfg.SecondaryServers[1].Name, + }, + result.target.Storage, + ) + } + + cancel() + success <- struct{}{} + }() + + require.EqualError(t, <-errQ, context.Canceled.Error()) + + select { + + case <-success: + return + + case <-time.After(time.Second): + t.Fatalf("unable to iterate over expected jobs") + + } + +} + +type result struct { + source praefect.Repository + target praefect.Node +} + +type mockReplicator struct { + resultsCh chan<- result +} + +func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Repository, target praefect.Node) error { + select { + + case mr.resultsCh <- result{source, target}: + return nil + + case <-ctx.Done(): + return ctx.Err() + + } + + return nil +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 04779af2c..29414b5a1 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -4,93 +4,30 @@ package praefect import ( "context" - "fmt" "net" - "sync" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/mwitkow/grpc-proxy/proxy" - "gitlab.com/gitlab-org/gitaly/client" + "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/internal/middleware/cancelhandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler" grpccorrelation "gitlab.com/gitlab-org/labkit/correlation/grpc" grpctracing "gitlab.com/gitlab-org/labkit/tracing/grpc" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) -// Logger is a simple interface that allows loggers to be dependency injected -// into the praefect server -type Logger interface { - Debugf(format string, args ...interface{}) -} - -// Coordinator takes care of directing client requests to the appropriate -// downstream server. The coordinator is thread safe; concurrent calls to -// register nodes are safe. -type Coordinator struct { - log Logger - lock sync.RWMutex - - // Nodes will in the first interations have only one key, which limits - // the praefect to serve only 1 distinct set of Gitaly nodes. - // One limitation is that each server needs the same amount of disk - // space in case of full replication. 
- nodes map[string]*grpc.ClientConn -} - -// newCoordinator returns a new Coordinator that utilizes the provided logger -func newCoordinator(l Logger) *Coordinator { - return &Coordinator{ - log: l, - nodes: make(map[string]*grpc.ClientConn), - } -} - -// streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { - // For phase 1, we need to route messages based on the storage location - // to the appropriate Gitaly node. - c.log.Debugf("Stream director received method %s", fullMethodName) - - c.lock.RLock() - - var cc *grpc.ClientConn - storageLoc := "" - // We only need the first node, as there's only one storage location per - // praefect at this time - for k, v := range c.nodes { - storageLoc = k - cc = v - break - } - c.lock.RUnlock() - - if storageLoc == "" { - err := status.Error( - codes.FailedPrecondition, - "no downstream node registered", - ) - return nil, nil, err - } - - return ctx, cc, nil -} - // Server is a praefect server type Server struct { - *Coordinator - s *grpc.Server + coordinator *Coordinator + repl ReplMgr + s *grpc.Server } // NewServer returns an initialized praefect gPRC proxy server configured // with the provided gRPC server options -func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server { - c := newCoordinator(l) - +func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Logger) *Server { grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...) grpcOpts = append(grpcOpts, []grpc.ServerOption{ grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( @@ -116,31 +53,9 @@ func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server { return &Server{ s: grpc.NewServer(grpcOpts...), - Coordinator: c, - } -} - -// RegisterNode will direct traffic to the supplied downstream connection when the storage location -// is encountered. 
-func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error { - conn, err := client.Dial(listenAddr, - []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))}, - ) - if err != nil { - return err - } - - c.lock.Lock() - defer c.lock.Unlock() - - if _, ok := c.nodes[storageLoc]; !ok && len(c.nodes) > 0 { - conn.Close() - return fmt.Errorf("error: registering %s failed, only one storage location per server is supported", storageLoc) + coordinator: c, + repl: repl, } - - c.nodes[storageLoc] = conn - - return nil } func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption { diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index e0866ebdb..94f5dd289 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -8,10 +8,11 @@ import ( "time" "github.com/mwitkow/grpc-proxy/proxy" - "github.com/stretchr/testify/assert" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "google.golang.org/grpc" ) @@ -44,7 +45,25 @@ func TestServerSimpleUnaryUnary(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - prf := praefect.NewServer(nil, testLogger{t}) + const ( + storagePrimary = "default" + storageBackup = "backup" + ) + + coordinator := praefect.NewCoordinator(logrus.New(), storagePrimary) + datastore := praefect.NewMemoryDatastore(config.Config{}, time.Now()) + replmgr := praefect.NewReplMgr( + storagePrimary, + logrus.New(), + datastore, + coordinator, + ) + prf := praefect.NewServer( + coordinator, + replmgr, + nil, + logrus.New(), + ) listener, port := listenAvailPort(t) t.Logf("proxy listening on port %d", port) @@ -61,10 +80,12 @@ func TestServerSimpleUnaryUnary(t *testing.T) { defer cc.Close() cli := mock.NewSimpleServiceClient(cc) - backend, cleanup := newMockDownstream(t, tt.callback) - defer cleanup() // clean up mock downstream server resources + for _, replica := range []string{storagePrimary, storageBackup} { + backend, cleanup := newMockDownstream(t, tt.callback) + defer cleanup() // clean up mock downstream server resources - prf.RegisterNode("test", backend) + coordinator.RegisterNode(replica, backend) + } ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() @@ -88,17 +109,6 @@ func callbackIncrement(_ context.Context, req *mock.SimpleRequest) (*mock.Simple }, nil } -func TestRegisteringSecondStorageLocation(t *testing.T) { - prf := praefect.NewServer(nil, testLogger{t}) - - mCli, cleanup := newMockDownstream(t, nil) - defer cleanup() // clean up mock downstream server resources - - assert.NoError(t, prf.RegisterNode("1", mCli)) - assert.Error(t, prf.RegisterNode("2", mCli)) - -} - func listenAvailPort(tb testing.TB) (net.Listener, int) { listener, err := net.Listen("tcp", ":0") require.NoError(tb, err) @@ -126,14 +136,6 @@ func dialLocalPort(tb testing.TB, port int, backend bool) *grpc.ClientConn { return cc } -type testLogger struct { - testing.TB -} - -func (tl testLogger) Debugf(format string, args ...interface{}) { - tl.TB.Logf(format, args...) -} - // initializes and returns a client to downstream server, downstream server, and cleanup function func newMockDownstream(tb testing.TB, callback simpleUnaryUnaryCallback) (string, func()) { // setup mock server |
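As a usage note, the datastore pattern introduced here follows a fixed call sequence, mirrored by internal/praefect/datastore_test.go: a repository's secondaries must be registered before a replication job can target one of them. The sketch below exercises the new interface; the ExampleDatastore function and the literal storage names are assumptions for demonstration, not part of the commit.

```go
package praefect_test

import (
	"fmt"
	"time"

	"gitlab.com/gitlab-org/gitaly/internal/praefect"
	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
)

// ExampleDatastore walks the replication-job lifecycle: map a secondary to a
// primary, schedule a job targeting that secondary, then fetch the pending
// jobs for it.
func ExampleDatastore() {
	var ds praefect.Datastore = praefect.NewMemoryDatastore(config.Config{}, time.Now())

	primary := praefect.Repository{RelativePath: "abcd1234", Storage: "default"}
	backup := praefect.Repository{RelativePath: "abcd1234", Storage: "backup-1"}

	// The target of a replication job must already be a registered secondary
	// of the primary; otherwise PutReplJob returns ErrInvalidReplTarget.
	if err := ds.SetSecondaries(primary, []string{"backup-1"}); err != nil {
		panic(err)
	}

	if err := ds.PutReplJob(backup, time.Now()); err != nil {
		panic(err)
	}

	// Fetch up to 10 jobs scheduled for the backup storage since the zero time.
	jobs, err := ds.GetReplJobs("backup-1", time.Time{}, 10)
	if err != nil {
		panic(err)
	}

	fmt.Println(len(jobs))
	// Output: 1
}
```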