Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacob Vosmaer <jacob@gitlab.com>2019-08-13 18:17:07 +0300
committerJacob Vosmaer <jacob@gitlab.com>2019-08-13 18:17:07 +0300
commitd09c98f9ff0a1b7768b0f4c6bf88576e5bbbd80f (patch)
tree87c8e62173e4af68ddf82bb70c1fb9da7bc19fdb
parent8cc7033e971a0fe001603f695dc02feae7ac34ec (diff)
parent763e8191b58adf97a3cfa0a7af59547ef70ca8c4 (diff)
Merge branch 'jc-praefect-data-model' into 'master'
Updating data model for praefect See merge request gitlab-org/gitaly!1399
-rw-r--r--changelogs/unreleased/jc-data-model-changes.yml5
-rw-r--r--cmd/praefect/main.go14
-rw-r--r--internal/helper/storage.go10
-rw-r--r--internal/praefect/common.go8
-rw-r--r--internal/praefect/config/config.go41
-rw-r--r--internal/praefect/config/config_test.go38
-rw-r--r--internal/praefect/config/testdata/config.toml24
-rw-r--r--internal/praefect/coordinator.go166
-rw-r--r--internal/praefect/coordinator_test.go31
-rw-r--r--internal/praefect/datastore.go377
-rw-r--r--internal/praefect/datastore_memory_test.go115
-rw-r--r--internal/praefect/datastore_test.go88
-rw-r--r--internal/praefect/mock/mock.pb.go22
-rw-r--r--internal/praefect/mock/mock.proto5
-rw-r--r--internal/praefect/mocksvc_test.go2
-rw-r--r--internal/praefect/models/node.go17
-rw-r--r--internal/praefect/models/nodes.go8
-rw-r--r--internal/praefect/models/repository.go8
-rw-r--r--internal/praefect/protoregistry/protoregistry.go20
-rw-r--r--internal/praefect/replicator.go103
-rw-r--r--internal/praefect/replicator_test.go111
-rw-r--r--internal/praefect/server_test.go50
22 files changed, 664 insertions, 599 deletions
diff --git a/changelogs/unreleased/jc-data-model-changes.yml b/changelogs/unreleased/jc-data-model-changes.yml
new file mode 100644
index 000000000..89c98dd31
--- /dev/null
+++ b/changelogs/unreleased/jc-data-model-changes.yml
@@ -0,0 +1,5 @@
+---
+title: Praefect data model changes with replication
+merge_request: 1399
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index acf53d2fa..8f40578ab 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/labkit/tracing"
)
@@ -95,10 +96,9 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, datastore)
+ coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
srv = praefect.NewServer(coordinator, repl, nil, logger)
-
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
termCh = make(chan os.Signal, len(signals))
@@ -114,14 +114,12 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer)
-
- for _, gitaly := range allBackendServers {
- if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil {
- return fmt.Errorf("failed to register %s: %s", gitaly.Name, err)
+ for _, node := range conf.Nodes {
+ if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil {
+ return fmt.Errorf("failed to register %s: %s", node.Address, err)
}
- logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
+ logger.WithField("node_address", node.Address).Info("registered gitaly node")
}
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index 4e535a5d6..4a8559c06 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -34,6 +34,16 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly
return
}
+// IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata
+func IncomingToOutgoing(ctx context.Context) context.Context {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return ctx
+ }
+
+ return metadata.NewOutgoingContext(ctx, md)
+}
+
// InjectGitalyServers injects gitaly-servers metadata into an outgoing context
func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) {
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
index 2df2a4823..a09a292ad 100644
--- a/internal/praefect/common.go
+++ b/internal/praefect/common.go
@@ -1,13 +1,5 @@
package praefect
-import "google.golang.org/grpc"
-
-// Node is a wrapper around the grpc client connection for a backend Gitaly node
-type Node struct {
- Storage string
- cc *grpc.ClientConn
-}
-
// logging keys to use with logrus WithField
const (
logKeyProjectPath = "ProjectPath"
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 6a2a5b5d5..121d2cdbe 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -15,8 +15,7 @@ type Config struct {
ListenAddr string `toml:"listen_addr"`
SocketPath string `toml:"socket_path"`
- PrimaryServer *models.GitalyServer `toml:"primary_server"`
- SecondaryServers []*models.GitalyServer `toml:"secondary_server"`
+ Nodes []*models.Node `toml:"node"`
// Whitelist is a list of relative project paths (paths comprised of project
// hashes) that are permitted to use high availability features
@@ -26,13 +25,6 @@ type Config struct {
PrometheusListenAddr string `toml:"prometheus_listen_addr"`
}
-// GitalyServer allows configuring the servers that RPCs are proxied to
-type GitalyServer struct {
- Name string `toml:"name"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- Token string `toml:"token"`
-}
-
// FromFile loads the config for the passed file path
func FromFile(filePath string) (Config, error) {
config := &Config{}
@@ -47,35 +39,38 @@ func FromFile(filePath string) (Config, error) {
}
var (
- errNoListener = errors.New("no listen address or socket path configured")
- errNoGitalyServers = errors.New("no primary gitaly backends configured")
- errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique")
- errGitalyWithoutName = errors.New("all gitaly servers must have a name")
+ errNoListener = errors.New("no listen address or socket path configured")
+ errNoGitalyServers = errors.New("no primary gitaly backends configured")
+ errDuplicateStorage = errors.New("internal gitaly storages are not unique")
+ errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address")
+ errGitalyWithoutStorage = errors.New("all gitaly nodes must have a storage")
)
-var emptyServer = &models.GitalyServer{}
-
// Validate establishes if the config is valid
func (c Config) Validate() error {
if c.ListenAddr == "" && c.SocketPath == "" {
return errNoListener
}
- if c.PrimaryServer == nil || c.PrimaryServer == emptyServer {
+ if len(c.Nodes) == 0 {
return errNoGitalyServers
}
- listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1)
- for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) {
- if gitaly.Name == "" {
- return errGitalyWithoutName
+ storages := make(map[string]struct{}, len(c.Nodes))
+ for _, node := range c.Nodes {
+ if node.Storage == "" {
+ return errGitalyWithoutStorage
+ }
+
+ if node.Address == "" {
+ return errGitalyWithoutAddr
}
- if _, found := listenAddrs[gitaly.ListenAddr]; found {
- return errDuplicateGitalyAddr
+ if _, found := storages[node.Storage]; found {
+ return errDuplicateStorage
}
- listenAddrs[gitaly.ListenAddr] = true
+ storages[node.Storage] = struct{}{}
}
return nil
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index eace5eb2f..19df6ce11 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -9,10 +9,10 @@ import (
)
func TestConfigValidation(t *testing.T) {
- primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"}
- secondarySrvs := []*models.GitalyServer{
- {"test1", "localhost:23457", "secret-token"},
- {"test2", "localhost:23458", "secret-token"},
+ nodes := []*models.Node{
+ {ID: 1, Storage: "internal-1", Address: "localhost:23456", Token: "secret-token"},
+ {ID: 2, Storage: "internal-2", Address: "localhost:23457", Token: "secret-token"},
+ {ID: 3, Storage: "internal-3", Address: "localhost:23458", Token: "secret-token"},
}
testCases := []struct {
@@ -22,12 +22,12 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "", Nodes: nodes},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{SocketPath: "/tmp/praefect.socket", Nodes: nodes},
err: nil,
},
{
@@ -36,13 +36,13 @@ func TestConfigValidation(t *testing.T) {
err: errNoGitalyServers,
},
{
- desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}},
- err: errDuplicateGitalyAddr,
+ desc: "duplicate storage",
+ config: Config{ListenAddr: "localhost:1234", Nodes: append(nodes, &models.Node{Storage: nodes[0].Storage, Address: nodes[1].Address})},
+ err: errDuplicateStorage,
},
{
desc: "Valid config",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "localhost:1234", Nodes: nodes},
err: nil,
},
}
@@ -63,18 +63,18 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
+ Nodes: []*models.Node{
+ {
+ Address: "tcp://gitaly-internal-1.example.com",
+ Storage: "praefect-internal-1",
+ },
{
- Name: "default",
- ListenAddr: "tcp://gitaly-backup1.example.com",
+ Address: "tcp://gitaly-internal-2.example.com",
+ Storage: "praefect-internal-2",
},
{
- Name: "backup",
- ListenAddr: "tcp://gitaly-backup2.example.com",
+ Address: "tcp://gitaly-internal-3.example.com",
+ Storage: "praefect-internal-3",
},
},
Whitelist: []string{"abcd1234", "edfg5678"},
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 81701a359..03d0f66f3 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -3,20 +3,20 @@ socket_path = ""
whitelist = ["abcd1234", "edfg5678"]
prometheus_listen_addr = ""
-[primary_server]
- name = "default"
- listen_addr = "tcp://gitaly-primary.example.com"
-
-[[secondary_server]]
- name = "default"
- listen_addr = "tcp://gitaly-backup1.example.com"
-
-[[secondary_server]]
- name = "backup"
- listen_addr = "tcp://gitaly-backup2.example.com"
-
[logging]
format = ""
sentry_dsn = ""
ruby_sentry_dsn = ""
level = ""
+
+[[node]]
+ address = "tcp://gitaly-internal-1.example.com"
+ storage = "praefect-internal-1"
+
+[[node]]
+ address = "tcp://gitaly-internal-2.example.com"
+ storage = "praefect-internal-2"
+
+[[node]]
+ address = "tcp://gitaly-internal-3.example.com"
+ storage = "praefect-internal-3"
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index c238604f3..c6c1e762a 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -2,11 +2,14 @@ package praefect
import (
"context"
+ "errors"
"fmt"
+ "math/rand"
"os"
"os/signal"
"sync"
"syscall"
+ "time"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -19,8 +22,6 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
// Coordinator takes care of directing client requests to the appropriate
@@ -31,14 +32,14 @@ type Coordinator struct {
failoverMutex sync.RWMutex
connMutex sync.RWMutex
- datastore Datastore
+ datastore ReplicasDatastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -55,19 +56,6 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
-// GetStorageNode returns the registered node for the given storage location
-func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
- cc, ok := c.getConn(storage)
- if !ok {
- return Node{}, fmt.Errorf("no node registered for storage location %q", storage)
- }
-
- return Node{
- Storage: storage,
- cc: cc,
- }, nil
-}
-
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
// For phase 1, we need to route messages based on the storage location
@@ -77,28 +65,86 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
c.failoverMutex.RLock()
defer c.failoverMutex.RUnlock()
- serverConfig, err := c.datastore.GetDefaultPrimary()
+ frame, err := peeker.Peek()
if err != nil {
- err := status.Error(
- codes.FailedPrecondition,
- "no downstream node registered",
- )
return nil, nil, err
}
- // We only need the primary node, as there's only one primary storage
- // location per praefect at this time
- cc, ok := c.getConn(serverConfig.Name)
- if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name)
+ mi, err := c.registry.LookupMethod(fullMethodName)
+ if err != nil {
+ return nil, nil, err
}
- ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token)
+ var primary *models.Node
+
+ if mi.Scope == protoregistry.ScopeRepository {
+ m, err := mi.UnmarshalRequestProto(frame)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ targetRepo, err := mi.TargetRepo(m)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
+
+ if err != nil {
+ if err != ErrPrimaryNotSet {
+ return nil, nil, err
+ }
+ // if there are no primaries for this repository, pick one
+ nodes, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if len(nodes) == 0 {
+ return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+
+ }
+ newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))]
+
+ // set the primary
+ if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ return nil, nil, err
+ }
+
+ primary = &newPrimary
+ }
+
+ targetRepo.StorageName = primary.Storage
+
+ b, err := proxy.Codec().Marshal(m)
+ if err != nil {
+ return nil, nil, err
+ }
+ if err = peeker.Modify(b); err != nil {
+ return nil, nil, err
+ }
+
+ } else {
+ //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
+ // proxy requests that are not repository scoped
+ node, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return nil, nil, err
+ }
+ if len(node) == 0 {
+ return nil, nil, errors.New("no node storages found")
+ }
+ primary = &node[0]
+ }
+
+ // We only need the primary node, as there's only one primary storage
+ // location per praefect at this time
+ cc, err := c.GetConnection(primary.Storage)
if err != nil {
- return nil, nil, err
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage)
}
- return ctx, cc, nil
+ return helper.IncomingToOutgoing(ctx), cc, nil
}
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
@@ -125,12 +171,17 @@ func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
c.connMutex.Unlock()
}
-func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
+// GetConnection gets the grpc client connection based on an address
+func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) {
c.connMutex.RLock()
cc, ok := c.nodes[storageName]
c.connMutex.RUnlock()
+ if !ok {
+ return nil, errors.New("client connection not found")
+ }
+
+ return cc, nil
- return cc, ok
}
// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
@@ -146,55 +197,8 @@ func (c *Coordinator) handleSignalAndRotate() {
<-failoverChan
c.failoverMutex.Lock()
- primary, err := c.datastore.GetDefaultPrimary()
- if err != nil {
- c.log.Fatalf("error when getting default primary: %v", err)
- }
-
- if err := c.rotateSecondaryToPrimary(primary); err != nil {
- c.log.WithError(err).Error("rotating secondary")
- }
+ // TODO: update failover logic
+ c.log.Info("failover happens")
c.failoverMutex.Unlock()
}
}
-
-func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error {
- repositories, err := c.datastore.GetRepositoriesForPrimary(primary)
- if err != nil {
- return err
- }
-
- for _, repoPath := range repositories {
- secondaries, err := c.datastore.GetShardSecondaries(models.Repository{
- RelativePath: repoPath,
- })
- if err != nil {
- return fmt.Errorf("getting secondaries: %v", err)
- }
-
- newPrimary := secondaries[0]
- secondaries = append(secondaries[1:], primary)
-
- if err = c.datastore.SetShardPrimary(models.Repository{
- RelativePath: repoPath,
- }, newPrimary); err != nil {
- return fmt.Errorf("setting primary: %v", err)
- }
-
- if err = c.datastore.SetShardSecondaries(models.Repository{
- RelativePath: repoPath,
- }, secondaries); err != nil {
- return fmt.Errorf("setting secondaries: %v", err)
- }
- }
-
- // set the new default primary
- primary, err = c.datastore.GetShardPrimary(models.Repository{
- RelativePath: repositories[0],
- })
- if err != nil {
- return fmt.Errorf("getting shard primary: %v", err)
- }
-
- return c.datastore.SetDefaultPrimary(primary)
-}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 50045f8a0..0275c6048 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,9 +5,6 @@ import (
"testing"
"github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
var testLogger = logrus.New()
@@ -17,31 +14,5 @@ func init() {
}
func TestSecondaryRotation(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{Name: "primary"},
- SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}},
- Whitelist: []string{"/repoA", "/repoB"},
- }
- d := NewMemoryDatastore(cfg)
- c := NewCoordinator(testLogger, d)
-
- primary, err := d.GetDefaultPrimary()
- require.NoError(t, err)
-
- require.NoError(t, c.rotateSecondaryToPrimary(primary))
-
- primary, err = d.GetDefaultPrimary()
- require.NoError(t, err)
- require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary")
-
- repositories, err := d.GetRepositoriesForPrimary(primary)
- require.NoError(t, err)
-
- for _, repository := range repositories {
- shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository})
- require.NoError(t, err)
-
- require.Len(t, shardSecondaries, 2)
- require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0])
- }
+ t.Skip("secondary rotation will change with the new data model")
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 5678c6a24..f9787089f 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -34,7 +34,7 @@ const (
// JobStateComplete indicates the job is now complete
JobStateComplete
// JobStateCancelled indicates the job was cancelled. This can occur if the
- // job is no longer relevant (e.g. a node is moved out of a shard)
+ // job is no longer relevant (e.g. a node is moved out of a repository)
JobStateCancelled
)
@@ -42,10 +42,10 @@ const (
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
- ID uint64 // autoincrement ID
- Target string // which storage location to replicate to?
- Source models.Repository // source for replication
- State JobState
+ ID uint64 // autoincrement ID
+ TargetNode, SourceNode models.Node // which node to replicate to?
+ Repository models.Repository // source for replication
+ State JobState
}
// replJobs provides sort manipulation behavior
@@ -64,32 +64,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
type Datastore interface {
ReplJobsDatastore
ReplicasDatastore
- TemporaryDatastore
-}
-
-// TemporaryDatastore contains methods that will go away once we move to a SQL datastore
-type TemporaryDatastore interface {
- GetDefaultPrimary() (models.GitalyServer, error)
- SetDefaultPrimary(primary models.GitalyServer) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- // GetSecondaries will retrieve all secondary replica storage locations for
- // a primary replica
- GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error)
+ GetReplicas(relativePath string) ([]models.Node, error)
+
+ GetStorageNode(nodeID int) (models.Node, error)
+
+ GetStorageNodes() ([]models.Node, error)
+
+ GetPrimary(relativePath string) (*models.Node, error)
- GetShardPrimary(repo models.Repository) (models.GitalyServer, error)
+ SetPrimary(relativePath string, storageNodeID int) error
- // SetSecondaries will set the secondary storage locations for a repository
- // in a primary replica.
- SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error
+ AddReplica(relativePath string, storageNodeID int) error
- SetShardPrimary(repo models.Repository, primary models.GitalyServer) error
+ RemoveReplica(relativePath string, storageNodeID int) error
- // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary
- GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error)
+ GetRepository(relativePath string) (*models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -98,58 +92,52 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, node string, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error)
- // CreateSecondaryJobs will create replication jobs for each secondary
+ // CreateReplicaJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(source models.Repository) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
}
-// shard is a set of primary and secondary storage replicas for a project
-type shard struct {
- primary models.GitalyServer
- secondaries []models.GitalyServer
-}
-
type jobRecord struct {
- relativePath string // project's relative path
- targetNode string
- state JobState
+ relativePath string // project's relative path
+ targetNodeID, sourceNodeID int
+ state JobState
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- replicas *struct {
- sync.RWMutex
- m map[string]shard // keyed by project's relative path
- }
-
jobs *struct {
sync.RWMutex
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
- primary *struct {
+ storageNodes *struct {
sync.RWMutex
- server models.GitalyServer
+ m map[int]models.Node
+ }
+
+ repositories *struct {
+ sync.RWMutex
+ m map[string]models.Repository
}
}
// NewMemoryDatastore returns an initialized in-memory datastore
func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
- replicas: &struct {
+ storageNodes: &struct {
sync.RWMutex
- m map[string]shard
+ m map[int]models.Node
}{
- m: map[string]shard{},
+ m: map[int]models.Node{},
},
jobs: &struct {
sync.RWMutex
@@ -159,114 +147,190 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- primary: &struct {
+ repositories: &struct {
sync.RWMutex
- server models.GitalyServer
+ m map[string]models.Repository
}{
- server: models.GitalyServer{
- Name: cfg.PrimaryServer.Name,
- ListenAddr: cfg.PrimaryServer.ListenAddr,
- Token: cfg.PrimaryServer.Token,
- },
+ m: map[string]models.Repository{},
},
}
- secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers))
- for i, server := range cfg.SecondaryServers {
- secondaryServers[i] = *server
+ for i, storageNode := range cfg.Nodes {
+ storageNode.ID = i
+ m.storageNodes.m[i] = *storageNode
}
- for _, repo := range cfg.Whitelist {
- // store the configuration file specified shard
- m.replicas.m[repo] = shard{
- primary: *cfg.PrimaryServer,
- secondaries: secondaryServers,
+ for _, repoPath := range cfg.Whitelist {
+ repo := models.Repository{
+ RelativePath: repoPath,
}
-
- // initialize replication job queue to replicate all whitelisted repos
- // to every secondary server
- for _, secondary := range cfg.SecondaryServers {
- m.jobs.next++
- m.jobs.records[m.jobs.next] = jobRecord{
- state: JobStateReady,
- targetNode: secondary.Name,
- relativePath: repo,
+ for storageID, storageNode := range cfg.Nodes {
+
+ // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node
+ // to be the primary
+ if repo.Primary == (models.Node{}) {
+ repo.Primary = *storageNode
+ } else {
+ repo.Replicas = append(repo.Replicas, *storageNode)
+ // initialize replication job queue to replicate all whitelisted repos
+ // to every replica
+ m.jobs.next++
+ m.jobs.records[m.jobs.next] = jobRecord{
+ state: JobStateReady,
+ targetNodeID: storageID,
+ sourceNodeID: repo.Primary.ID,
+ relativePath: repoPath,
+ }
}
}
+ m.repositories.m[repoPath] = repo
}
return m
}
-// GetShardSecondaries will return the set of secondary storage locations for a
-// given repository if they exist
-func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) {
- shard, _ := md.getShard(primary.RelativePath)
+// GetReplicas gets the secondaries for a repository based on the relative path
+func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.Node, error) {
+ md.repositories.RLock()
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+ defer md.repositories.RUnlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, errors.New("repository not found")
+ }
- return shard.secondaries, nil
+ return repository.Replicas, nil
}
-// SetShardSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetStorageNode gets all storage nodes
+func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.Node, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.secondaries = secondaries
- md.replicas.m[repo.RelativePath] = shard
+ node, ok := md.storageNodes.m[nodeID]
+ if !ok {
+ return models.Node{}, errors.New("node not found")
+ }
- return nil
+ return node, nil
}
-// SetShardPrimary sets the primary for a repository
-func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetStorageNodes gets all storage nodes
+func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.primary = primary
- md.replicas.m[repo.RelativePath] = shard
+ var storageNodes []models.Node
+ for _, storageNode := range md.storageNodes.m {
+ storageNodes = append(storageNodes, storageNode)
+ }
+
+ return storageNodes, nil
+}
+
+// GetPrimary gets the primary storage node for a repository of a repository relative path
+func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error) {
+ md.repositories.RLock()
+ defer md.repositories.RUnlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, ErrPrimaryNotSet
+ }
+
+ storageNode, ok := md.storageNodes.m[repository.Primary.ID]
+ if !ok {
+ return nil, errors.New("node storage not found")
+ }
+ return &storageNode, nil
+
+}
+
+// SetPrimary sets the primary storagee node for a repository of a repository relative path
+func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ repository = models.Repository{RelativePath: relativePath}
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+ repository.Primary = storageNode
+
+ md.repositories.m[relativePath] = repository
return nil
}
-// GetShardPrimary gets the primary for a repository
-func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// AddReplica adds a secondary to a repository of a repository relative path
+func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return errors.New("repository not found")
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+
+ repository.Replicas = append(repository.Replicas, storageNode)
- shard := md.replicas.m[repo.RelativePath]
- return shard.primary, nil
+ md.repositories.m[relativePath] = repository
+ return nil
}
-// GetRepositoriesForPrimary gets all repositories
-func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// RemoveReplica removes a secondary from a repository of a repository relative path
+func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- repositories := make([]string, 0, len(md.replicas.m))
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return errors.New("repository not found")
+ }
- for repository := range md.replicas.m {
- repositories = append(repositories, repository)
+ var secondaries []models.Node
+ for _, secondary := range repository.Replicas {
+ if secondary.ID != storageNodeID {
+ secondaries = append(secondaries, secondary)
+ }
}
- return repositories, nil
+ repository.Replicas = secondaries
+ md.repositories.m[relativePath] = repository
+ return nil
}
-func (md *MemoryDatastore) getShard(project string) (shard, bool) {
- md.replicas.RLock()
- replicas, ok := md.replicas.m[project]
- md.replicas.RUnlock()
+// GetRepository gets the repository for a repository relative path
+func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- return replicas, ok
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, errors.New("repository not found")
+ }
+
+ return &repository, nil
}
-// ErrSecondariesMissing indicates the repository does not have any backup
+// ErrReplicasMissing indicates the repository does not have any backup
// replicas
-var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
+var ErrReplicasMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) {
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -274,7 +338,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNode == storage {
+ if record.state&state != 0 && record.targetNodeID == targetNodeID {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -293,60 +357,64 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
}
// replJobFromRecord constructs a replication job from a record and by cross
-// referencing the current shard for the project being replicated
+// referencing the current repository for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- shard, ok := md.getShard(record.relativePath)
- if !ok {
- return ReplJob{}, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- record.relativePath,
- )
+ repository, err := md.GetRepository(record.relativePath)
+ if err != nil {
+ return ReplJob{}, err
+ }
+
+ sourceNode, err := md.GetStorageNode(record.sourceNodeID)
+ if err != nil {
+ return ReplJob{}, err
+ }
+ targetNode, err := md.GetStorageNode(record.targetNodeID)
+ if err != nil {
+ return ReplJob{}, err
}
return ReplJob{
- ID: jobID,
- Source: models.Repository{
- RelativePath: record.relativePath,
- Storage: shard.primary.Name,
- },
- State: record.state,
- Target: record.targetNode,
+ ID: jobID,
+ Repository: *repository,
+ SourceNode: sourceNode,
+ State: record.state,
+ TargetNode: targetNode,
}, nil
}
-// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because
+// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because
// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication")
+var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
-// CreateSecondaryReplJobs creates a replication job for each secondary that
+// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
- emptyRepo := models.Repository{}
- if source == emptyRepo {
+ if relativePath == "" {
return nil, errors.New("invalid source repository")
}
- shard, ok := md.getShard(source.RelativePath)
- if !ok {
+ repository, err := md.GetRepository(relativePath)
+ if err != nil {
return nil, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- source.RelativePath,
+ "unable to find repository for project at relative path %q",
+ relativePath,
)
}
var jobIDs []uint64
- for _, secondary := range shard.secondaries {
+ for _, secondary := range repository.Replicas {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- targetNode: secondary.Name,
+ targetNodeID: secondary.ID,
state: JobStatePending,
- relativePath: source.RelativePath,
+ relativePath: relativePath,
+ sourceNodeID: repository.Primary.ID,
}
jobIDs = append(jobIDs, nextID)
@@ -375,36 +443,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
-
-// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error {
- md.primary.Lock()
- defer md.primary.Unlock()
-
- md.primary.server = primary
-
- return nil
-}
-
-// GetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- primary := md.primary.server
- if primary == (models.GitalyServer{}) {
- return primary, ErrPrimaryNotSet
- }
-
- return primary, nil
-}
-
-// SetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- md.primary.server = primary
-
- return nil
-}
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index 6099a8328..0881b0008 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -1,93 +1,96 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will
-// populate itself with the correct replication jobs and shards when initialized
+// populate itself with the correct replication jobs and repositories when initialized
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup-1",
- },
- {
- Name: "backup-2",
- },
- },
- Whitelist: []string{"abcd1234", "5678efgh"},
- }
-
- mds := praefect.NewMemoryDatastore(cfg)
-
+ t.Skip("Since we are getting rid of the whitelist, we can skip this test for now. We can remove it once we get rid of the whitelist")
repo1 := models.Repository{
- RelativePath: cfg.Whitelist[0],
- Storage: cfg.PrimaryServer.Name,
+ RelativePath: "abcd1234",
}
-
repo2 := models.Repository{
- RelativePath: cfg.Whitelist[1],
- Storage: cfg.PrimaryServer.Name,
+ RelativePath: "5678efgh",
}
+ mds := NewMemoryDatastore(config.Config{
+ Nodes: []*models.Node{
+ &models.Node{
+ ID: 0,
+ Address: "tcp://default",
+ Storage: "praefect-internal-1",
+ },
+ &models.Node{
+ ID: 1,
+ Address: "tcp://backup-2",
+ Storage: "praefect-internal-2",
+ }, &models.Node{
+ ID: 2,
+ Address: "tcp://backup-2",
+ Storage: "praefect-internal-3",
+ }},
+ Whitelist: []string{repo1.RelativePath, repo2.RelativePath},
+ })
- expectSecondaries := []models.GitalyServer{
- models.GitalyServer{Name: cfg.SecondaryServers[0].Name},
- models.GitalyServer{Name: cfg.SecondaryServers[1].Name},
+ expectReplicas := []models.Node{
+ mds.storageNodes.m[1],
+ mds.storageNodes.m[2],
}
for _, repo := range []models.Repository{repo1, repo2} {
- actualSecondaries, err := mds.GetShardSecondaries(repo)
+ actualReplicas, err := mds.GetReplicas(repo.RelativePath)
require.NoError(t, err)
- require.ElementsMatch(t, expectSecondaries, actualSecondaries)
+ require.ElementsMatch(t, expectReplicas, actualReplicas)
}
- backup1 := cfg.SecondaryServers[0]
- backup2 := cfg.SecondaryServers[1]
+ primary := mds.storageNodes.m[0]
+ backup1 := mds.storageNodes.m[1]
+ backup2 := mds.storageNodes.m[2]
- backup1ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
- ID: 1,
- Target: backup1.Name,
- Source: repo1,
- State: praefect.JobStateReady,
+ backup1ExpectedJobs := []ReplJob{
+ ReplJob{
+ ID: 1,
+ TargetNode: backup1,
+ Repository: models.Repository{RelativePath: repo1.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
+ SourceNode: primary,
+ State: JobStateReady,
},
- praefect.ReplJob{
- ID: 3,
- Target: backup1.Name,
- Source: repo2,
- State: praefect.JobStateReady,
+ ReplJob{
+ ID: 3,
+ TargetNode: backup1,
+ Repository: models.Repository{RelativePath: repo2.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
+ SourceNode: primary,
+ State: JobStateReady,
},
}
- backup2ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
- ID: 2,
- Target: backup2.Name,
- Source: repo1,
- State: praefect.JobStateReady,
+ backup2ExpectedJobs := []ReplJob{
+ ReplJob{
+ ID: 2,
+ TargetNode: backup2,
+ Repository: models.Repository{RelativePath: repo1.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
+ SourceNode: primary,
+ State: JobStateReady,
},
- praefect.ReplJob{
- ID: 4,
- Target: backup2.Name,
- Source: repo2,
- State: praefect.JobStateReady,
+ ReplJob{
+ ID: 4,
+ TargetNode: backup2,
+ Repository: models.Repository{RelativePath: repo2.RelativePath, Primary: primary, Replicas: []models.Node{backup1, backup2}},
+ SourceNode: primary,
+ State: JobStateReady,
},
}
- backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10)
+ backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10)
require.NoError(t, err)
require.Equal(t, backup1ExpectedJobs, backup1ActualJobs)
- backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10)
+ backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10)
require.NoError(t, err)
require.Equal(t, backup2ActualJobs, backup2ExpectedJobs)
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 417a04be2..d661d8d21 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -1,95 +1,106 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
-const (
- stor1 = "default" // usually the primary storage location
- stor2 = "backup-1" // usually the seoncary storage location
+var (
+ stor1 = models.Node{
+ ID: 0,
+ Address: "tcp://address-1",
+ Storage: "praefect-storage-1",
+ }
+ stor2 = models.Node{
+ ID: 1,
+ Address: "tcp://address-2",
+ Storage: "praefect-storage-2",
+ }
proj1 = "abcd1234" // imagine this is a legit project hash
)
var (
- repo1Primary = models.Repository{
+ repo1Repository = models.Repository{
RelativePath: proj1,
- Storage: stor1,
}
)
var operations = []struct {
desc string
- opFn func(*testing.T, praefect.Datastore)
+ opFn func(*testing.T, Datastore)
}{
{
desc: "query an empty datastore",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
},
{
desc: "insert first replication job before secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- _, err := ds.CreateSecondaryReplJobs(repo1Primary)
- require.Error(t, err, praefect.ErrInvalidReplTarget)
+ opFn: func(t *testing.T, ds Datastore) {
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
+ require.Error(t, err, ErrInvalidReplTarget)
},
},
{
- desc: "set the primary for the shard",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1})
+ desc: "set the primary for the repository",
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID)
require.NoError(t, err)
},
},
{
- desc: "associate the replication job target with a primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}})
+ desc: "add a secondary replica for the repository",
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID)
require.NoError(t, err)
},
},
{
desc: "insert first replication job after secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- ids, err := ds.CreateSecondaryReplJobs(repo1Primary)
+ opFn: func(t *testing.T, ds Datastore) {
+ ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
require.NoError(t, err)
require.Equal(t, []uint64{1}, ids)
},
},
{
desc: "fetch inserted replication jobs after primary mapped",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- expectedJob := praefect.ReplJob{
- ID: 1,
- Source: repo1Primary,
- Target: stor2,
- State: praefect.JobStatePending,
+ expectedJob := ReplJob{
+ ID: 1,
+ Repository: models.Repository{
+ RelativePath: repo1Repository.RelativePath,
+ Primary: stor1,
+ Replicas: []models.Node{stor2},
+ },
+ SourceNode: stor1,
+ TargetNode: stor2,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
},
{
desc: "mark replication job done",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.UpdateReplJob(1, praefect.JobStateComplete)
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.UpdateReplJob(1, JobStateComplete)
require.NoError(t, err)
},
},
{
desc: "try fetching completed replication job",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -97,14 +108,11 @@ var operations = []struct {
}
// TODO: add SQL datastore flavor
-var flavors = map[string]func() praefect.Datastore{
- "in-memory-datastore": func() praefect.Datastore {
- return praefect.NewMemoryDatastore(
- config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- })
+var flavors = map[string]func() Datastore{
+ "in-memory-datastore": func() Datastore {
+ return NewMemoryDatastore(config.Config{
+ Nodes: []*models.Node{&stor1, &stor2},
+ })
},
}
diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go
index 83910d099..03a58459a 100644
--- a/internal/praefect/mock/mock.pb.go
+++ b/internal/praefect/mock/mock.pb.go
@@ -11,6 +11,8 @@ import (
proto "github.com/golang/protobuf/proto"
_ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
)
// Reference imports to suppress errors if they are not otherwise used.
@@ -110,17 +112,17 @@ func init() {
func init() { proto.RegisterFile("mock.proto", fileDescriptor_6fa4806c90f7156d) }
var fileDescriptor_6fa4806c90f7156d = []byte{
- // 152 bytes of a gzipped FileDescriptorProto
+ // 154 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xca, 0xcd, 0x4f, 0xce,
0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0xa5, 0x78, 0x8a, 0x33, 0x12, 0x8b,
0x52, 0x53, 0x20, 0x62, 0x4a, 0xaa, 0x5c, 0xbc, 0xc1, 0x99, 0xb9, 0x05, 0x39, 0xa9, 0x41, 0xa9,
0x85, 0xa5, 0xa9, 0xc5, 0x25, 0x42, 0x22, 0x5c, 0xac, 0x65, 0x89, 0x39, 0xa5, 0xa9, 0x12, 0x8c,
0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x10, 0x8e, 0x92, 0x1a, 0x17, 0x1f, 0x4c, 0x59, 0x71, 0x41, 0x7e,
- 0x5e, 0x71, 0x2a, 0x42, 0x1d, 0x13, 0x92, 0x3a, 0xa3, 0x30, 0x98, 0x71, 0xc1, 0xa9, 0x45, 0x65,
- 0x99, 0xc9, 0xa9, 0x42, 0xae, 0x5c, 0x02, 0x10, 0x81, 0xd0, 0xbc, 0xc4, 0xa2, 0x4a, 0x30, 0x21,
- 0x24, 0xac, 0x07, 0x76, 0x14, 0x8a, 0xbd, 0x52, 0x22, 0xa8, 0x82, 0x10, 0x5b, 0x94, 0xd8, 0x7e,
- 0x4d, 0xd7, 0x60, 0xe2, 0x60, 0x4a, 0x62, 0x03, 0xbb, 0xd6, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff,
- 0x55, 0x3d, 0x80, 0x35, 0xcf, 0x00, 0x00, 0x00,
+ 0x5e, 0x71, 0x2a, 0x42, 0x1d, 0x13, 0x92, 0x3a, 0xa3, 0x08, 0x98, 0x71, 0xc1, 0xa9, 0x45, 0x65,
+ 0x99, 0xc9, 0xa9, 0x42, 0xee, 0x5c, 0x02, 0x10, 0x81, 0xd0, 0xbc, 0xc4, 0xa2, 0x4a, 0x30, 0x21,
+ 0x24, 0xac, 0x07, 0x76, 0x14, 0x8a, 0xbd, 0x52, 0x22, 0xa8, 0x82, 0x10, 0x5b, 0x94, 0x38, 0x7e,
+ 0x4d, 0xd7, 0x60, 0xe1, 0x60, 0x12, 0x60, 0x4c, 0x62, 0x03, 0xbb, 0xd7, 0x18, 0x10, 0x00, 0x00,
+ 0xff, 0xff, 0xe4, 0x1b, 0xb4, 0x1f, 0xd1, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -162,6 +164,14 @@ type SimpleServiceServer interface {
SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error)
}
+// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedSimpleServiceServer struct {
+}
+
+func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented")
+}
+
func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) {
s.RegisterService(&_SimpleService_serviceDesc, srv)
}
diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto
index ded233102..59e79d3b9 100644
--- a/internal/praefect/mock/mock.proto
+++ b/internal/praefect/mock/mock.proto
@@ -19,7 +19,10 @@ message SimpleResponse {
service SimpleService {
// SimpleUnaryUnary is a simple unary request with unary response
rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) {
- option (gitaly.op_type).op = ACCESSOR;
+ option (gitaly.op_type) = {
+ op: ACCESSOR
+ scope_level: SERVER
+ };
}
}
diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go
index 3b542f8ad..60e12595c 100644
--- a/internal/praefect/mocksvc_test.go
+++ b/internal/praefect/mocksvc_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
new file mode 100644
index 000000000..941a72e8f
--- /dev/null
+++ b/internal/praefect/models/node.go
@@ -0,0 +1,17 @@
+package models
+
+// Node describes an address that serves a storage
+type Node struct {
+ ID int
+ Storage string `toml:"storage"`
+ Address string `toml:"address"`
+ Token string `toml:"token"`
+}
+
+// Repository describes a repository's relative path and its primary and list of secondaries
+type Repository struct {
+ ID int
+ RelativePath string
+ Primary Node
+ Replicas []Node
+}
diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go
deleted file mode 100644
index 854254d87..000000000
--- a/internal/praefect/models/nodes.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package models
-
-// GitalyServer allows configuring the servers that RPCs are proxied to
-type GitalyServer struct {
- Name string `toml:"name"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- Token string `toml:"token"`
-}
diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go
deleted file mode 100644
index e11cdbf0a..000000000
--- a/internal/praefect/models/repository.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package models
-
-// Repository provides all necessary information to address a repository hosted
-// in a specific Gitaly replica
-type Repository struct {
- RelativePath string `toml:"relative_path"` // relative path of repository
- Storage string `toml:"storage"` // storage location, e.g. default
-}
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 363e023be..21c5254ff 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -207,11 +207,6 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
opCode = OpUnknown
}
- targetRepo, err := parseOID(opMsg.GetTargetRepositoryField())
- if err != nil {
- return MethodInfo{}, err
- }
-
// for some reason, the protobuf descriptor contains an extra dot in front
// of the request name that the generated code does not. This trimming keeps
// the two copies consistent for comparisons.
@@ -227,13 +222,22 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
return MethodInfo{}, fmt.Errorf("encountered unknown method scope %d", opMsg.GetScopeLevel())
}
- return MethodInfo{
+ mi := MethodInfo{
Operation: opCode,
Scope: scope,
- targetRepo: targetRepo,
requestName: requestName,
requestFactory: reqFactory,
- }, nil
+ }
+
+ if scope == ScopeRepository {
+ targetRepo, err := parseOID(opMsg.GetTargetRepositoryField())
+ if err != nil {
+ return MethodInfo{}, err
+ }
+ mi.targetRepo = targetRepo
+ }
+
+ return mi, nil
}
// parses a string like "1.1" and returns a slice of ints
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index c56f0488c..ebba87641 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -8,32 +8,33 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
"github.com/sirupsen/logrus"
)
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, source models.Repository, target Node) error
+ Replicate(ctx context.Context, job ReplJob, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Logger
}
-func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, job ReplJob, targetCC *grpc.ClientConn) error {
repository := &gitalypb.Repository{
- StorageName: target.Storage,
- RelativePath: source.RelativePath,
+ StorageName: job.TargetNode.Storage,
+ RelativePath: job.Repository.RelativePath,
}
remoteRepository := &gitalypb.Repository{
- StorageName: source.Storage,
- RelativePath: source.RelativePath,
+ StorageName: job.SourceNode.Storage,
+ RelativePath: job.Repository.RelativePath,
}
- repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc)
- remoteClient := gitalypb.NewRemoteServiceClient(target.cc)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
+ remoteClient := gitalypb.NewRemoteServiceClient(targetCC)
// CreateRepository is idempotent
if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
@@ -60,7 +61,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Logger
- dataStore Datastore
+ datastore Datastore
coordinator *Coordinator
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
@@ -74,10 +75,10 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Logger, datastore Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
- dataStore: ds,
+ datastore: datastore,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
targetNode: targetNode,
@@ -118,7 +119,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.dataStore.CreateSecondaryReplJobs(repo)
+ id, err := r.datastore.CreateReplicaReplJobs(repo.RelativePath)
if err != nil {
return err
}
@@ -140,58 +141,56 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
for {
- jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10)
+ nodes, err := r.datastore.GetStorageNodes()
if err != nil {
- return err
+ return nil
}
- if len(jobs) == 0 {
- r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
-
- select {
- // TODO: exponential backoff when no queries are returned
- case <-time.After(jobFetchInterval):
- continue
-
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- r.log.Debugf("fetched replication jobs: %#v", jobs)
-
- for _, job := range jobs {
- r.log.WithField(logWithReplJobID, job.ID).
- Infof("processing replication job %#v", job)
- node, err := r.coordinator.GetStorageNode(job.Target)
- r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err)
+ for _, node := range nodes {
+ jobs, err := r.datastore.GetJobs(JobStatePending|JobStateReady, node.ID, 10)
if err != nil {
return err
}
- if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
- return err
- }
+ if len(jobs) == 0 {
+ r.log.Tracef("no jobs for %d, checking again in %s", node.ID, jobFetchInterval)
- primary, err := r.dataStore.GetShardPrimary(job.Source)
- if err != nil {
- return err
- }
+ select {
+ // TODO: exponential backoff when no queries are returned
+ case <-time.After(jobFetchInterval):
+ continue
- ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token)
- if err != nil {
- return err
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
- if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
- }
+ for _, job := range jobs {
+ r.log.WithField(logWithReplJobID, job.ID).
+ Infof("processing replication job %#v", job)
- r.log.WithField(logWithReplJobID, job.ID).
- Info("completed replication")
- if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
- return err
+ if err := r.datastore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
+
+ ctx, err = helper.InjectGitalyServers(ctx, job.Repository.Primary.Storage, job.SourceNode.Address, "")
+ if err != nil {
+ return err
+ }
+
+ cc, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ if err != nil {
+ return err
+ }
+
+ if err := r.replicator.Replicate(ctx, job, cc); err != nil {
+ r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
+ return err
+ }
+
+ if err := r.datastore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ return err
+ }
}
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 1294bc989..b669e6875 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -28,43 +28,32 @@ import (
// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for
// all whitelisted repos
func TestReplicatorProcessJobsWhitelist(t *testing.T) {
- var (
- cfg = config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup1",
- ListenAddr: "tcp://gitaly-backup1.example.com",
- },
- {
- Name: "backup2",
- ListenAddr: "tcp://gitaly-backup2.example.com",
- },
- },
- Whitelist: []string{"abcd1234", "edfg5678"},
- }
- datastore = NewMemoryDatastore(cfg)
- coordinator = NewCoordinator(logrus.New(), datastore)
- resultsCh = make(chan result)
- replman = NewReplMgr(
- cfg.SecondaryServers[1].Name,
- logrus.New(),
- datastore,
- coordinator,
- WithWhitelist(cfg.Whitelist),
- WithReplicator(&mockReplicator{resultsCh}),
- )
+ datastore := NewMemoryDatastore(config.Config{
+ Nodes: []*models.Node{
+ &models.Node{
+ ID: 1,
+ Address: "tcp://gitaly-primary.example.com",
+ Storage: "praefect-internal-1",
+ }, &models.Node{
+ ID: 2,
+ Address: "tcp://gitaly-backup1.example.com",
+ Storage: "praefect-internal-2",
+ }},
+ Whitelist: []string{"abcd1234", "edfg5678"},
+ })
+
+ coordinator := NewCoordinator(logrus.New(), datastore)
+ resultsCh := make(chan result)
+ replman := NewReplMgr(
+ "default",
+ logrus.New(),
+ datastore,
+ coordinator,
+ WithReplicator(&mockReplicator{resultsCh}),
)
- for _, node := range []*models.GitalyServer{
- cfg.PrimaryServer,
- cfg.SecondaryServers[0],
- cfg.SecondaryServers[1],
- } {
- err := coordinator.RegisterNode(node.Name, node.ListenAddr)
+ for _, node := range datastore.storageNodes.m {
+ err := coordinator.RegisterNode(node.Storage, node.Address)
require.NoError(t, err)
}
@@ -78,14 +67,27 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
success := make(chan struct{})
+ var expectedResults []result
+ // we expect one job per whitelisted repo with each backend server
+ for _, shard := range datastore.repositories.m {
+ for _, secondary := range shard.Replicas {
+ cc, err := coordinator.GetConnection(secondary.Storage)
+ require.NoError(t, err)
+ expectedResults = append(expectedResults,
+ result{relativePath: shard.RelativePath,
+ targetStorage: secondary.Storage,
+ targetCC: cc,
+ })
+ }
+ }
+
go func() {
// we expect one job per whitelisted repo with each backend server
- for i := 0; i < len(cfg.Whitelist); i++ {
- result := <-resultsCh
-
- assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage)
- assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage)
+ for _, shard := range datastore.repositories.m {
+ for range shard.Replicas {
+ result := <-resultsCh
+ assert.Contains(t, expectedResults, result)
+ }
}
cancel()
@@ -106,18 +108,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
}
type result struct {
- source models.Repository
- target Node
+ relativePath string
+ targetStorage string
+ targetCC *grpc.ClientConn
}
type mockReplicator struct {
resultsCh chan<- result
}
-func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (mr *mockReplicator) Replicate(ctx context.Context, job ReplJob, target *grpc.ClientConn) error {
select {
- case mr.resultsCh <- result{source, target}:
+ case mr.resultsCh <- result{job.Repository.RelativePath, job.TargetNode.Storage, target}:
return nil
case <-ctx.Done():
@@ -178,11 +181,19 @@ func TestReplicate(t *testing.T) {
var replicator defaultReplicator
require.NoError(t, replicator.Replicate(
ctx,
- models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()},
- Node{
- cc: conn,
- Storage: backupStorageName,
- }))
+ ReplJob{
+ Repository: models.Repository{
+ RelativePath: testRepo.GetRelativePath(),
+ },
+ SourceNode: models.Node{
+ Storage: "default",
+ },
+ TargetNode: models.Node{
+ Storage: backupStorageName,
+ },
+ },
+ conn,
+ ))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 915d7281a..5f231071b 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -7,14 +7,15 @@ import (
"testing"
"time"
+ "github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"google.golang.org/grpc"
)
@@ -44,26 +45,48 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
},
}
+ gz := proto.FileDescriptor("mock.proto")
+ fd, err := protoregistry.ExtractFileDescriptor(gz)
+ if err != nil {
+ panic(err)
+ }
+
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
const (
storagePrimary = "default"
- storageBackup = "backup"
)
- datastore := praefect.NewMemoryDatastore(config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
+ datastore := NewMemoryDatastore(config.Config{
+ Nodes: []*models.Node{
+ &models.Node{
+ ID: 1,
+ Storage: "praefect-internal-1",
+ },
+ &models.Node{
+ ID: 2,
+ Storage: "praefect-internal-2",
+ }},
})
- coordinator := praefect.NewCoordinator(logrus.New(), datastore)
- replmgr := praefect.NewReplMgr(
+
+ coordinator := NewCoordinator(logrus.New(), datastore, fd)
+
+ for id, nodeStorage := range datastore.storageNodes.m {
+ backend, cleanup := newMockDownstream(t, tt.callback)
+ defer cleanup() // clean up mock downstream server resources
+
+ coordinator.RegisterNode(nodeStorage.Storage, backend)
+ nodeStorage.Address = backend
+ datastore.storageNodes.m[id] = nodeStorage
+ }
+
+ replmgr := NewReplMgr(
storagePrimary,
logrus.New(),
datastore,
coordinator,
)
- prf := praefect.NewServer(
+ prf := NewServer(
coordinator,
replmgr,
nil,
@@ -85,13 +108,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
defer cc.Close()
cli := mock.NewSimpleServiceClient(cc)
- for _, replica := range []string{storagePrimary, storageBackup} {
- backend, cleanup := newMockDownstream(t, tt.callback)
- defer cleanup() // clean up mock downstream server resources
-
- coordinator.RegisterNode(replica, backend)
- }
-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()