Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: John Cai <jcai@gitlab.com> 2019-07-18 01:35:11 +0300
committer: John Cai <jcai@gitlab.com> 2019-08-01 23:03:28 +0300
commit: 220a0d62a891c390b586d82437a8b5e3cf3a6183 (patch)
tree: 54bd1fcaa5c9447c3fdbcdd8f90580bbcd727e91
parent: 1ab045ca8712df495fd7bafb544893ed04ea44a7 (diff)
Updating data model for praefect (branch: jc-data-model-changes)
-rw-r--r--.gitignore1
-rw-r--r--_support/praefect-cluster/.gitignore6
-rw-r--r--_support/praefect-cluster/config.gitaly.toml (renamed from _support/praefect-cluster/gitaly-primary.toml)0
-rw-r--r--_support/praefect-cluster/config.praefect.toml23
-rw-r--r--_support/praefect-cluster/docker-compose.yml24
-rw-r--r--_support/praefect-cluster/gitaly-backup-1.toml49
-rw-r--r--_support/praefect-cluster/gitaly-backup-2.toml49
-rw-r--r--changelogs/unreleased/jc-sql-data-store.yml5
-rw-r--r--cmd/praefect/main.go16
-rw-r--r--internal/helper/storage.go11
-rw-r--r--internal/praefect/common.go8
-rw-r--r--internal/praefect/config/config.go28
-rw-r--r--internal/praefect/config/config_test.go34
-rw-r--r--internal/praefect/config/testdata/config.toml24
-rw-r--r--internal/praefect/coordinator.go178
-rw-r--r--internal/praefect/coordinator_test.go31
-rw-r--r--internal/praefect/datastore.go365
-rw-r--r--internal/praefect/datastore_memory_test.go113
-rw-r--r--internal/praefect/datastore_test.go86
-rw-r--r--internal/praefect/mock/mock.pb.go30
-rw-r--r--internal/praefect/mock/mock.proto7
-rw-r--r--internal/praefect/mocksvc_test.go4
-rw-r--r--internal/praefect/models/node.go17
-rw-r--r--internal/praefect/models/nodes.go8
-rw-r--r--internal/praefect/models/repository.go8
-rw-r--r--internal/praefect/protoregistry/protoregistry.go37
-rw-r--r--internal/praefect/protoregistry/targetrepo_test.go2
-rw-r--r--internal/praefect/replicator.go112
-rw-r--r--internal/praefect/replicator_test.go104
-rw-r--r--internal/praefect/server_test.go51
30 files changed, 707 insertions, 724 deletions
diff --git a/.gitignore b/.gitignore
index b7fbb0b4c..9fb2c7e44 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,7 @@ cmd/gitaly-remote/gitaly-remote
git-env
/gitaly-debug
/praefect
+/praefect-migrate
gitaly.pid
/vendor/github.com/libgit2/git2go/vendor
/vendor
diff --git a/_support/praefect-cluster/.gitignore b/_support/praefect-cluster/.gitignore
index 06b873206..bd035c2b7 100644
--- a/_support/praefect-cluster/.gitignore
+++ b/_support/praefect-cluster/.gitignore
@@ -1,3 +1,3 @@
-/gitaly-backup-1
-/gitaly-backup-2
-/gitaly-primary
+/gitaly-1
+/gitaly-2
+/gitaly-3
diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/config.gitaly.toml
index 2379b6951..2379b6951 100644
--- a/_support/praefect-cluster/gitaly-primary.toml
+++ b/_support/praefect-cluster/config.gitaly.toml
diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml
index e0f163178..2a4297248 100644
--- a/_support/praefect-cluster/config.praefect.toml
+++ b/_support/praefect-cluster/config.praefect.toml
@@ -24,17 +24,22 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e
# as shard. listen_addr should be unique for all nodes.
# Requires the protocol to be defined, e.g. tcp://host.tld:1234
-[primary_server]
- name = "default"
+[[storage_node]]
# listen_addr = "tcp://gitaly-primary:9999"
- listen_addr = "tcp://127.0.0.1:9999"
+ storage = "praefect-internal-1"
+ address = "tcp://127.0.0.1:9999"
-[[secondary_server]]
- name = "backup1"
+[[storage_node]]
# listen_addr = "tcp://gitaly-backup-1:9999"
- listen_addr = "tcp://127.0.0.1:9998"
+ storage = "praefect-internal-2"
+ address = "tcp://127.0.0.1:9998"
-[[secondary_server]]
- name = "backup2"
+[[storage_node]]
# listen_addr = "tcp://gitaly-backup-2:9999"
- listen_addr = "tcp://127.0.0.1:9997" \ No newline at end of file
+ storage = "praefect-internal-3"
+ address = "tcp://127.0.0.1:9997"
+
+[postgres]
+ user = "johncai"
+ address = "127.0.0.1:5432"
+ database = "praefect_test" \ No newline at end of file
diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml
index 6eb81be47..09745ea41 100644
--- a/_support/praefect-cluster/docker-compose.yml
+++ b/_support/praefect-cluster/docker-compose.yml
@@ -6,15 +6,15 @@ services:
# dockerfile: Dockerfile.praefect
# image: praefect:latest
# depends_on:
-# - gitaly-primary
-# - gitaly-backup-1
-# - gitaly-backup-2
+# - gitaly-1
+# - gitaly-2
+# - gitaly-3
# command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"]
# ports:
# - "2305:2305"
# volumes:
# - ./config.praefect.toml:/etc/gitaly/config.praefect.toml
- gitaly-primary:
+ gitaly-1:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
environment:
- GITALY_TESTING_NO_GIT_HOOKS=1
@@ -24,9 +24,9 @@ services:
- "9999:9999"
command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-primary/data:/home/git/repositories
- - ./gitaly-primary.toml:/etc/config/config.toml
- gitaly-backup-1:
+ - ./gitaly-1/data:/home/git/repositories
+ - ./config.gitaly.toml:/etc/config/config.toml
+ gitaly-2:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
environment:
- GITALY_TESTING_NO_GIT_HOOKS=1
@@ -36,9 +36,9 @@ services:
- "9998:9999"
command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-backup-1/data:/home/git/repositories
- - ./gitaly-backup-1.toml:/etc/config/config.toml
- gitaly-backup-2:
+ - ./gitaly-2/data:/home/git/repositories
+ - ./config.gitaly.toml:/etc/config/config.toml
+ gitaly-3:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
environment:
- GITALY_TESTING_NO_GIT_HOOKS=1
@@ -48,5 +48,5 @@ services:
- "9997:9999"
command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-backup-2/data:/home/git/repositories
- - ./gitaly-backup-2.toml:/etc/config/config.toml \ No newline at end of file
+ - ./gitaly-3/data:/home/git/repositories
+ - ./config.gitaly.toml:/etc/config/config.toml \ No newline at end of file
diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml
deleted file mode 100644
index 89d1884e3..000000000
--- a/_support/praefect-cluster/gitaly-backup-1.toml
+++ /dev/null
@@ -1,49 +0,0 @@
-# Example Gitaly configuration file
-
-# The directory where Gitaly's executables are stored
-bin_dir = "/usr/local/bin"
-
-# listen on a TCP socket. This is insecure (no authentication)
-listen_addr = "0.0.0.0:9999"
-
-# # Optional: export metrics via Prometheus
-# prometheus_listen_addr = "localhost:9236"
-#
-
-# # Git executable settings
-# [git]
-# bin_path = "/usr/bin/git"
-
-[[storage]]
-name = "backup1"
-path = "/home/git/repositories"
-
-# # You can optionally configure more storages for this Gitaly instance to serve up
-#
-# [[storage]]
-# name = "other_storage"
-# path = "/mnt/other_storage/repositories"
-#
-
-# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
-# [logging]
-# format = "json"
-# # Additionally exceptions can be reported to Sentry
-# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
-
-# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
-# [prometheus]
-# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
-
-[gitaly-ruby]
-# The directory where gitaly-ruby is installed
-dir = "/srv/gitaly-ruby"
-
-[gitlab-shell]
-# The directory where gitlab-shell is installed
-dir = "/srv/gitlab-shell"
-
-# # You can adjust the concurrency of each RPC endpoint
-# [[concurrency]]
-# rpc = "/gitaly.RepositoryService/GarbageCollect"
-# max_per_repo = 1
diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml
deleted file mode 100644
index 1b5ce8d20..000000000
--- a/_support/praefect-cluster/gitaly-backup-2.toml
+++ /dev/null
@@ -1,49 +0,0 @@
-# Example Gitaly configuration file
-
-# The directory where Gitaly's executables are stored
-bin_dir = "/usr/local/bin"
-
-# listen on a TCP socket. This is insecure (no authentication)
-listen_addr = "0.0.0.0:9999"
-
-# # Optional: export metrics via Prometheus
-# prometheus_listen_addr = "localhost:9236"
-#
-
-# # Git executable settings
-# [git]
-# bin_path = "/usr/bin/git"
-
-[[storage]]
-name = "backup2"
-path = "/home/git/repositories"
-
-# # You can optionally configure more storages for this Gitaly instance to serve up
-#
-# [[storage]]
-# name = "other_storage"
-# path = "/mnt/other_storage/repositories"
-#
-
-# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
-# [logging]
-# format = "json"
-# # Additionally exceptions can be reported to Sentry
-# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
-
-# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
-# [prometheus]
-# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
-
-[gitaly-ruby]
-# The directory where gitaly-ruby is installed
-dir = "/srv/gitaly-ruby"
-
-[gitlab-shell]
-# The directory where gitlab-shell is installed
-dir = "/srv/gitlab-shell"
-
-# # You can adjust the concurrency of each RPC endpoint
-# [[concurrency]]
-# rpc = "/gitaly.RepositoryService/GarbageCollect"
-# max_per_repo = 1
diff --git a/changelogs/unreleased/jc-sql-data-store.yml b/changelogs/unreleased/jc-sql-data-store.yml
new file mode 100644
index 000000000..e9ccb210d
--- /dev/null
+++ b/changelogs/unreleased/jc-sql-data-store.yml
@@ -0,0 +1,5 @@
+---
+title: SQL datastore for praefect
+merge_request: 1370
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index acf53d2fa..8ce68530d 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -19,6 +19,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/labkit/tracing"
)
@@ -95,10 +96,9 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, datastore)
- repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
+ coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...)
+ repl = praefect.NewReplMgr("default", logger, datastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
srv = praefect.NewServer(coordinator, repl, nil, logger)
-
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
termCh = make(chan os.Signal, len(signals))
@@ -114,14 +114,12 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer)
-
- for _, gitaly := range allBackendServers {
- if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil {
- return fmt.Errorf("failed to register %s: %s", gitaly.Name, err)
+ for _, node := range conf.StorageNodes {
+ if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil {
+ return fmt.Errorf("failed to register %s: %s", node.Address, err)
}
- logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
+ logger.WithField("node_address", node.Address).Info("registered gitaly node")
}
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index 4e535a5d6..f3f0d0ba0 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -34,6 +34,17 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly
return
}
+// IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata
+func IncomingToOutgoing(ctx context.Context) context.Context {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return ctx
+ }
+
+ return metadata.NewOutgoingContext(ctx, md)
+
+}
+
// InjectGitalyServers injects gitaly-servers metadata into an outgoing context
func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) {
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
index 2df2a4823..a09a292ad 100644
--- a/internal/praefect/common.go
+++ b/internal/praefect/common.go
@@ -1,13 +1,5 @@
package praefect
-import "google.golang.org/grpc"
-
-// Node is a wrapper around the grpc client connection for a backend Gitaly node
-type Node struct {
- Storage string
- cc *grpc.ClientConn
-}
-
// logging keys to use with logrus WithField
const (
logKeyProjectPath = "ProjectPath"
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 6a2a5b5d5..eb0fad56b 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -15,8 +15,7 @@ type Config struct {
ListenAddr string `toml:"listen_addr"`
SocketPath string `toml:"socket_path"`
- PrimaryServer *models.GitalyServer `toml:"primary_server"`
- SecondaryServers []*models.GitalyServer `toml:"secondary_server"`
+ StorageNodes []*models.StorageNode `toml:"storage_node"`
// Whitelist is a list of relative project paths (paths comprised of project
// hashes) that are permitted to use high availability features
@@ -26,13 +25,6 @@ type Config struct {
PrometheusListenAddr string `toml:"prometheus_listen_addr"`
}
-// GitalyServer allows configuring the servers that RPCs are proxied to
-type GitalyServer struct {
- Name string `toml:"name"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- Token string `toml:"token"`
-}
-
// FromFile loads the config for the passed file path
func FromFile(filePath string) (Config, error) {
config := &Config{}
@@ -50,32 +42,30 @@ var (
errNoListener = errors.New("no listen address or socket path configured")
errNoGitalyServers = errors.New("no primary gitaly backends configured")
errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique")
- errGitalyWithoutName = errors.New("all gitaly servers must have a name")
+ errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address")
)
-var emptyServer = &models.GitalyServer{}
-
// Validate establishes if the config is valid
func (c Config) Validate() error {
if c.ListenAddr == "" && c.SocketPath == "" {
return errNoListener
}
- if c.PrimaryServer == nil || c.PrimaryServer == emptyServer {
+ if len(c.StorageNodes) == 0 {
return errNoGitalyServers
}
- listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1)
- for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) {
- if gitaly.Name == "" {
- return errGitalyWithoutName
+ listenAddrs := make(map[string]bool, len(c.StorageNodes))
+ for _, node := range c.StorageNodes {
+ if node.Address == "" {
+ return errGitalyWithoutAddr
}
- if _, found := listenAddrs[gitaly.ListenAddr]; found {
+ if _, found := listenAddrs[node.Address]; found {
return errDuplicateGitalyAddr
}
- listenAddrs[gitaly.ListenAddr] = true
+ listenAddrs[node.Address] = true
}
return nil
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index eace5eb2f..b89bdd648 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -9,10 +9,10 @@ import (
)
func TestConfigValidation(t *testing.T) {
- primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"}
- secondarySrvs := []*models.GitalyServer{
- {"test1", "localhost:23457", "secret-token"},
- {"test2", "localhost:23458", "secret-token"},
+ nodes := []*models.StorageNode{
+ {ID: 1, Address: "localhost:23456", Token: "secret-token"},
+ {ID: 2, Address: "localhost:23457", Token: "secret-token"},
+ {ID: 3, Address: "localhost:23458", Token: "secret-token"},
}
testCases := []struct {
@@ -22,12 +22,12 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "", StorageNodes: nodes},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes},
err: nil,
},
{
@@ -37,12 +37,12 @@ func TestConfigValidation(t *testing.T) {
},
{
desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}},
+ config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address})},
err: errDuplicateGitalyAddr,
},
{
desc: "Valid config",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes},
err: nil,
},
}
@@ -63,18 +63,18 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
+ StorageNodes: []*models.StorageNode{
+ {
+ Address: "tcp://gitaly-internal-1.example.com",
+ Storage: "praefect-internal-1",
+ },
{
- Name: "default",
- ListenAddr: "tcp://gitaly-backup1.example.com",
+ Address: "tcp://gitaly-internal-2.example.com",
+ Storage: "praefect-internal-2",
},
{
- Name: "backup",
- ListenAddr: "tcp://gitaly-backup2.example.com",
+ Address: "tcp://gitaly-internal-3.example.com",
+ Storage: "praefect-internal-3",
},
},
Whitelist: []string{"abcd1234", "edfg5678"},
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 81701a359..defebcca9 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -3,20 +3,20 @@ socket_path = ""
whitelist = ["abcd1234", "edfg5678"]
prometheus_listen_addr = ""
-[primary_server]
- name = "default"
- listen_addr = "tcp://gitaly-primary.example.com"
-
-[[secondary_server]]
- name = "default"
- listen_addr = "tcp://gitaly-backup1.example.com"
-
-[[secondary_server]]
- name = "backup"
- listen_addr = "tcp://gitaly-backup2.example.com"
-
[logging]
format = ""
sentry_dsn = ""
ruby_sentry_dsn = ""
level = ""
+
+[[storage_node]]
+ address = "tcp://gitaly-internal-1.example.com"
+ storage = "praefect-internal-1"
+
+[[storage_node]]
+ address = "tcp://gitaly-internal-2.example.com"
+ storage = "praefect-internal-2"
+
+[[storage_node]]
+ address = "tcp://gitaly-internal-3.example.com"
+ storage = "praefect-internal-3"
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index c238604f3..f00a469b6 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -2,11 +2,14 @@ package praefect
import (
"context"
+ "errors"
"fmt"
+ "math/rand"
"os"
"os/signal"
"sync"
"syscall"
+ "time"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -19,8 +22,6 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
// Coordinator takes care of directing client requests to the appropriate
@@ -31,14 +32,14 @@ type Coordinator struct {
failoverMutex sync.RWMutex
connMutex sync.RWMutex
- datastore Datastore
+ datastore ReplicasDatastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -55,19 +56,6 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
-// GetStorageNode returns the registered node for the given storage location
-func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
- cc, ok := c.getConn(storage)
- if !ok {
- return Node{}, fmt.Errorf("no node registered for storage location %q", storage)
- }
-
- return Node{
- Storage: storage,
- cc: cc,
- }, nil
-}
-
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) {
// For phase 1, we need to route messages based on the storage location
@@ -77,34 +65,93 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
c.failoverMutex.RLock()
defer c.failoverMutex.RUnlock()
- serverConfig, err := c.datastore.GetDefaultPrimary()
+ frame, err := peeker.Peek()
if err != nil {
- err := status.Error(
- codes.FailedPrecondition,
- "no downstream node registered",
- )
return nil, nil, err
}
- // We only need the primary node, as there's only one primary storage
- // location per praefect at this time
- cc, ok := c.getConn(serverConfig.Name)
- if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name)
+ mi, err := c.registry.LookupMethod(fullMethodName)
+ if err != nil {
+ return nil, nil, err
}
- ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token)
+ var primary *models.StorageNode
+
+ if mi.Scope == protoregistry.ScopeRepository {
+ m, err := mi.UnmarshalRequestProto(frame)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ targetRepo, err := mi.TargetRepo(m)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
+
+ if err != nil {
+ if err != ErrPrimaryNotSet {
+ return nil, nil, err
+ }
+ // if there are no primaries for this repository, pick one
+ nodes, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if len(nodes) == 0 {
+ return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+
+ }
+ newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))]
+ //newPrimary := nodes[0]
+
+ // set the primary
+ if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ return nil, nil, err
+ }
+
+ primary = &newPrimary
+ }
+
+ targetRepo.StorageName = primary.Storage
+
+ b, err := proxy.Codec().Marshal(m)
+ if err != nil {
+ return nil, nil, err
+ }
+ if err = peeker.Modify(b); err != nil {
+ return nil, nil, err
+ }
+
+ } else {
+ // TODO: For now we just pick the first storage node for a non-repository-scoped RPC, but we will need to figure out exactly how to
+ // proxy requests that are not repository scoped
+ node, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return nil, nil, err
+ }
+ if len(node) == 0 {
+ return nil, nil, errors.New("no node storages found")
+ }
+ primary = &node[0]
+ }
+
+ // We only need the primary node, as there's only one primary storage
+ // location per praefect at this time
+ cc, err := c.GetConnection(primary.Storage)
if err != nil {
- return nil, nil, err
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage)
}
- return ctx, cc, nil
+ return helper.IncomingToOutgoing(ctx), cc, nil
}
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
- conn, err := client.Dial(listenAddr,
+func (c *Coordinator) RegisterNode(storage, address string) error {
+ conn, err := client.Dial(address,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)),
@@ -114,23 +161,28 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
return err
}
- c.setConn(storageName, conn)
+ c.setConn(storage, conn)
return nil
}
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(storage string, conn *grpc.ClientConn) {
c.connMutex.Lock()
- c.nodes[storageName] = conn
+ c.nodes[storage] = conn
c.connMutex.Unlock()
}
-func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
+// GetConnection gets the grpc client connection registered for the given storage name
+func (c *Coordinator) GetConnection(storage string) (*grpc.ClientConn, error) {
c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
+ cc, ok := c.nodes[storage]
c.connMutex.RUnlock()
+ if !ok {
+ return nil, errors.New("client connection not found")
+ }
+
+ return cc, nil
- return cc, ok
}
// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
@@ -146,55 +198,7 @@ func (c *Coordinator) handleSignalAndRotate() {
<-failoverChan
c.failoverMutex.Lock()
- primary, err := c.datastore.GetDefaultPrimary()
- if err != nil {
- c.log.Fatalf("error when getting default primary: %v", err)
- }
-
- if err := c.rotateSecondaryToPrimary(primary); err != nil {
- c.log.WithError(err).Error("rotating secondary")
- }
+ c.log.Info("failover happens")
c.failoverMutex.Unlock()
}
}
-
-func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error {
- repositories, err := c.datastore.GetRepositoriesForPrimary(primary)
- if err != nil {
- return err
- }
-
- for _, repoPath := range repositories {
- secondaries, err := c.datastore.GetShardSecondaries(models.Repository{
- RelativePath: repoPath,
- })
- if err != nil {
- return fmt.Errorf("getting secondaries: %v", err)
- }
-
- newPrimary := secondaries[0]
- secondaries = append(secondaries[1:], primary)
-
- if err = c.datastore.SetShardPrimary(models.Repository{
- RelativePath: repoPath,
- }, newPrimary); err != nil {
- return fmt.Errorf("setting primary: %v", err)
- }
-
- if err = c.datastore.SetShardSecondaries(models.Repository{
- RelativePath: repoPath,
- }, secondaries); err != nil {
- return fmt.Errorf("setting secondaries: %v", err)
- }
- }
-
- // set the new default primary
- primary, err = c.datastore.GetShardPrimary(models.Repository{
- RelativePath: repositories[0],
- })
- if err != nil {
- return fmt.Errorf("getting shard primary: %v", err)
- }
-
- return c.datastore.SetDefaultPrimary(primary)
-}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 50045f8a0..0275c6048 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,9 +5,6 @@ import (
"testing"
"github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
var testLogger = logrus.New()
@@ -17,31 +14,5 @@ func init() {
}
func TestSecondaryRotation(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{Name: "primary"},
- SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}},
- Whitelist: []string{"/repoA", "/repoB"},
- }
- d := NewMemoryDatastore(cfg)
- c := NewCoordinator(testLogger, d)
-
- primary, err := d.GetDefaultPrimary()
- require.NoError(t, err)
-
- require.NoError(t, c.rotateSecondaryToPrimary(primary))
-
- primary, err = d.GetDefaultPrimary()
- require.NoError(t, err)
- require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary")
-
- repositories, err := d.GetRepositoriesForPrimary(primary)
- require.NoError(t, err)
-
- for _, repository := range repositories {
- shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository})
- require.NoError(t, err)
-
- require.Len(t, shardSecondaries, 2)
- require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0])
- }
+ t.Skip("secondary rotation will change with the new data model")
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 5678c6a24..85fa22a23 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -34,7 +34,7 @@ const (
// JobStateComplete indicates the job is now complete
JobStateComplete
// JobStateCancelled indicates the job was cancelled. This can occur if the
- // job is no longer relevant (e.g. a node is moved out of a shard)
+ // job is no longer relevant (e.g. a node is moved out of a repository)
JobStateCancelled
)
@@ -42,10 +42,11 @@ const (
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
- ID uint64 // autoincrement ID
- Target string // which storage location to replicate to?
- Source models.Repository // source for replication
- State JobState
+ ID uint64 // autoincrement ID
+ TargetNodeID int // which node to replicate to?
+ SourceStorage string
+ Source models.Repository // source for replication
+ State JobState
}
// replJobs provides sort manipulation behavior
@@ -64,32 +65,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
type Datastore interface {
ReplJobsDatastore
ReplicasDatastore
- TemporaryDatastore
-}
-
-// TemporaryDatastore contains methods that will go away once we move to a SQL datastore
-type TemporaryDatastore interface {
- GetDefaultPrimary() (models.GitalyServer, error)
- SetDefaultPrimary(primary models.GitalyServer) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- // GetSecondaries will retrieve all secondary replica storage locations for
- // a primary replica
- GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error)
+ GetReplicas(relativePath string) ([]models.StorageNode, error)
+
+ GetStorageNode(nodeID int) (models.StorageNode, error)
+
+ GetStorageNodes() ([]models.StorageNode, error)
+
+ GetPrimary(relativePath string) (*models.StorageNode, error)
- GetShardPrimary(repo models.Repository) (models.GitalyServer, error)
+ SetPrimary(relativePath string, storageNodeID int) error
- // SetSecondaries will set the secondary storage locations for a repository
- // in a primary replica.
- SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error
+ AddReplica(relativePath string, storageNodeID int) error
- SetShardPrimary(repo models.Repository, primary models.GitalyServer) error
+ RemoveReplica(relativePath string, storageNodeID int) error
- // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary
- GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error)
+ GetRepository(relativePath string) (*models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -98,58 +93,53 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, node string, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error)
- // CreateSecondaryJobs will create replication jobs for each secondary
+ // CreateReplicaReplJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(source models.Repository) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
}
-// shard is a set of primary and secondary storage replicas for a project
-type shard struct {
- primary models.GitalyServer
- secondaries []models.GitalyServer
-}
-
type jobRecord struct {
- relativePath string // project's relative path
- targetNode string
- state JobState
+ sourceStorage string
+ relativePath string // project's relative path
+ targetNodeID int
+ state JobState
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- replicas *struct {
- sync.RWMutex
- m map[string]shard // keyed by project's relative path
- }
-
jobs *struct {
sync.RWMutex
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
- primary *struct {
+ storageNodes *struct {
sync.RWMutex
- server models.GitalyServer
+ m map[int]models.StorageNode
+ }
+
+ repositories *struct {
+ sync.RWMutex
+ m map[string]models.Repository
}
}
// NewMemoryDatastore returns an initialized in-memory datastore
func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
- replicas: &struct {
+ storageNodes: &struct {
sync.RWMutex
- m map[string]shard
+ m map[int]models.StorageNode
}{
- m: map[string]shard{},
+ m: map[int]models.StorageNode{},
},
jobs: &struct {
sync.RWMutex
@@ -159,114 +149,190 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- primary: &struct {
+ repositories: &struct {
sync.RWMutex
- server models.GitalyServer
+ m map[string]models.Repository
}{
- server: models.GitalyServer{
- Name: cfg.PrimaryServer.Name,
- ListenAddr: cfg.PrimaryServer.ListenAddr,
- Token: cfg.PrimaryServer.Token,
- },
+ m: map[string]models.Repository{},
},
}
- secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers))
- for i, server := range cfg.SecondaryServers {
- secondaryServers[i] = *server
+ for i, storageNode := range cfg.StorageNodes {
+ storageNode.ID = i
+ m.storageNodes.m[i] = *storageNode
}
- for _, repo := range cfg.Whitelist {
- // store the configuration file specified shard
- m.replicas.m[repo] = shard{
- primary: *cfg.PrimaryServer,
- secondaries: secondaryServers,
+ for _, repoPath := range cfg.Whitelist {
+ repo := models.Repository{
+ RelativePath: repoPath,
}
-
- // initialize replication job queue to replicate all whitelisted repos
- // to every secondary server
- for _, secondary := range cfg.SecondaryServers {
- m.jobs.next++
- m.jobs.records[m.jobs.next] = jobRecord{
- state: JobStateReady,
- targetNode: secondary.Name,
- relativePath: repo,
+ for storageID, storageNode := range cfg.StorageNodes {
+
+ // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node
+ // to be the primary
+ if repo.Primary == (models.StorageNode{}) {
+ repo.Primary = *storageNode
+ } else {
+ repo.Replicas = append(repo.Replicas, *storageNode)
+ // initialize replication job queue to replicate all whitelisted repos
+ // to every replica
+ m.jobs.next++
+ m.jobs.records[m.jobs.next] = jobRecord{
+ state: JobStateReady,
+ targetNodeID: storageID,
+ sourceStorage: repo.Primary.Storage,
+ relativePath: repoPath,
+ }
}
}
+ m.repositories.m[repoPath] = repo
}
return m
}
-// GetShardSecondaries will return the set of secondary storage locations for a
-// given repository if they exist
-func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) {
- shard, _ := md.getShard(primary.RelativePath)
+// GetReplicas gets the secondaries for a repository based on the relative path
+func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) {
+ md.repositories.RLock()
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+ defer md.repositories.RUnlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, errors.New("repository not found")
+ }
+
+ return repository.Replicas, nil
+}
+
+// GetStorageNode gets the storage node with the given node ID
+func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- return shard.secondaries, nil
+ node, ok := md.storageNodes.m[nodeID]
+ if !ok {
+ return models.StorageNode{}, errors.New("node not found")
+ }
+
+ return node, nil
}
-// SetShardSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetStorageNodes gets all storage nodes
+func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.secondaries = secondaries
- md.replicas.m[repo.RelativePath] = shard
+ var storageNodes []models.StorageNode
+ for _, storageNode := range md.storageNodes.m {
+ storageNodes = append(storageNodes, storageNode)
+ }
- return nil
+ return storageNodes, nil
}
-// SetShardPrimary sets the primary for a repository
-func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetPrimary gets the primary storage node for the repository at the given relative path
+func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) {
+ md.repositories.RLock()
+ defer md.repositories.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.primary = primary
- md.replicas.m[repo.RelativePath] = shard
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, ErrPrimaryNotSet
+ }
+ storageNode, ok := md.storageNodes.m[repository.Primary.ID]
+ if !ok {
+ return nil, errors.New("node storage not found")
+ }
+ return &storageNode, nil
+
+}
+
+// SetPrimary sets the primary storage node for the repository at the given relative path
+func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ repository = models.Repository{RelativePath: relativePath}
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+
+ repository.Primary = storageNode
+
+ md.repositories.m[relativePath] = repository
return nil
}
-// GetShardPrimary gets the primary for a repository
-func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// AddReplica adds a secondary to the repository at the given relative path
+func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return errors.New("repository not found")
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
- shard := md.replicas.m[repo.RelativePath]
- return shard.primary, nil
+ repository.Replicas = append(repository.Replicas, storageNode)
+
+ md.repositories.m[relativePath] = repository
+ return nil
}
-// GetRepositoriesForPrimary gets all repositories
-func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// RemoveReplica removes a secondary from the repository at the given relative path
+func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- repositories := make([]string, 0, len(md.replicas.m))
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return errors.New("repository not found")
+ }
- for repository := range md.replicas.m {
- repositories = append(repositories, repository)
+ var secondaries []models.StorageNode
+ for _, secondary := range repository.Replicas {
+ if secondary.ID != storageNodeID {
+ secondaries = append(secondaries, secondary)
+ }
}
- return repositories, nil
+ repository.Replicas = secondaries
+ md.repositories.m[relativePath] = repository
+ return nil
}
-func (md *MemoryDatastore) getShard(project string) (shard, bool) {
- md.replicas.RLock()
- replicas, ok := md.replicas.m[project]
- md.replicas.RUnlock()
+// GetRepository gets the repository for the given relative path
+func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- return replicas, ok
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, errors.New("repository not found")
+ }
+
+ return &repository, nil
}
-// ErrSecondariesMissing indicates the repository does not have any backup
+// ErrReplicasMissing indicates the repository does not have any backup
// replicas
-var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
+var ErrReplicasMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) {
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -274,7 +340,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNode == storage {
+ if record.state&state != 0 && record.targetNodeID == targetNodeID {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -293,60 +359,52 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
}
// replJobFromRecord constructs a replication job from a record and by cross
-// referencing the current shard for the project being replicated
+// referencing the current repository for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- shard, ok := md.getShard(record.relativePath)
- if !ok {
- return ReplJob{}, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- record.relativePath,
- )
- }
-
return ReplJob{
ID: jobID,
Source: models.Repository{
RelativePath: record.relativePath,
- Storage: shard.primary.Name,
},
- State: record.state,
- Target: record.targetNode,
+ SourceStorage: record.sourceStorage,
+ State: record.state,
+ TargetNodeID: record.targetNodeID,
}, nil
}
-// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because
+// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because
// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication")
+var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
-// CreateSecondaryReplJobs creates a replication job for each secondary that
+// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
- emptyRepo := models.Repository{}
- if source == emptyRepo {
+ if relativePath == "" {
return nil, errors.New("invalid source repository")
}
- shard, ok := md.getShard(source.RelativePath)
- if !ok {
+ repository, err := md.GetRepository(relativePath)
+ if err != nil {
return nil, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- source.RelativePath,
+ "unable to find repository for project at relative path %q",
+ relativePath,
)
}
var jobIDs []uint64
- for _, secondary := range shard.secondaries {
+ for _, secondary := range repository.Replicas {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- targetNode: secondary.Name,
- state: JobStatePending,
- relativePath: source.RelativePath,
+ targetNodeID: secondary.ID,
+ state: JobStatePending,
+ relativePath: relativePath,
+ sourceStorage: repository.Primary.Storage,
}
jobIDs = append(jobIDs, nextID)
@@ -375,36 +433,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
-
-// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error {
- md.primary.Lock()
- defer md.primary.Unlock()
-
- md.primary.server = primary
-
- return nil
-}
-
-// GetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- primary := md.primary.server
- if primary == (models.GitalyServer{}) {
- return primary, ErrPrimaryNotSet
- }
-
- return primary, nil
-}
-
-// SetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- md.primary.server = primary
-
- return nil
-}
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index 6099a8328..a618d9466 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -1,93 +1,94 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will
-// populate itself with the correct replication jobs and shards when initialized
+// populate itself with the correct replication jobs and repositories when initialized
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup-1",
- },
- {
- Name: "backup-2",
- },
- },
- Whitelist: []string{"abcd1234", "5678efgh"},
- }
-
- mds := praefect.NewMemoryDatastore(cfg)
-
repo1 := models.Repository{
- RelativePath: cfg.Whitelist[0],
- Storage: cfg.PrimaryServer.Name,
+ RelativePath: "abcd1234",
}
-
repo2 := models.Repository{
- RelativePath: cfg.Whitelist[1],
- Storage: cfg.PrimaryServer.Name,
+ RelativePath: "5678efgh",
}
+ mds := NewMemoryDatastore(config.Config{
+ StorageNodes: []*models.StorageNode{
+ &models.StorageNode{
+ ID: 0,
+ Address: "tcp://default",
+ Storage: "praefect-internal-1",
+ },
+ &models.StorageNode{
+ ID: 1,
+ Address: "tcp://backup-2",
+ Storage: "praefect-internal-2",
+ }, &models.StorageNode{
+ ID: 2,
+ Address: "tcp://backup-2",
+ Storage: "praefect-internal-3",
+ }},
+ Whitelist: []string{repo1.RelativePath, repo2.RelativePath},
+ })
- expectSecondaries := []models.GitalyServer{
- models.GitalyServer{Name: cfg.SecondaryServers[0].Name},
- models.GitalyServer{Name: cfg.SecondaryServers[1].Name},
+ expectReplicas := []models.StorageNode{
+ mds.storageNodes.m[1],
+ mds.storageNodes.m[2],
}
for _, repo := range []models.Repository{repo1, repo2} {
- actualSecondaries, err := mds.GetShardSecondaries(repo)
+ actualReplicas, err := mds.GetReplicas(repo.RelativePath)
require.NoError(t, err)
- require.ElementsMatch(t, expectSecondaries, actualSecondaries)
+ require.ElementsMatch(t, expectReplicas, actualReplicas)
}
- backup1 := cfg.SecondaryServers[0]
- backup2 := cfg.SecondaryServers[1]
+ backup1 := mds.storageNodes.m[1]
+ backup2 := mds.storageNodes.m[2]
- backup1ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
- ID: 1,
- Target: backup1.Name,
- Source: repo1,
- State: praefect.JobStateReady,
+ backup1ExpectedJobs := []ReplJob{
+ ReplJob{
+ ID: 1,
+ TargetNodeID: backup1.ID,
+ Source: models.Repository{RelativePath: repo1.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStateReady,
},
- praefect.ReplJob{
- ID: 3,
- Target: backup1.Name,
- Source: repo2,
- State: praefect.JobStateReady,
+ ReplJob{
+ ID: 3,
+ TargetNodeID: backup1.ID,
+ Source: models.Repository{RelativePath: repo2.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStateReady,
},
}
- backup2ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
- ID: 2,
- Target: backup2.Name,
- Source: repo1,
- State: praefect.JobStateReady,
+ backup2ExpectedJobs := []ReplJob{
+ ReplJob{
+ ID: 2,
+ TargetNodeID: backup2.ID,
+ Source: models.Repository{RelativePath: repo1.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStateReady,
},
- praefect.ReplJob{
- ID: 4,
- Target: backup2.Name,
- Source: repo2,
- State: praefect.JobStateReady,
+ ReplJob{
+ ID: 4,
+ TargetNodeID: backup2.ID,
+ Source: models.Repository{RelativePath: repo2.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStateReady,
},
}
- backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10)
+ backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10)
require.NoError(t, err)
require.Equal(t, backup1ExpectedJobs, backup1ActualJobs)
- backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10)
+ backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10)
require.NoError(t, err)
require.Equal(t, backup2ActualJobs, backup2ExpectedJobs)
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 417a04be2..654d5ea7e 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -1,95 +1,104 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
-const (
- stor1 = "default" // usually the primary storage location
- stor2 = "backup-1" // usually the seoncary storage location
+var (
+ stor1 = models.StorageNode{
+ ID: 0,
+ Address: "tcp://address-1",
+ Storage: "praefect-storage-1",
+ }
+ stor2 = models.StorageNode{
+ ID: 1,
+ Address: "tcp://address-2",
+ Storage: "praefect-storage-2",
+ }
proj1 = "abcd1234" // imagine this is a legit project hash
)
var (
- repo1Primary = models.Repository{
+ repo1Repository = models.Repository{
RelativePath: proj1,
- Storage: stor1,
}
)
var operations = []struct {
desc string
- opFn func(*testing.T, praefect.Datastore)
+ opFn func(*testing.T, Datastore)
}{
{
desc: "query an empty datastore",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
},
{
desc: "insert first replication job before secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- _, err := ds.CreateSecondaryReplJobs(repo1Primary)
- require.Error(t, err, praefect.ErrInvalidReplTarget)
+ opFn: func(t *testing.T, ds Datastore) {
+ _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
+ require.Error(t, err, ErrInvalidReplTarget)
},
},
{
- desc: "set the primary for the shard",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1})
+ desc: "set the primary for the repository",
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID)
require.NoError(t, err)
},
},
{
- desc: "associate the replication job target with a primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}})
+ desc: "add a secondary replica for the repository",
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID)
require.NoError(t, err)
},
},
{
desc: "insert first replication job after secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- ids, err := ds.CreateSecondaryReplJobs(repo1Primary)
+ opFn: func(t *testing.T, ds Datastore) {
+ ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
require.NoError(t, err)
require.Equal(t, []uint64{1}, ids)
},
},
{
desc: "fetch inserted replication jobs after primary mapped",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- expectedJob := praefect.ReplJob{
- ID: 1,
- Source: repo1Primary,
- Target: stor2,
- State: praefect.JobStatePending,
+ expectedJob := ReplJob{
+ ID: 1,
+ Source: models.Repository{
+ RelativePath: repo1Repository.RelativePath,
+ },
+ SourceStorage: "praefect-storage-1",
+ TargetNodeID: stor2.ID,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
},
{
desc: "mark replication job done",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.UpdateReplJob(1, praefect.JobStateComplete)
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.UpdateReplJob(1, JobStateComplete)
require.NoError(t, err)
},
},
{
desc: "try fetching completed replication job",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -97,14 +106,11 @@ var operations = []struct {
}
// TODO: add SQL datastore flavor
-var flavors = map[string]func() praefect.Datastore{
- "in-memory-datastore": func() praefect.Datastore {
- return praefect.NewMemoryDatastore(
- config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- })
+var flavors = map[string]func() Datastore{
+ "in-memory-datastore": func() Datastore {
+ return NewMemoryDatastore(config.Config{
+ StorageNodes: []*models.StorageNode{&stor1, &stor2},
+ })
},
}
diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go
index b8a8afb01..7324cbcaf 100644
--- a/internal/praefect/mock/mock.pb.go
+++ b/internal/praefect/mock/mock.pb.go
@@ -9,7 +9,10 @@ import (
math "math"
proto "github.com/golang/protobuf/proto"
+ _ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
)
// Reference imports to suppress errors if they are not otherwise used.
@@ -109,16 +112,17 @@ func init() {
func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) }
var fileDescriptor_5ed43251284e3118 = []byte{
- // 139 bytes of a gzipped FileDescriptorProto
+ // 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce,
- 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17,
- 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08,
- 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3,
- 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4,
- 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00,
- 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf,
- 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00,
- 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00,
+ 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71,
+ 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27,
+ 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34,
+ 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b,
+ 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38,
+ 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54,
+ 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b,
+ 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03,
+ 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -160,6 +164,14 @@ type SimpleServiceServer interface {
SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error)
}
+// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedSimpleServiceServer struct {
+}
+
+func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented")
+}
+
func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) {
s.RegisterService(&_SimpleService_serviceDesc, srv)
}
diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto
index aa6ec842a..59e79d3b9 100644
--- a/internal/praefect/mock/mock.proto
+++ b/internal/praefect/mock/mock.proto
@@ -7,6 +7,8 @@ syntax = "proto3";
package mock;
+import "shared.proto";
+
message SimpleRequest {
int32 value = 1;
}
@@ -17,7 +19,10 @@ message SimpleResponse {
service SimpleService {
// SimpleUnaryUnary is a simple unary request with unary response
rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) {
- option (gitaly.op_type).op = ACCESSOR;
+ option (gitaly.op_type) = {
+ op: ACCESSOR
+ scope_level: SERVER
+ };
}
}
diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go
index f6e01811b..adcf7a65e 100644
--- a/internal/praefect/mocksvc_test.go
+++ b/internal/praefect/mocksvc_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock.
// mockSvc is an implementation of mock.SimpleServer for testing purposes. The
// gRPC stub can be updated via go generate:
//
-//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto
+//go:generate protoc --go_out=plugins=grpc:. -I../../proto -I./ mock/mock.proto
//go:generate goimports -w mock/mock.pb.go
type mockSvc struct {
simpleUnaryUnary simpleUnaryUnaryCallback
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
new file mode 100644
index 000000000..db3224c4c
--- /dev/null
+++ b/internal/praefect/models/node.go
@@ -0,0 +1,17 @@
+package models
+
+// StorageNode describes an address that serves a storage
+type StorageNode struct {
+ ID int
+ Storage string `toml:"storage"`
+ Address string `toml:"address"`
+ Token string `toml:"token"`
+}
+
+// Repository describes a repository's relative path and its primary and list of secondaries
+type Repository struct {
+ ID int
+ RelativePath string
+ Primary StorageNode
+ Replicas []StorageNode
+}
diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go
deleted file mode 100644
index 854254d87..000000000
--- a/internal/praefect/models/nodes.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package models
-
-// GitalyServer allows configuring the servers that RPCs are proxied to
-type GitalyServer struct {
- Name string `toml:"name"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- Token string `toml:"token"`
-}
diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go
deleted file mode 100644
index e11cdbf0a..000000000
--- a/internal/praefect/models/repository.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package models
-
-// Repository provides all necessary information to address a repository hosted
-// in a specific Gitaly replica
-type Repository struct {
- RelativePath string `toml:"relative_path"` // relative path of repository
- Storage string `toml:"storage"` // storage location, e.g. default
-}
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 811b56140..770f3ddd0 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -43,11 +43,24 @@ const (
OpMutator
)
+// Scope represents the scope for an RPC method
+type Scope int
+
+const (
+ // ScopeRepository = repository scope
+ ScopeRepository = iota
+ // ScopeStorage = storage scope
+ ScopeStorage
+ // ScopeServer = server scope
+ ScopeServer
+)
+
// MethodInfo contains metadata about the RPC method. Refer to documentation
// for message type "OperationMsg" shared.proto in gitlab-org/gitaly-proto for
// more documentation.
type MethodInfo struct {
Operation OpType
+ Scope Scope
targetRepo []int
requestName string // protobuf message name for input type
requestFactory protoFactory
@@ -55,13 +68,6 @@ type MethodInfo struct {
// TargetRepo returns the target repository for a protobuf message if it exists
func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) {
- if mi.requestName != proto.MessageName(msg) {
- return nil, fmt.Errorf(
- "proto message %s does not match expected RPC request message %s",
- proto.MessageName(msg), mi.requestName,
- )
- }
-
return reflectFindRepoTarget(msg, mi.targetRepo)
}
@@ -179,6 +185,11 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
return MethodInfo{}, err
}
+ scope, err := parseScope(opMsg.GetScopeLevel())
+ if err != nil {
+ return MethodInfo{}, err
+ }
+
// for some reason, the protobuf descriptor contains an extra dot in front
// of the request name that the generated code does not. This trimming keeps
// the two copies consistent for comparisons.
@@ -194,9 +205,21 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
targetRepo: targetRepo,
requestName: requestName,
requestFactory: reqFactory,
+ Scope: scope,
}, nil
}
+func parseScope(scope gitalypb.OperationMsg_Scope) (Scope, error) {
+ switch scope {
+ case gitalypb.OperationMsg_REPOSITORY:
+ return ScopeRepository, nil
+ case gitalypb.OperationMsg_SERVER:
+ return ScopeServer, nil
+ }
+
+ return ScopeRepository, errors.New("scope not found")
+}
+
// parses a string like "1.1" and returns a slice of ints
func parseOID(rawFieldOID string) ([]int, error) {
var fieldNos []int
diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go
index 8d6629524..286ebcf41 100644
--- a/internal/praefect/protoregistry/targetrepo_test.go
+++ b/internal/praefect/protoregistry/targetrepo_test.go
@@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
svc: "RepositoryService",
method: "RepackIncremental",
pbMsg: &gitalypb.RepackIncrementalResponse{},
- expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"),
+ expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"),
},
{
desc: "target nested in oneOf",
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index c56f0488c..d873fee46 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -8,32 +8,33 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
"github.com/sirupsen/logrus"
)
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, source models.Repository, target Node) error
+ Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Logger
}
-func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error {
repository := &gitalypb.Repository{
- StorageName: target.Storage,
+ StorageName: targetStorage,
RelativePath: source.RelativePath,
}
remoteRepository := &gitalypb.Repository{
- StorageName: source.Storage,
+ StorageName: sourceStorage,
RelativePath: source.RelativePath,
}
- repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc)
- remoteClient := gitalypb.NewRemoteServiceClient(target.cc)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(target)
+ remoteClient := gitalypb.NewRemoteServiceClient(target)
// CreateRepository is idempotent
if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
@@ -60,7 +61,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Logger
- dataStore Datastore
+ replicasDS ReplicasDatastore
+ replJobsDS ReplJobsDatastore
coordinator *Coordinator
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
@@ -74,10 +76,11 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
- dataStore: ds,
+ replicasDS: replicasDS,
+ replJobsDS: jobsDS,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
targetNode: targetNode,
@@ -118,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.dataStore.CreateSecondaryReplJobs(repo)
+ id, err := r.replJobsDS.CreateReplicaReplJobs(repo.RelativePath)
if err != nil {
return err
}
@@ -140,58 +143,65 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
for {
- jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10)
+ nodes, err := r.replicasDS.GetStorageNodes()
if err != nil {
- return err
+ return nil
}
- if len(jobs) == 0 {
- r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
-
- select {
- // TODO: exponential backoff when no queries are returned
- case <-time.After(jobFetchInterval):
- continue
-
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- r.log.Debugf("fetched replication jobs: %#v", jobs)
-
- for _, job := range jobs {
- r.log.WithField(logWithReplJobID, job.ID).
- Infof("processing replication job %#v", job)
- node, err := r.coordinator.GetStorageNode(job.Target)
- r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err)
+ for _, node := range nodes {
+ jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.ID, 10)
if err != nil {
return err
}
- if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
- return err
- }
+ if len(jobs) == 0 {
+ r.log.Tracef("no jobs for %d, checking again in %s", node.ID, jobFetchInterval)
- primary, err := r.dataStore.GetShardPrimary(job.Source)
- if err != nil {
- return err
- }
-
- ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token)
- if err != nil {
- return err
- }
+ select {
+ // TODO: exponential backoff when no queries are returned
+ case <-time.After(jobFetchInterval):
+ continue
- if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
- r.log.WithField(logWithReplJobID, job.ID).
- Info("completed replication")
- if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
- return err
+ for _, job := range jobs {
+ r.log.WithField(logWithReplJobID, job.ID).
+ Infof("processing replication job %#v", job)
+ node, err := r.replicasDS.GetStorageNode(job.TargetNodeID)
+ if err != nil {
+ return err
+ }
+
+ repository, err := r.replicasDS.GetRepository(job.Source.RelativePath)
+ if err != nil {
+ return err
+ }
+
+ if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
+
+ ctx, err = helper.InjectGitalyServers(ctx, job.SourceStorage, repository.Primary.Address, "")
+ if err != nil {
+ return err
+ }
+
+ cc, err := r.coordinator.GetConnection(node.Storage)
+ if err != nil {
+ return err
+ }
+
+ if err := r.replicator.Replicate(ctx, job.Source, job.SourceStorage, node.Storage, cc); err != nil {
+ r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
+ return err
+ }
+
+ if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ return err
+ }
}
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 1294bc989..b3a461539 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -28,43 +28,33 @@ import (
// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for
// all whitelisted repos
func TestReplicatorProcessJobsWhitelist(t *testing.T) {
- var (
- cfg = config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup1",
- ListenAddr: "tcp://gitaly-backup1.example.com",
- },
- {
- Name: "backup2",
- ListenAddr: "tcp://gitaly-backup2.example.com",
- },
- },
- Whitelist: []string{"abcd1234", "edfg5678"},
- }
- datastore = NewMemoryDatastore(cfg)
- coordinator = NewCoordinator(logrus.New(), datastore)
- resultsCh = make(chan result)
- replman = NewReplMgr(
- cfg.SecondaryServers[1].Name,
- logrus.New(),
- datastore,
- coordinator,
- WithWhitelist(cfg.Whitelist),
- WithReplicator(&mockReplicator{resultsCh}),
- )
+ datastore := NewMemoryDatastore(config.Config{
+ StorageNodes: []*models.StorageNode{
+ &models.StorageNode{
+ ID: 1,
+ Address: "tcp://gitaly-primary.example.com",
+ Storage: "praefect-internal-1",
+ }, &models.StorageNode{
+ ID: 2,
+ Address: "tcp://gitaly-backup1.example.com",
+ Storage: "praefect-internal-2",
+ }},
+ Whitelist: []string{"abcd1234", "edfg5678"},
+ })
+
+ coordinator := NewCoordinator(logrus.New(), datastore)
+ resultsCh := make(chan result)
+ replman := NewReplMgr(
+ "default",
+ logrus.New(),
+ datastore,
+ datastore,
+ coordinator,
+ WithReplicator(&mockReplicator{resultsCh}),
)
- for _, node := range []*models.GitalyServer{
- cfg.PrimaryServer,
- cfg.SecondaryServers[0],
- cfg.SecondaryServers[1],
- } {
- err := coordinator.RegisterNode(node.Name, node.ListenAddr)
+ for _, node := range datastore.storageNodes.m {
+ err := coordinator.RegisterNode(node.Storage, node.Address)
require.NoError(t, err)
}
@@ -78,14 +68,27 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
success := make(chan struct{})
+ var expectedResults []result
+ // we expect one job per whitelisted repo with each backend server
+ for _, shard := range datastore.repositories.m {
+ for _, secondary := range shard.Replicas {
+ cc, err := coordinator.GetConnection(secondary.Storage)
+ require.NoError(t, err)
+ expectedResults = append(expectedResults,
+ result{source: models.Repository{RelativePath: shard.RelativePath},
+ targetStorage: secondary.Storage,
+ targetCC: cc,
+ })
+ }
+ }
+
go func() {
// we expect one job per whitelisted repo with each backend server
- for i := 0; i < len(cfg.Whitelist); i++ {
- result := <-resultsCh
-
- assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage)
- assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage)
+ for _, shard := range datastore.repositories.m {
+ for range shard.Replicas {
+ result := <-resultsCh
+ assert.Contains(t, expectedResults, result)
+ }
}
cancel()
@@ -106,18 +109,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
}
type result struct {
- source models.Repository
- target Node
+ source models.Repository
+ targetStorage string
+ targetCC *grpc.ClientConn
}
type mockReplicator struct {
resultsCh chan<- result
}
-func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error {
select {
- case mr.resultsCh <- result{source, target}:
+ case mr.resultsCh <- result{source, targetStorage, target}:
return nil
case <-ctx.Done():
@@ -178,11 +182,11 @@ func TestReplicate(t *testing.T) {
var replicator defaultReplicator
require.NoError(t, replicator.Replicate(
ctx,
- models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()},
- Node{
- cc: conn,
- Storage: backupStorageName,
- }))
+ models.Repository{RelativePath: testRepo.GetRelativePath()},
+ "default",
+ backupStorageName,
+ conn,
+ ))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 915d7281a..5952a66ce 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -7,14 +7,15 @@ import (
"testing"
"time"
+ "github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"google.golang.org/grpc"
)
@@ -44,26 +45,49 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
},
}
+ gz := proto.FileDescriptor("mock/mock.proto")
+ fd, err := protoregistry.ExtractFileDescriptor(gz)
+ if err != nil {
+ panic(err)
+ }
+
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
const (
storagePrimary = "default"
- storageBackup = "backup"
)
- datastore := praefect.NewMemoryDatastore(config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
+ datastore := NewMemoryDatastore(config.Config{
+ StorageNodes: []*models.StorageNode{
+ &models.StorageNode{
+ ID: 1,
+ Storage: "praefect-internal-1",
+ },
+ &models.StorageNode{
+ ID: 2,
+ Storage: "praefect-internal-2",
+ }},
})
- coordinator := praefect.NewCoordinator(logrus.New(), datastore)
- replmgr := praefect.NewReplMgr(
+
+ coordinator := NewCoordinator(logrus.New(), datastore, fd)
+
+ for id, nodeStorage := range datastore.storageNodes.m {
+ backend, cleanup := newMockDownstream(t, tt.callback)
+ defer cleanup() // clean up mock downstream server resources
+
+ coordinator.RegisterNode(nodeStorage.Storage, backend)
+ nodeStorage.Address = backend
+ datastore.storageNodes.m[id] = nodeStorage
+ }
+
+ replmgr := NewReplMgr(
storagePrimary,
logrus.New(),
datastore,
+ datastore,
coordinator,
)
- prf := praefect.NewServer(
+ prf := NewServer(
coordinator,
replmgr,
nil,
@@ -85,13 +109,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
defer cc.Close()
cli := mock.NewSimpleServiceClient(cc)
- for _, replica := range []string{storagePrimary, storageBackup} {
- backend, cleanup := newMockDownstream(t, tt.callback)
- defer cleanup() // clean up mock downstream server resources
-
- coordinator.RegisterNode(replica, backend)
- }
-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()