From 220a0d62a891c390b586d82437a8b5e3cf3a6183 Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 17 Jul 2019 15:35:11 -0700 Subject: Updating data model for praefect --- .gitignore | 1 + _support/praefect-cluster/.gitignore | 6 +- _support/praefect-cluster/config.gitaly.toml | 49 +++ _support/praefect-cluster/config.praefect.toml | 23 +- _support/praefect-cluster/docker-compose.yml | 24 +- _support/praefect-cluster/gitaly-backup-1.toml | 49 --- _support/praefect-cluster/gitaly-backup-2.toml | 49 --- _support/praefect-cluster/gitaly-primary.toml | 49 --- changelogs/unreleased/jc-sql-data-store.yml | 5 + cmd/praefect/main.go | 16 +- internal/helper/storage.go | 11 + internal/praefect/common.go | 8 - internal/praefect/config/config.go | 28 +- internal/praefect/config/config_test.go | 34 +- internal/praefect/config/testdata/config.toml | 24 +- internal/praefect/coordinator.go | 178 +++++----- internal/praefect/coordinator_test.go | 31 +- internal/praefect/datastore.go | 365 +++++++++++---------- internal/praefect/datastore_memory_test.go | 113 +++---- internal/praefect/datastore_test.go | 86 ++--- internal/praefect/mock/mock.pb.go | 30 +- internal/praefect/mock/mock.proto | 7 +- internal/praefect/mocksvc_test.go | 4 +- internal/praefect/models/node.go | 17 + internal/praefect/models/nodes.go | 8 - internal/praefect/models/repository.go | 8 - internal/praefect/protoregistry/protoregistry.go | 37 ++- internal/praefect/protoregistry/targetrepo_test.go | 2 +- internal/praefect/replicator.go | 112 ++++--- internal/praefect/replicator_test.go | 104 +++--- internal/praefect/server_test.go | 51 ++- 31 files changed, 756 insertions(+), 773 deletions(-) create mode 100644 _support/praefect-cluster/config.gitaly.toml delete mode 100644 _support/praefect-cluster/gitaly-backup-1.toml delete mode 100644 _support/praefect-cluster/gitaly-backup-2.toml delete mode 100644 _support/praefect-cluster/gitaly-primary.toml create mode 100644 changelogs/unreleased/jc-sql-data-store.yml create mode 100644 internal/praefect/models/node.go delete mode 100644 internal/praefect/models/nodes.go delete mode 100644 internal/praefect/models/repository.go diff --git a/.gitignore b/.gitignore index b7fbb0b4c..9fb2c7e44 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ cmd/gitaly-remote/gitaly-remote git-env /gitaly-debug /praefect +/praefect-migrate gitaly.pid /vendor/github.com/libgit2/git2go/vendor /vendor diff --git a/_support/praefect-cluster/.gitignore b/_support/praefect-cluster/.gitignore index 06b873206..bd035c2b7 100644 --- a/_support/praefect-cluster/.gitignore +++ b/_support/praefect-cluster/.gitignore @@ -1,3 +1,3 @@ -/gitaly-backup-1 -/gitaly-backup-2 -/gitaly-primary +/gitaly-1 +/gitaly-2 +/gitaly-3 diff --git a/_support/praefect-cluster/config.gitaly.toml b/_support/praefect-cluster/config.gitaly.toml new file mode 100644 index 000000000..2379b6951 --- /dev/null +++ b/_support/praefect-cluster/config.gitaly.toml @@ -0,0 +1,49 @@ +# Example Gitaly configuration file + +# The directory where Gitaly's executables are stored +bin_dir = "/usr/local/bin" + +# listen on a TCP socket. 
This is insecure (no authentication) +listen_addr = "0.0.0.0:9999" + +# # Optional: export metrics via Prometheus +# prometheus_listen_addr = "localhost:9236" +# + +# # Git executable settings +# [git] +# bin_path = "/usr/bin/git" + +[[storage]] +name = "default" +path = "/home/git/repositories" + +# # You can optionally configure more storages for this Gitaly instance to serve up +# +# [[storage]] +# name = "other_storage" +# path = "/mnt/other_storage/repositories" +# + +# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout +# [logging] +# format = "json" +# # Additionally exceptions can be reported to Sentry +# sentry_dsn = "https://:@sentry.io/" + +# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls +# [prometheus] +# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] + +[gitaly-ruby] +# The directory where gitaly-ruby is installed +dir = "/srv/gitaly-ruby" + +[gitlab-shell] +# The directory where gitlab-shell is installed +dir = "/srv/gitlab-shell" + +# # You can adjust the concurrency of each RPC endpoint +# [[concurrency]] +# rpc = "/gitaly.RepositoryService/GarbageCollect" +# max_per_repo = 1 diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml index e0f163178..2a4297248 100644 --- a/_support/praefect-cluster/config.praefect.toml +++ b/_support/praefect-cluster/config.praefect.toml @@ -24,17 +24,22 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e # as shard. listen_addr should be unique for all nodes. # Requires the protocol to be defined, e.g. tcp://host.tld:1234 -[primary_server] - name = "default" +[[storage_node]] # listen_addr = "tcp://gitaly-primary:9999" - listen_addr = "tcp://127.0.0.1:9999" + storage = "praefect-internal-1" + address = "tcp://127.0.0.1:9999" -[[secondary_server]] - name = "backup1" +[[storage_node]] # listen_addr = "tcp://gitaly-backup-1:9999" - listen_addr = "tcp://127.0.0.1:9998" + storage = "praefect-internal-2" + address = "tcp://127.0.0.1:9998" -[[secondary_server]] - name = "backup2" +[[storage_node]] # listen_addr = "tcp://gitaly-backup-2:9999" - listen_addr = "tcp://127.0.0.1:9997" \ No newline at end of file + storage = "praefect-internal-3" + address = "tcp://127.0.0.1:9997" + +[postgres] + user = "johncai" + address = "127.0.0.1:5432" + database = "praefect_test" \ No newline at end of file diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml index 6eb81be47..09745ea41 100644 --- a/_support/praefect-cluster/docker-compose.yml +++ b/_support/praefect-cluster/docker-compose.yml @@ -6,15 +6,15 @@ services: # dockerfile: Dockerfile.praefect # image: praefect:latest # depends_on: -# - gitaly-primary -# - gitaly-backup-1 -# - gitaly-backup-2 +# - gitaly-1 +# - gitaly-2 +# - gitaly-3 # command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"] # ports: # - "2305:2305" # volumes: # - ./config.praefect.toml:/etc/gitaly/config.praefect.toml - gitaly-primary: + gitaly-1: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -24,9 +24,9 @@ services: - "9999:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-primary/data:/home/git/repositories - - ./gitaly-primary.toml:/etc/config/config.toml - gitaly-backup-1: + - ./gitaly-1/data:/home/git/repositories + - 
./config.gitaly.toml:/etc/config/config.toml + gitaly-2: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -36,9 +36,9 @@ services: - "9998:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-1/data:/home/git/repositories - - ./gitaly-backup-1.toml:/etc/config/config.toml - gitaly-backup-2: + - ./gitaly-2/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml + gitaly-3: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest environment: - GITALY_TESTING_NO_GIT_HOOKS=1 @@ -48,5 +48,5 @@ services: - "9997:9999" command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-2/data:/home/git/repositories - - ./gitaly-backup-2.toml:/etc/config/config.toml \ No newline at end of file + - ./gitaly-3/data:/home/git/repositories + - ./config.gitaly.toml:/etc/config/config.toml \ No newline at end of file diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml deleted file mode 100644 index 89d1884e3..000000000 --- a/_support/praefect-cluster/gitaly-backup-1.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "backup1" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml deleted file mode 100644 index 1b5ce8d20..000000000 --- a/_support/praefect-cluster/gitaly-backup-2.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. 
This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "backup2" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/gitaly-primary.toml deleted file mode 100644 index 2379b6951..000000000 --- a/_support/praefect-cluster/gitaly-primary.toml +++ /dev/null @@ -1,49 +0,0 @@ -# Example Gitaly configuration file - -# The directory where Gitaly's executables are stored -bin_dir = "/usr/local/bin" - -# listen on a TCP socket. This is insecure (no authentication) -listen_addr = "0.0.0.0:9999" - -# # Optional: export metrics via Prometheus -# prometheus_listen_addr = "localhost:9236" -# - -# # Git executable settings -# [git] -# bin_path = "/usr/bin/git" - -[[storage]] -name = "default" -path = "/home/git/repositories" - -# # You can optionally configure more storages for this Gitaly instance to serve up -# -# [[storage]] -# name = "other_storage" -# path = "/mnt/other_storage/repositories" -# - -# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout -# [logging] -# format = "json" -# # Additionally exceptions can be reported to Sentry -# sentry_dsn = "https://:@sentry.io/" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -# [prometheus] -# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/srv/gitaly-ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/srv/gitlab-shell" - -# # You can adjust the concurrency of each RPC endpoint -# [[concurrency]] -# rpc = "/gitaly.RepositoryService/GarbageCollect" -# max_per_repo = 1 diff --git a/changelogs/unreleased/jc-sql-data-store.yml b/changelogs/unreleased/jc-sql-data-store.yml new file mode 100644 index 000000000..e9ccb210d --- /dev/null +++ b/changelogs/unreleased/jc-sql-data-store.yml @@ -0,0 +1,5 @@ +--- +title: SQL datastore for praefect +merge_request: 1370 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index acf53d2fa..8ce68530d 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" 
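	// The protoregistry import is new here: main seeds the coordinator with
	// protoregistry.GitalyProtoFileDescriptors (see below) so it can inspect each
	// proxied RPC and route repository-scoped requests by their target repository.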
"gitlab.com/gitlab-org/gitaly/internal/version" "gitlab.com/gitlab-org/labkit/tracing" ) @@ -95,10 +96,9 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, datastore) - repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) + coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...) + repl = praefect.NewReplMgr("default", logger, datastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) - // signal related signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT} termCh = make(chan os.Signal, len(signals)) @@ -114,14 +114,12 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer) - - for _, gitaly := range allBackendServers { - if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil { - return fmt.Errorf("failed to register %s: %s", gitaly.Name, err) + for _, node := range conf.StorageNodes { + if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { + return fmt.Errorf("failed to register %s: %s", node.Address, err) } - logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node") + logger.WithField("node_address", node.Address).Info("registered gitaly node") } go func() { serverErrors <- repl.ProcessBacklog(ctx) }() diff --git a/internal/helper/storage.go b/internal/helper/storage.go index 4e535a5d6..f3f0d0ba0 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -34,6 +34,17 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly return } +// IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata +func IncomingToOutgoing(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ctx + } + + return metadata.NewOutgoingContext(ctx, md) + +} + // InjectGitalyServers injects gitaly-servers metadata into an outgoing context func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) { diff --git a/internal/praefect/common.go b/internal/praefect/common.go index 2df2a4823..a09a292ad 100644 --- a/internal/praefect/common.go +++ b/internal/praefect/common.go @@ -1,13 +1,5 @@ package praefect -import "google.golang.org/grpc" - -// Node is a wrapper around the grpc client connection for a backend Gitaly node -type Node struct { - Storage string - cc *grpc.ClientConn -} - // logging keys to use with logrus WithField const ( logKeyProjectPath = "ProjectPath" diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 6a2a5b5d5..eb0fad56b 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -15,8 +15,7 @@ type Config struct { ListenAddr string `toml:"listen_addr"` SocketPath string `toml:"socket_path"` - PrimaryServer *models.GitalyServer `toml:"primary_server"` - SecondaryServers []*models.GitalyServer `toml:"secondary_server"` + StorageNodes []*models.StorageNode `toml:"storage_node"` // Whitelist is a list of relative project paths (paths comprised of project // hashes) that are 
permitted to use high availability features @@ -26,13 +25,6 @@ type Config struct { PrometheusListenAddr string `toml:"prometheus_listen_addr"` } -// GitalyServer allows configuring the servers that RPCs are proxied to -type GitalyServer struct { - Name string `toml:"name"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - Token string `toml:"token"` -} - // FromFile loads the config for the passed file path func FromFile(filePath string) (Config, error) { config := &Config{} @@ -50,32 +42,30 @@ var ( errNoListener = errors.New("no listen address or socket path configured") errNoGitalyServers = errors.New("no primary gitaly backends configured") errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique") - errGitalyWithoutName = errors.New("all gitaly servers must have a name") + errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address") ) -var emptyServer = &models.GitalyServer{} - // Validate establishes if the config is valid func (c Config) Validate() error { if c.ListenAddr == "" && c.SocketPath == "" { return errNoListener } - if c.PrimaryServer == nil || c.PrimaryServer == emptyServer { + if len(c.StorageNodes) == 0 { return errNoGitalyServers } - listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1) - for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) { - if gitaly.Name == "" { - return errGitalyWithoutName + listenAddrs := make(map[string]bool, len(c.StorageNodes)) + for _, node := range c.StorageNodes { + if node.Address == "" { + return errGitalyWithoutAddr } - if _, found := listenAddrs[gitaly.ListenAddr]; found { + if _, found := listenAddrs[node.Address]; found { return errDuplicateGitalyAddr } - listenAddrs[gitaly.ListenAddr] = true + listenAddrs[node.Address] = true } return nil diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index eace5eb2f..b89bdd648 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -9,10 +9,10 @@ import ( ) func TestConfigValidation(t *testing.T) { - primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"} - secondarySrvs := []*models.GitalyServer{ - {"test1", "localhost:23457", "secret-token"}, - {"test2", "localhost:23458", "secret-token"}, + nodes := []*models.StorageNode{ + {ID: 1, Address: "localhost:23456", Token: "secret-token"}, + {ID: 2, Address: "localhost:23457", Token: "secret-token"}, + {ID: 3, Address: "localhost:23458", Token: "secret-token"}, } testCases := []struct { @@ -22,12 +22,12 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "", StorageNodes: nodes}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs}, + config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes}, err: nil, }, { @@ -37,12 +37,12 @@ func TestConfigValidation(t *testing.T) { }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address})}, err: errDuplicateGitalyAddr, }, { desc: "Valid config", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, 
SecondaryServers: secondarySrvs}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes}, err: nil, }, } @@ -63,18 +63,18 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ + StorageNodes: []*models.StorageNode{ + { + Address: "tcp://gitaly-internal-1.example.com", + Storage: "praefect-internal-1", + }, { - Name: "default", - ListenAddr: "tcp://gitaly-backup1.example.com", + Address: "tcp://gitaly-internal-2.example.com", + Storage: "praefect-internal-2", }, { - Name: "backup", - ListenAddr: "tcp://gitaly-backup2.example.com", + Address: "tcp://gitaly-internal-3.example.com", + Storage: "praefect-internal-3", }, }, Whitelist: []string{"abcd1234", "edfg5678"}, diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 81701a359..defebcca9 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -3,20 +3,20 @@ socket_path = "" whitelist = ["abcd1234", "edfg5678"] prometheus_listen_addr = "" -[primary_server] - name = "default" - listen_addr = "tcp://gitaly-primary.example.com" - -[[secondary_server]] - name = "default" - listen_addr = "tcp://gitaly-backup1.example.com" - -[[secondary_server]] - name = "backup" - listen_addr = "tcp://gitaly-backup2.example.com" - [logging] format = "" sentry_dsn = "" ruby_sentry_dsn = "" level = "" + +[[storage_node]] + address = "tcp://gitaly-internal-1.example.com" + storage = "praefect-internal-1" + +[[storage_node]] + address = "tcp://gitaly-internal-2.example.com" + storage = "praefect-internal-2" + +[[storage_node]] + address = "tcp://gitaly-internal-3.example.com" + storage = "praefect-internal-3" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index c238604f3..f00a469b6 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -2,11 +2,14 @@ package praefect import ( "context" + "errors" "fmt" + "math/rand" "os" "os/signal" "sync" "syscall" + "time" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" @@ -19,8 +22,6 @@ import ( "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // Coordinator takes care of directing client requests to the appropriate @@ -31,14 +32,14 @@ type Coordinator struct { failoverMutex sync.RWMutex connMutex sync.RWMutex - datastore Datastore + datastore ReplicasDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -55,19 +56,6 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) return c.registry.RegisterFiles(protos...) 
} -// GetStorageNode returns the registered node for the given storage location -func (c *Coordinator) GetStorageNode(storage string) (Node, error) { - cc, ok := c.getConn(storage) - if !ok { - return Node{}, fmt.Errorf("no node registered for storage location %q", storage) - } - - return Node{ - Storage: storage, - cc: cc, - }, nil -} - // streamDirector determines which downstream servers receive requests func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, error) { // For phase 1, we need to route messages based on the storage location @@ -77,34 +65,93 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, c.failoverMutex.RLock() defer c.failoverMutex.RUnlock() - serverConfig, err := c.datastore.GetDefaultPrimary() + frame, err := peeker.Peek() if err != nil { - err := status.Error( - codes.FailedPrecondition, - "no downstream node registered", - ) return nil, nil, err } - // We only need the primary node, as there's only one primary storage - // location per praefect at this time - cc, ok := c.getConn(serverConfig.Name) - if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name) + mi, err := c.registry.LookupMethod(fullMethodName) + if err != nil { + return nil, nil, err } - ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token) + var primary *models.StorageNode + + if mi.Scope == protoregistry.ScopeRepository { + m, err := mi.UnmarshalRequestProto(frame) + if err != nil { + return nil, nil, err + } + + targetRepo, err := mi.TargetRepo(m) + if err != nil { + return nil, nil, err + } + + primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) + + if err != nil { + if err != ErrPrimaryNotSet { + return nil, nil, err + } + // if there are no primaries for this repository, pick one + nodes, err := c.datastore.GetStorageNodes() + if err != nil { + return nil, nil, err + } + + if len(nodes) == 0 { + return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + + } + newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))] + //newPrimary := nodes[0] + + // set the primary + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + return nil, nil, err + } + + primary = &newPrimary + } + + targetRepo.StorageName = primary.Storage + + b, err := proxy.Codec().Marshal(m) + if err != nil { + return nil, nil, err + } + if err = peeker.Modify(b); err != nil { + return nil, nil, err + } + + } else { + //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to + // proxy requests that are not repository scoped + node, err := c.datastore.GetStorageNodes() + if err != nil { + return nil, nil, err + } + if len(node) == 0 { + return nil, nil, errors.New("no node storages found") + } + primary = &node[0] + } + + // We only need the primary node, as there's only one primary storage + // location per praefect at this time + cc, err := c.GetConnection(primary.Storage) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) } - return ctx, cc, nil + return helper.IncomingToOutgoing(ctx), cc, nil } // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. 
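// A condensed sketch (annotation, not code from the commit) of the routing flow the
// new streamDirector introduces; every identifier is taken from that hunk and error
// handling is elided:
//
//	frame, _ := peeker.Peek()                         // buffer the first request message
//	mi, _ := c.registry.LookupMethod(fullMethodName)  // protoregistry metadata for this RPC
//	if mi.Scope == protoregistry.ScopeRepository {
//		m, _ := mi.UnmarshalRequestProto(frame)
//		targetRepo, _ := mi.TargetRepo(m)
//		primary, err := c.datastore.GetPrimary(targetRepo.GetRelativePath())
//		if err == ErrPrimaryNotSet {
//			// lazily elect a random storage node and persist it via SetPrimary
//		}
//		targetRepo.StorageName = primary.Storage      // rewrite the request in place
//		b, _ := proxy.Codec().Marshal(m)
//		peeker.Modify(b)                              // forward the rewritten frame
//	} // non-repository-scoped RPCs fall back to the first storage node for now
//	cc, _ := c.GetConnection(primary.Storage)
//	return helper.IncomingToOutgoing(ctx), cc, nil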
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { - conn, err := client.Dial(listenAddr, +func (c *Coordinator) RegisterNode(storage, address string) error { + conn, err := client.Dial(address, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)), @@ -114,23 +161,28 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { return err } - c.setConn(storageName, conn) + c.setConn(storage, conn) return nil } -func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(storage string, conn *grpc.ClientConn) { c.connMutex.Lock() - c.nodes[storageName] = conn + c.nodes[storage] = conn c.connMutex.Unlock() } -func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) { +// GetConnection gets the grpc client connection based on an address +func (c *Coordinator) GetConnection(storage string) (*grpc.ClientConn, error) { c.connMutex.RLock() - cc, ok := c.nodes[storageName] + cc, ok := c.nodes[storage] c.connMutex.RUnlock() + if !ok { + return nil, errors.New("client connection not found") + } + + return cc, nil - return cc, ok } // FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary @@ -146,55 +198,7 @@ func (c *Coordinator) handleSignalAndRotate() { <-failoverChan c.failoverMutex.Lock() - primary, err := c.datastore.GetDefaultPrimary() - if err != nil { - c.log.Fatalf("error when getting default primary: %v", err) - } - - if err := c.rotateSecondaryToPrimary(primary); err != nil { - c.log.WithError(err).Error("rotating secondary") - } + c.log.Info("failover happens") c.failoverMutex.Unlock() } } - -func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error { - repositories, err := c.datastore.GetRepositoriesForPrimary(primary) - if err != nil { - return err - } - - for _, repoPath := range repositories { - secondaries, err := c.datastore.GetShardSecondaries(models.Repository{ - RelativePath: repoPath, - }) - if err != nil { - return fmt.Errorf("getting secondaries: %v", err) - } - - newPrimary := secondaries[0] - secondaries = append(secondaries[1:], primary) - - if err = c.datastore.SetShardPrimary(models.Repository{ - RelativePath: repoPath, - }, newPrimary); err != nil { - return fmt.Errorf("setting primary: %v", err) - } - - if err = c.datastore.SetShardSecondaries(models.Repository{ - RelativePath: repoPath, - }, secondaries); err != nil { - return fmt.Errorf("setting secondaries: %v", err) - } - } - - // set the new default primary - primary, err = c.datastore.GetShardPrimary(models.Repository{ - RelativePath: repositories[0], - }) - if err != nil { - return fmt.Errorf("getting shard primary: %v", err) - } - - return c.datastore.SetDefaultPrimary(primary) -} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 50045f8a0..0275c6048 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,9 +5,6 @@ import ( "testing" "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) var testLogger = logrus.New() @@ -17,31 +14,5 @@ func init() { } func TestSecondaryRotation(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{Name: "primary"}, - SecondaryServers: 
[]*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}}, - Whitelist: []string{"/repoA", "/repoB"}, - } - d := NewMemoryDatastore(cfg) - c := NewCoordinator(testLogger, d) - - primary, err := d.GetDefaultPrimary() - require.NoError(t, err) - - require.NoError(t, c.rotateSecondaryToPrimary(primary)) - - primary, err = d.GetDefaultPrimary() - require.NoError(t, err) - require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary") - - repositories, err := d.GetRepositoriesForPrimary(primary) - require.NoError(t, err) - - for _, repository := range repositories { - shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository}) - require.NoError(t, err) - - require.Len(t, shardSecondaries, 2) - require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0]) - } + t.Skip("secondary rotation will change with the new data model") } diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 5678c6a24..85fa22a23 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -34,7 +34,7 @@ const ( // JobStateComplete indicates the job is now complete JobStateComplete // JobStateCancelled indicates the job was cancelled. This can occur if the - // job is no longer relevant (e.g. a node is moved out of a shard) + // job is no longer relevant (e.g. a node is moved out of a repository) JobStateCancelled ) @@ -42,10 +42,11 @@ const ( // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. type ReplJob struct { - ID uint64 // autoincrement ID - Target string // which storage location to replicate to? - Source models.Repository // source for replication - State JobState + ID uint64 // autoincrement ID + TargetNodeID int // which node to replicate to? + SourceStorage string + Source models.Repository // source for replication + State JobState } // replJobs provides sort manipulation behavior @@ -64,32 +65,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I type Datastore interface { ReplJobsDatastore ReplicasDatastore - TemporaryDatastore -} - -// TemporaryDatastore contains methods that will go away once we move to a SQL datastore -type TemporaryDatastore interface { - GetDefaultPrimary() (models.GitalyServer, error) - SetDefaultPrimary(primary models.GitalyServer) error } // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - // GetSecondaries will retrieve all secondary replica storage locations for - // a primary replica - GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error) + GetReplicas(relativePath string) ([]models.StorageNode, error) + + GetStorageNode(nodeID int) (models.StorageNode, error) + + GetStorageNodes() ([]models.StorageNode, error) + + GetPrimary(relativePath string) (*models.StorageNode, error) - GetShardPrimary(repo models.Repository) (models.GitalyServer, error) + SetPrimary(relativePath string, storageNodeID int) error - // SetSecondaries will set the secondary storage locations for a repository - // in a primary replica. 
- SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error + AddReplica(relativePath string, storageNodeID int) error - SetShardPrimary(repo models.Repository, primary models.GitalyServer) error + RemoveReplica(relativePath string, storageNodeID int) error - // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary - GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) + GetRepository(relativePath string) (*models.Repository, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -98,58 +93,53 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, node string, count int) ([]ReplJob, error) + GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error) - // CreateSecondaryJobs will create replication jobs for each secondary + // CreateReplicaJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) + CreateReplicaReplJobs(relativePath string) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error } -// shard is a set of primary and secondary storage replicas for a project -type shard struct { - primary models.GitalyServer - secondaries []models.GitalyServer -} - type jobRecord struct { - relativePath string // project's relative path - targetNode string - state JobState + sourceStorage string + relativePath string // project's relative path + targetNodeID int + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. 
It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - replicas *struct { - sync.RWMutex - m map[string]shard // keyed by project's relative path - } - jobs *struct { sync.RWMutex next uint64 records map[uint64]jobRecord // all jobs indexed by ID } - primary *struct { + storageNodes *struct { sync.RWMutex - server models.GitalyServer + m map[int]models.StorageNode + } + + repositories *struct { + sync.RWMutex + m map[string]models.Repository } } // NewMemoryDatastore returns an initialized in-memory datastore func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ - replicas: &struct { + storageNodes: &struct { sync.RWMutex - m map[string]shard + m map[int]models.StorageNode }{ - m: map[string]shard{}, + m: map[int]models.StorageNode{}, }, jobs: &struct { sync.RWMutex @@ -159,114 +149,190 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - primary: &struct { + repositories: &struct { sync.RWMutex - server models.GitalyServer + m map[string]models.Repository }{ - server: models.GitalyServer{ - Name: cfg.PrimaryServer.Name, - ListenAddr: cfg.PrimaryServer.ListenAddr, - Token: cfg.PrimaryServer.Token, - }, + m: map[string]models.Repository{}, }, } - secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers)) - for i, server := range cfg.SecondaryServers { - secondaryServers[i] = *server + for i, storageNode := range cfg.StorageNodes { + storageNode.ID = i + m.storageNodes.m[i] = *storageNode } - for _, repo := range cfg.Whitelist { - // store the configuration file specified shard - m.replicas.m[repo] = shard{ - primary: *cfg.PrimaryServer, - secondaries: secondaryServers, + for _, repoPath := range cfg.Whitelist { + repo := models.Repository{ + RelativePath: repoPath, } - - // initialize replication job queue to replicate all whitelisted repos - // to every secondary server - for _, secondary := range cfg.SecondaryServers { - m.jobs.next++ - m.jobs.records[m.jobs.next] = jobRecord{ - state: JobStateReady, - targetNode: secondary.Name, - relativePath: repo, + for storageID, storageNode := range cfg.StorageNodes { + + // By default, pick the first storage node to be the primary. 
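			// (For repositories that are not whitelisted here, the new streamDirector in
			// coordinator.go already elects a random storage node lazily on first use and
			// records it via SetPrimary.)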
We can change this later to pick a randomly selected node + // to be the primary + if repo.Primary == (models.StorageNode{}) { + repo.Primary = *storageNode + } else { + repo.Replicas = append(repo.Replicas, *storageNode) + // initialize replication job queue to replicate all whitelisted repos + // to every replica + m.jobs.next++ + m.jobs.records[m.jobs.next] = jobRecord{ + state: JobStateReady, + targetNodeID: storageID, + sourceStorage: repo.Primary.Storage, + relativePath: repoPath, + } } } + m.repositories.m[repoPath] = repo } return m } -// GetShardSecondaries will return the set of secondary storage locations for a -// given repository if they exist -func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) { - shard, _ := md.getShard(primary.RelativePath) +// GetReplicas gets the secondaries for a repository based on the relative path +func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + md.repositories.RLock() + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + defer md.repositories.RUnlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("repository not found") + } + + return repository.Replicas, nil +} + +// GetStorageNode gets all storage nodes +func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - return shard.secondaries, nil + node, ok := md.storageNodes.m[nodeID] + if !ok { + return models.StorageNode{}, errors.New("node not found") + } + + return node, nil } -// SetShardSecondaries will replace the set of replicas for a repository -func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetStorageNodes gets all storage nodes +func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.secondaries = secondaries - md.replicas.m[repo.RelativePath] = shard + var storageNodes []models.StorageNode + for _, storageNode := range md.storageNodes.m { + storageNodes = append(storageNodes, storageNode) + } - return nil + return storageNodes, nil } -// SetShardPrimary sets the primary for a repository -func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetPrimary gets the primary storage node for a repository of a repository relative path +func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + md.repositories.RLock() + defer md.repositories.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.primary = primary - md.replicas.m[repo.RelativePath] = shard + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, ErrPrimaryNotSet + } + storageNode, ok := md.storageNodes.m[repository.Primary.ID] + if !ok { + return nil, errors.New("node storage not found") + } + return &storageNode, nil + +} + +// SetPrimary sets the primary storagee node for a repository of a repository relative path +func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + repository = models.Repository{RelativePath: relativePath} + } + 
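	// If the repository was not yet known, the branch above created its record on
	// the fly, so SetPrimary doubles as implicit repository registration; the lookup
	// below then validates the node ID against storageNodes, so an unknown ID is
	// rejected with "node storage not found" instead of being stored as a dangling
	// primary.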
+ storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } + + repository.Primary = storageNode + + md.repositories.m[relativePath] = repository return nil } -// GetShardPrimary gets the primary for a repository -func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// AddReplica adds a secondary to a repository of a repository relative path +func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("repository not found") + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } - shard := md.replicas.m[repo.RelativePath] - return shard.primary, nil + repository.Replicas = append(repository.Replicas, storageNode) + + md.repositories.m[relativePath] = repository + return nil } -// GetRepositoriesForPrimary gets all repositories -func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// RemoveReplica removes a secondary from a repository of a repository relative path +func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - repositories := make([]string, 0, len(md.replicas.m)) + repository, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("repository not found") + } - for repository := range md.replicas.m { - repositories = append(repositories, repository) + var secondaries []models.StorageNode + for _, secondary := range repository.Replicas { + if secondary.ID != storageNodeID { + secondaries = append(secondaries, secondary) + } } - return repositories, nil + repository.Replicas = secondaries + md.repositories.m[relativePath] = repository + return nil } -func (md *MemoryDatastore) getShard(project string) (shard, bool) { - md.replicas.RLock() - replicas, ok := md.replicas.m[project] - md.replicas.RUnlock() +// GetRepository gets the repository for a repository relative path +func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) { + md.repositories.Lock() + defer md.repositories.Unlock() - return replicas, ok + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("repository not found") + } + + return &repository, nil } -// ErrSecondariesMissing indicates the repository does not have any backup +// ErrReplicasMissing indicates the repository does not have any backup // replicas -var ErrSecondariesMissing = errors.New("repository missing secondary replicas") +var ErrReplicasMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) { md.jobs.RLock() defer md.jobs.RUnlock() @@ -274,7 +340,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNode == storage { + if 
record.state&state != 0 && record.targetNodeID == targetNodeID { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -293,60 +359,52 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ } // replJobFromRecord constructs a replication job from a record and by cross -// referencing the current shard for the project being replicated +// referencing the current repository for the project being replicated func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { - shard, ok := md.getShard(record.relativePath) - if !ok { - return ReplJob{}, fmt.Errorf( - "unable to find shard for project at relative path %q", - record.relativePath, - ) - } - return ReplJob{ ID: jobID, Source: models.Repository{ RelativePath: record.relativePath, - Storage: shard.primary.Name, }, - State: record.state, - Target: record.targetNode, + SourceStorage: record.sourceStorage, + State: record.state, + TargetNodeID: record.targetNodeID, }, nil } -// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because +// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because // it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication") +var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") -// CreateSecondaryReplJobs creates a replication job for each secondary that +// CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() - emptyRepo := models.Repository{} - if source == emptyRepo { + if relativePath == "" { return nil, errors.New("invalid source repository") } - shard, ok := md.getShard(source.RelativePath) - if !ok { + repository, err := md.GetRepository(relativePath) + if err != nil { return nil, fmt.Errorf( - "unable to find shard for project at relative path %q", - source.RelativePath, + "unable to find repository for project at relative path %q", + relativePath, ) } var jobIDs []uint64 - for _, secondary := range shard.secondaries { + for _, secondary := range repository.Replicas { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - targetNode: secondary.Name, - state: JobStatePending, - relativePath: source.RelativePath, + targetNodeID: secondary.ID, + state: JobStatePending, + relativePath: relativePath, + sourceStorage: repository.Primary.Storage, } jobIDs = append(jobIDs, nextID) @@ -375,36 +433,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } - -// SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error { - md.primary.Lock() - defer md.primary.Unlock() - - md.primary.server = primary - - return nil -} - -// GetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) { - md.primary.RLock() - defer md.primary.RUnlock() - - primary := md.primary.server - if primary == (models.GitalyServer{}) { - return primary, ErrPrimaryNotSet - } - - return primary, nil -} - -// 
SetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error { - md.primary.RLock() - defer md.primary.RUnlock() - - md.primary.server = primary - - return nil -} diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index 6099a8328..a618d9466 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -1,93 +1,94 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) // TestMemoryDatastoreWhitelist verifies that the in-memory datastore will -// populate itself with the correct replication jobs and shards when initialized +// populate itself with the correct replication jobs and repositories when initialized // with a configuration file specifying the shard and whitelisted repositories. func TestMemoryDatastoreWhitelist(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup-1", - }, - { - Name: "backup-2", - }, - }, - Whitelist: []string{"abcd1234", "5678efgh"}, - } - - mds := praefect.NewMemoryDatastore(cfg) - repo1 := models.Repository{ - RelativePath: cfg.Whitelist[0], - Storage: cfg.PrimaryServer.Name, + RelativePath: "abcd1234", } - repo2 := models.Repository{ - RelativePath: cfg.Whitelist[1], - Storage: cfg.PrimaryServer.Name, + RelativePath: "5678efgh", } + mds := NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{ + &models.StorageNode{ + ID: 0, + Address: "tcp://default", + Storage: "praefect-internal-1", + }, + &models.StorageNode{ + ID: 1, + Address: "tcp://backup-2", + Storage: "praefect-internal-2", + }, &models.StorageNode{ + ID: 2, + Address: "tcp://backup-2", + Storage: "praefect-internal-3", + }}, + Whitelist: []string{repo1.RelativePath, repo2.RelativePath}, + }) - expectSecondaries := []models.GitalyServer{ - models.GitalyServer{Name: cfg.SecondaryServers[0].Name}, - models.GitalyServer{Name: cfg.SecondaryServers[1].Name}, + expectReplicas := []models.StorageNode{ + mds.storageNodes.m[1], + mds.storageNodes.m[2], } for _, repo := range []models.Repository{repo1, repo2} { - actualSecondaries, err := mds.GetShardSecondaries(repo) + actualReplicas, err := mds.GetReplicas(repo.RelativePath) require.NoError(t, err) - require.ElementsMatch(t, expectSecondaries, actualSecondaries) + require.ElementsMatch(t, expectReplicas, actualReplicas) } - backup1 := cfg.SecondaryServers[0] - backup2 := cfg.SecondaryServers[1] + backup1 := mds.storageNodes.m[1] + backup2 := mds.storageNodes.m[2] - backup1ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ - ID: 1, - Target: backup1.Name, - Source: repo1, - State: praefect.JobStateReady, + backup1ExpectedJobs := []ReplJob{ + ReplJob{ + ID: 1, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, - praefect.ReplJob{ - ID: 3, - Target: backup1.Name, - Source: repo2, - State: praefect.JobStateReady, + ReplJob{ + ID: 3, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, } - backup2ExpectedJobs := []praefect.ReplJob{ - 
praefect.ReplJob{ - ID: 2, - Target: backup2.Name, - Source: repo1, - State: praefect.JobStateReady, + backup2ExpectedJobs := []ReplJob{ + ReplJob{ + ID: 2, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, - praefect.ReplJob{ - ID: 4, - Target: backup2.Name, - Source: repo2, - State: praefect.JobStateReady, + ReplJob{ + ID: 4, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStateReady, }, } - backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10) + backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10) require.NoError(t, err) require.Equal(t, backup1ExpectedJobs, backup1ActualJobs) - backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10) + backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10) require.NoError(t, err) require.Equal(t, backup2ActualJobs, backup2ExpectedJobs) diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 417a04be2..654d5ea7e 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -1,95 +1,104 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) -const ( - stor1 = "default" // usually the primary storage location - stor2 = "backup-1" // usually the seoncary storage location +var ( + stor1 = models.StorageNode{ + ID: 0, + Address: "tcp://address-1", + Storage: "praefect-storage-1", + } + stor2 = models.StorageNode{ + ID: 1, + Address: "tcp://address-2", + Storage: "praefect-storage-2", + } proj1 = "abcd1234" // imagine this is a legit project hash ) var ( - repo1Primary = models.Repository{ + repo1Repository = models.Repository{ RelativePath: proj1, - Storage: stor1, } ) var operations = []struct { desc string - opFn func(*testing.T, praefect.Datastore) + opFn func(*testing.T, Datastore) }{ { desc: "query an empty datastore", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, }, { desc: "insert first replication job before secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - _, err := ds.CreateSecondaryReplJobs(repo1Primary) - require.Error(t, err, praefect.ErrInvalidReplTarget) + opFn: func(t *testing.T, ds Datastore) { + _, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) + require.Error(t, err, ErrInvalidReplTarget) }, }, { - desc: "set the primary for the shard", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1}) + desc: "set the primary for the repository", + opFn: func(t *testing.T, ds Datastore) { + err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID) require.NoError(t, err) }, }, { - desc: "associate the replication job target with a primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardSecondaries(repo1Primary, 
[]models.GitalyServer{models.GitalyServer{Name: stor2}}) + desc: "add a secondary replica for the repository", + opFn: func(t *testing.T, ds Datastore) { + err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID) require.NoError(t, err) }, }, { desc: "insert first replication job after secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - ids, err := ds.CreateSecondaryReplJobs(repo1Primary) + opFn: func(t *testing.T, ds Datastore) { + ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) require.NoError(t, err) require.Equal(t, []uint64{1}, ids) }, }, { desc: "fetch inserted replication jobs after primary mapped", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10) require.NoError(t, err) require.Len(t, jobs, 1) - expectedJob := praefect.ReplJob{ - ID: 1, - Source: repo1Primary, - Target: stor2, - State: praefect.JobStatePending, + expectedJob := ReplJob{ + ID: 1, + Source: models.Repository{ + RelativePath: repo1Repository.RelativePath, + }, + SourceStorage: "praefect-storage-1", + TargetNodeID: stor2.ID, + State: JobStatePending, } require.Equal(t, expectedJob, jobs[0]) }, }, { desc: "mark replication job done", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.UpdateReplJob(1, praefect.JobStateComplete) + opFn: func(t *testing.T, ds Datastore) { + err := ds.UpdateReplJob(1, JobStateComplete) require.NoError(t, err) }, }, { desc: "try fetching completed replication job", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -97,14 +106,11 @@ var operations = []struct { } // TODO: add SQL datastore flavor -var flavors = map[string]func() praefect.Datastore{ - "in-memory-datastore": func() praefect.Datastore { - return praefect.NewMemoryDatastore( - config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - }) +var flavors = map[string]func() Datastore{ + "in-memory-datastore": func() Datastore { + return NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{&stor1, &stor2}, + }) }, } diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go index b8a8afb01..7324cbcaf 100644 --- a/internal/praefect/mock/mock.pb.go +++ b/internal/praefect/mock/mock.pb.go @@ -9,7 +9,10 @@ import ( math "math" proto "github.com/golang/protobuf/proto" + _ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) // Reference imports to suppress errors if they are not otherwise used. 
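// The regenerated descriptor in the next hunk grows from 139 to 157 gzipped bytes;
// this appears to follow from mock.proto now importing shared.proto and declaring a
// scope_level on its op_type option (see the mock.proto hunk further down), both of
// which protoc encodes into the file descriptor.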
@@ -109,16 +112,17 @@ func init() { func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) } var fileDescriptor_5ed43251284e3118 = []byte{ - // 139 bytes of a gzipped FileDescriptorProto + // 157 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce, - 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17, - 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08, - 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3, - 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4, - 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00, - 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf, - 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00, - 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00, + 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71, + 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27, + 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, + 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b, + 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38, + 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54, + 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b, + 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -160,6 +164,14 @@ type SimpleServiceServer interface { SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error) } +// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations. 
+type UnimplementedSimpleServiceServer struct { +} + +func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented") +} + func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) { s.RegisterService(&_SimpleService_serviceDesc, srv) } diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto index aa6ec842a..59e79d3b9 100644 --- a/internal/praefect/mock/mock.proto +++ b/internal/praefect/mock/mock.proto @@ -7,6 +7,8 @@ syntax = "proto3"; package mock; +import "shared.proto"; + message SimpleRequest { int32 value = 1; } @@ -17,7 +19,10 @@ message SimpleResponse { service SimpleService { // SimpleUnaryUnary is a simple unary request with unary response rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) { - option (gitaly.op_type).op = ACCESSOR; + option (gitaly.op_type) = { + op: ACCESSOR + scope_level: SERVER + }; } } diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go index f6e01811b..adcf7a65e 100644 --- a/internal/praefect/mocksvc_test.go +++ b/internal/praefect/mocksvc_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock. // mockSvc is an implementation of mock.SimpleServer for testing purposes. The // gRPC stub can be updated via go generate: // -//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto +//go:generate protoc --go_out=plugins=grpc:. -I../../proto -I./ mock/mock.proto //go:generate goimports -w mock/mock.pb.go type mockSvc struct { simpleUnaryUnary simpleUnaryUnaryCallback diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go new file mode 100644 index 000000000..db3224c4c --- /dev/null +++ b/internal/praefect/models/node.go @@ -0,0 +1,17 @@ +package models + +// StorageNode describes an address that serves a storage +type StorageNode struct { + ID int + Storage string `toml:"storage"` + Address string `toml:"address"` + Token string `toml:"token"` +} + +// Repository describes a repository's relative path and its primary and list of secondaries +type Repository struct { + ID int + RelativePath string + Primary StorageNode + Replicas []StorageNode +} diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go deleted file mode 100644 index 854254d87..000000000 --- a/internal/praefect/models/nodes.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// GitalyServer allows configuring the servers that RPCs are proxied to -type GitalyServer struct { - Name string `toml:"name"` - ListenAddr string `toml:"listen_addr" split_words:"true"` - Token string `toml:"token"` -} diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go deleted file mode 100644 index e11cdbf0a..000000000 --- a/internal/praefect/models/repository.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// Repository provides all necessary information to address a repository hosted -// in a specific Gitaly replica -type Repository struct { - RelativePath string `toml:"relative_path"` // relative path of repository - Storage string `toml:"storage"` // storage location, e.g. 
default -} diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go index 811b56140..770f3ddd0 100644 --- a/internal/praefect/protoregistry/protoregistry.go +++ b/internal/praefect/protoregistry/protoregistry.go @@ -43,11 +43,24 @@ const ( OpMutator ) +// Scope represents the scope for an RPC method +type Scope int + +const ( + // ScopeRepository = repository scope + ScopeRepository = iota + // ScopeStorage = storage scope + ScopeStorage + // ScopeServer = serer scope + ScopeServer +) + // MethodInfo contains metadata about the RPC method. Refer to documentation // for message type "OperationMsg" shared.proto in gitlab-org/gitaly-proto for // more documentation. type MethodInfo struct { Operation OpType + Scope Scope targetRepo []int requestName string // protobuf message name for input type requestFactory protoFactory @@ -55,13 +68,6 @@ type MethodInfo struct { // TargetRepo returns the target repository for a protobuf message if it exists func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) { - if mi.requestName != proto.MessageName(msg) { - return nil, fmt.Errorf( - "proto message %s does not match expected RPC request message %s", - proto.MessageName(msg), mi.requestName, - ) - } - return reflectFindRepoTarget(msg, mi.targetRepo) } @@ -179,6 +185,11 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo, return MethodInfo{}, err } + scope, err := parseScope(opMsg.GetScopeLevel()) + if err != nil { + return MethodInfo{}, err + } + // for some reason, the protobuf descriptor contains an extra dot in front // of the request name that the generated code does not. This trimming keeps // the two copies consistent for comparisons. @@ -194,9 +205,21 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo, targetRepo: targetRepo, requestName: requestName, requestFactory: reqFactory, + Scope: scope, }, nil } +func parseScope(scope gitalypb.OperationMsg_Scope) (Scope, error) { + switch scope { + case gitalypb.OperationMsg_REPOSITORY: + return ScopeRepository, nil + case gitalypb.OperationMsg_SERVER: + return ScopeServer, nil + } + + return ScopeRepository, errors.New("scope not found") +} + // parses a string like "1.1" and returns a slice of ints func parseOID(rawFieldOID string) ([]int, error) { var fieldNos []int diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go index 8d6629524..286ebcf41 100644 --- a/internal/praefect/protoregistry/targetrepo_test.go +++ b/internal/praefect/protoregistry/targetrepo_test.go @@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) { svc: "RepositoryService", method: "RepackIncremental", pbMsg: &gitalypb.RepackIncrementalResponse{}, - expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"), + expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"), }, { desc: "target nested in oneOf", diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index c56f0488c..d873fee46 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -8,32 +8,33 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + 
"google.golang.org/grpc" "github.com/sirupsen/logrus" ) // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, source models.Repository, target Node) error + Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Logger } -func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { repository := &gitalypb.Repository{ - StorageName: target.Storage, + StorageName: targetStorage, RelativePath: source.RelativePath, } remoteRepository := &gitalypb.Repository{ - StorageName: source.Storage, + StorageName: sourceStorage, RelativePath: source.RelativePath, } - repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc) - remoteClient := gitalypb.NewRemoteServiceClient(target.cc) + repositoryClient := gitalypb.NewRepositoryServiceClient(target) + remoteClient := gitalypb.NewRemoteServiceClient(target) // CreateRepository is idempotent if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{ @@ -60,7 +61,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Logger - dataStore Datastore + replicasDS ReplicasDatastore + replJobsDS ReplJobsDatastore coordinator *Coordinator targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic @@ -74,10 +76,11 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, - dataStore: ds, + replicasDS: replicasDS, + replJobsDS: jobsDS, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, targetNode: targetNode, @@ -118,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository return nil } - id, err := r.dataStore.CreateSecondaryReplJobs(repo) + id, err := r.replJobsDS.CreateReplicaReplJobs(repo.RelativePath) if err != nil { return err } @@ -140,58 +143,65 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { for { - jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10) + nodes, err := r.replicasDS.GetStorageNodes() if err != nil { - return err + return nil } - if len(jobs) == 0 { - r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) - - select { - // TODO: exponential backoff when no queries are returned - case <-time.After(jobFetchInterval): - continue - - case <-ctx.Done(): - return ctx.Err() - } - } - - r.log.Debugf("fetched replication jobs: %#v", jobs) - - for _, job := range jobs { - r.log.WithField(logWithReplJobID, job.ID). 
- Infof("processing replication job %#v", job) - node, err := r.coordinator.GetStorageNode(job.Target) - r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err) + for _, node := range nodes { + jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.ID, 10) if err != nil { return err } - if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { - return err - } + if len(jobs) == 0 { + r.log.Tracef("no jobs for %d, checking again in %s", node.ID, jobFetchInterval) - primary, err := r.dataStore.GetShardPrimary(job.Source) - if err != nil { - return err - } - - ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token) - if err != nil { - return err - } + select { + // TODO: exponential backoff when no queries are returned + case <-time.After(jobFetchInterval): + continue - if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { - r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") - return err + case <-ctx.Done(): + return ctx.Err() + } } - r.log.WithField(logWithReplJobID, job.ID). - Info("completed replication") - if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { - return err + for _, job := range jobs { + r.log.WithField(logWithReplJobID, job.ID). + Infof("processing replication job %#v", job) + node, err := r.replicasDS.GetStorageNode(job.TargetNodeID) + if err != nil { + return err + } + + repository, err := r.replicasDS.GetRepository(job.Source.RelativePath) + if err != nil { + return err + } + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } + + ctx, err = helper.InjectGitalyServers(ctx, job.SourceStorage, repository.Primary.Address, "") + if err != nil { + return err + } + + cc, err := r.coordinator.GetConnection(node.Storage) + if err != nil { + return err + } + + if err := r.replicator.Replicate(ctx, job.Source, job.SourceStorage, node.Storage, cc); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") + return err + } + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil { + return err + } } } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 1294bc989..b3a461539 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -28,43 +28,33 @@ import ( // TestReplicatorProcessJobs verifies that a replicator will schedule jobs for // all whitelisted repos func TestReplicatorProcessJobsWhitelist(t *testing.T) { - var ( - cfg = config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup1", - ListenAddr: "tcp://gitaly-backup1.example.com", - }, - { - Name: "backup2", - ListenAddr: "tcp://gitaly-backup2.example.com", - }, - }, - Whitelist: []string{"abcd1234", "edfg5678"}, - } - datastore = NewMemoryDatastore(cfg) - coordinator = NewCoordinator(logrus.New(), datastore) - resultsCh = make(chan result) - replman = NewReplMgr( - cfg.SecondaryServers[1].Name, - logrus.New(), - datastore, - coordinator, - WithWhitelist(cfg.Whitelist), - WithReplicator(&mockReplicator{resultsCh}), - ) + datastore := NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{ + &models.StorageNode{ + ID: 1, + Address: "tcp://gitaly-primary.example.com", + Storage: "praefect-internal-1", + }, 
&models.StorageNode{ + ID: 2, + Address: "tcp://gitaly-backup1.example.com", + Storage: "praefect-internal-2", + }}, + Whitelist: []string{"abcd1234", "edfg5678"}, + }) + + coordinator := NewCoordinator(logrus.New(), datastore) + resultsCh := make(chan result) + replman := NewReplMgr( + "default", + logrus.New(), + datastore, + datastore, + coordinator, + WithReplicator(&mockReplicator{resultsCh}), ) - for _, node := range []*models.GitalyServer{ - cfg.PrimaryServer, - cfg.SecondaryServers[0], - cfg.SecondaryServers[1], - } { - err := coordinator.RegisterNode(node.Name, node.ListenAddr) + for _, node := range datastore.storageNodes.m { + err := coordinator.RegisterNode(node.Storage, node.Address) require.NoError(t, err) } @@ -78,14 +68,27 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { success := make(chan struct{}) + var expectedResults []result + // we expect one job per whitelisted repo with each backend server + for _, shard := range datastore.repositories.m { + for _, secondary := range shard.Replicas { + cc, err := coordinator.GetConnection(secondary.Storage) + require.NoError(t, err) + expectedResults = append(expectedResults, + result{source: models.Repository{RelativePath: shard.RelativePath}, + targetStorage: secondary.Storage, + targetCC: cc, + }) + } + } + go func() { // we expect one job per whitelisted repo with each backend server - for i := 0; i < len(cfg.Whitelist); i++ { - result := <-resultsCh - - assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage) - assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage) + for _, shard := range datastore.repositories.m { + for range shard.Replicas { + result := <-resultsCh + assert.Contains(t, expectedResults, result) + } } cancel() @@ -106,18 +109,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { } type result struct { - source models.Repository - target Node + source models.Repository + targetStorage string + targetCC *grpc.ClientConn } type mockReplicator struct { resultsCh chan<- result } -func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { select { - case mr.resultsCh <- result{source, target}: + case mr.resultsCh <- result{source, targetStorage, target}: return nil case <-ctx.Done(): @@ -178,11 +182,11 @@ func TestReplicate(t *testing.T) { var replicator defaultReplicator require.NoError(t, replicator.Replicate( ctx, - models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()}, - Node{ - cc: conn, - Storage: backupStorageName, - })) + models.Repository{RelativePath: testRepo.GetRelativePath()}, + "default", + backupStorageName, + conn, + )) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 915d7281a..5952a66ce 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -7,14 +7,15 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" - "gitlab.com/gitlab-org/gitaly/internal/praefect" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "google.golang.org/grpc" ) @@ -44,26 +45,49 @@ func TestServerSimpleUnaryUnary(t *testing.T) { }, } + gz := proto.FileDescriptor("mock/mock.proto") + fd, err := protoregistry.ExtractFileDescriptor(gz) + if err != nil { + panic(err) + } + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { const ( storagePrimary = "default" - storageBackup = "backup" ) - datastore := praefect.NewMemoryDatastore(config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, + datastore := NewMemoryDatastore(config.Config{ + StorageNodes: []*models.StorageNode{ + &models.StorageNode{ + ID: 1, + Storage: "praefect-internal-1", + }, + &models.StorageNode{ + ID: 2, + Storage: "praefect-internal-2", + }}, }) - coordinator := praefect.NewCoordinator(logrus.New(), datastore) - replmgr := praefect.NewReplMgr( + + coordinator := NewCoordinator(logrus.New(), datastore, fd) + + for id, nodeStorage := range datastore.storageNodes.m { + backend, cleanup := newMockDownstream(t, tt.callback) + defer cleanup() // clean up mock downstream server resources + + coordinator.RegisterNode(nodeStorage.Storage, backend) + nodeStorage.Address = backend + datastore.storageNodes.m[id] = nodeStorage + } + + replmgr := NewReplMgr( storagePrimary, logrus.New(), datastore, + datastore, coordinator, ) - prf := praefect.NewServer( + prf := NewServer( coordinator, replmgr, nil, @@ -85,13 +109,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) { defer cc.Close() cli := mock.NewSimpleServiceClient(cc) - for _, replica := range []string{storagePrimary, storageBackup} { - backend, cleanup := newMockDownstream(t, tt.callback) - defer cleanup() // clean up mock downstream server resources - - coordinator.RegisterNode(replica, backend) - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() -- cgit v1.2.3