Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-07-19 08:22:14 +0300
committerJohn Cai <jcai@gitlab.com>2019-07-19 08:22:14 +0300
commitb399a9c974df029d8e4a519d0fba1ae0b44eb8a9 (patch)
treece7682b699582627d18c4c568e295add3346f782
parenta3047689e849690c7c36b85d6092433df6732422 (diff)
Handle failover by catching SIGUSR1 signal AND other refactors
-rw-r--r--_support/praefect-cluster/config.praefect.toml14
-rw-r--r--_support/praefect-cluster/config.toml27
-rw-r--r--_support/praefect-cluster/docker-compose.yml55
-rw-r--r--_support/praefect-cluster/gitaly-backup-1.toml49
-rw-r--r--_support/praefect-cluster/gitaly-backup-2.toml49
-rw-r--r--_support/praefect-cluster/gitaly-primary.toml49
-rw-r--r--changelogs/unreleased/jc-naive-failover.yml5
-rw-r--r--cmd/praefect/main.go9
-rw-r--r--config.praefect.toml.example7
-rw-r--r--go.mod5
-rw-r--r--go.sum13
-rw-r--r--internal/helper/storage.go18
-rw-r--r--internal/praefect/common.go7
-rw-r--r--internal/praefect/config/config.go9
-rw-r--r--internal/praefect/config/config_test.go20
-rw-r--r--internal/praefect/coordinator.go104
-rw-r--r--internal/praefect/coordinator_test.go47
-rw-r--r--internal/praefect/datastore.go164
-rw-r--r--internal/praefect/datastore_memory_test.go27
-rw-r--r--internal/praefect/datastore_test.go14
-rw-r--r--internal/praefect/models/nodes.go8
-rw-r--r--internal/praefect/models/repository.go8
-rw-r--r--internal/praefect/replicator.go45
-rw-r--r--internal/praefect/replicator_test.go132
-rw-r--r--internal/praefect/server_test.go3
25 files changed, 627 insertions, 261 deletions
diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml
index f0b3e8f0a..e0f163178 100644
--- a/_support/praefect-cluster/config.praefect.toml
+++ b/_support/praefect-cluster/config.praefect.toml
@@ -6,13 +6,14 @@ listen_addr = ":2305"
# # Praefect can listen on a socket when placed on the same machine as all clients
#socket_path = "/etc/gitaly/praefect/socket"
# # Praefect will only replicate whitelisted repositories
-# whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git"]
+whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git"]
# # Optional: export metrics via Prometheus
# prometheus_listen_addr = "127.0.0.1:10101"
# # You can optionally configure Praefect to output JSON-formatted log messages to stdout
-# [logging]
+[logging]
# format = "json"
+ level = "info"
# # Optional: Set log level to only log entries with that severity or above
# # One of, in order: debug, info, warn, error, fatal, panic
# # Defaults to "info"
@@ -25,12 +26,15 @@ listen_addr = ":2305"
[primary_server]
name = "default"
- listen_addr = "tcp://gitaly-primary:9999"
+# listen_addr = "tcp://gitaly-primary:9999"
+ listen_addr = "tcp://127.0.0.1:9999"
[[secondary_server]]
name = "backup1"
- listen_addr = "tcp://gitaly-backup-1:9999"
+# listen_addr = "tcp://gitaly-backup-1:9999"
+ listen_addr = "tcp://127.0.0.1:9998"
[[secondary_server]]
name = "backup2"
- listen_addr = "tcp://gitaly-backup-2:9999" \ No newline at end of file
+# listen_addr = "tcp://gitaly-backup-2:9999"
+ listen_addr = "tcp://127.0.0.1:9997" \ No newline at end of file
diff --git a/_support/praefect-cluster/config.toml b/_support/praefect-cluster/config.toml
deleted file mode 100644
index 3c1ff277f..000000000
--- a/_support/praefect-cluster/config.toml
+++ /dev/null
@@ -1,27 +0,0 @@
-listen_addr = ":9999"
-prometheus_listen_addr = ":9236"
-bin_dir = "/app/_build/bin"
-
-# # Git executable settings
-[git]
-bin_path = "/usr/local/bin/git"
-
-[[storage]]
-name = "default"
-path = "/var/opt/gitlab/git-data"
-
-# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
-[prometheus]
-grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
-
-[gitaly-ruby]
-# The directory where gitaly-ruby is installed
-dir = "/app/ruby"
-
-[gitlab-shell]
-# The directory where gitlab-shell is installed
-dir = "/app/gitlab-shell"
-
-[[concurrency]]
-rpc = "/gitaly.RepositoryService/GarbageCollect"
-max_per_repo = 1
diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml
index e115ebd55..6eb81be47 100644
--- a/_support/praefect-cluster/docker-compose.yml
+++ b/_support/praefect-cluster/docker-compose.yml
@@ -1,37 +1,52 @@
version: "3.5"
services:
- praefect:
- build:
- context: ../../
- dockerfile: Dockerfile.praefect
- image: praefect:latest
- depends_on:
- - gitaly-primary
- - gitaly-backup-1
- - gitaly-backup-2
- command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"]
- ports:
- - "2305:2305"
- volumes:
- - ./config.praefect.toml:/etc/gitaly/config.praefect.toml
+# praefect:
+# build:
+# context: ../../
+# dockerfile: Dockerfile.praefect
+# image: praefect:latest
+# depends_on:
+# - gitaly-primary
+# - gitaly-backup-1
+# - gitaly-backup-2
+# command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"]
+# ports:
+# - "2305:2305"
+# volumes:
+# - ./config.praefect.toml:/etc/gitaly/config.praefect.toml
gitaly-primary:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
+ environment:
+ - GITALY_TESTING_NO_GIT_HOOKS=1
expose:
- "9999"
+ ports:
+ - "9999:9999"
+ command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-primary/data:/var/opt/gitlab/git-data
- - ./config.toml:/etc/config/config.toml
+ - ./gitaly-primary/data:/home/git/repositories
+ - ./gitaly-primary.toml:/etc/config/config.toml
gitaly-backup-1:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
+ environment:
+ - GITALY_TESTING_NO_GIT_HOOKS=1
expose:
- "9999"
+ ports:
+ - "9998:9999"
+ command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-backup-1/data:/var/opt/gitlab/git-data
- - ./config.toml:/etc/config/config.toml
+ - ./gitaly-backup-1/data:/home/git/repositories
+ - ./gitaly-backup-1.toml:/etc/config/config.toml
gitaly-backup-2:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
+ environment:
+ - GITALY_TESTING_NO_GIT_HOOKS=1
expose:
- "9999"
+ ports:
+ - "9997:9999"
+ command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-backup-2/data:/var/opt/gitlab/git-data
- - ./config.toml:/etc/config/config.toml \ No newline at end of file
+ - ./gitaly-backup-2/data:/home/git/repositories
+ - ./gitaly-backup-2.toml:/etc/config/config.toml \ No newline at end of file
diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml
new file mode 100644
index 000000000..89d1884e3
--- /dev/null
+++ b/_support/praefect-cluster/gitaly-backup-1.toml
@@ -0,0 +1,49 @@
+# Example Gitaly configuration file
+
+# The directory where Gitaly's executables are stored
+bin_dir = "/usr/local/bin"
+
+# listen on a TCP socket. This is insecure (no authentication)
+listen_addr = "0.0.0.0:9999"
+
+# # Optional: export metrics via Prometheus
+# prometheus_listen_addr = "localhost:9236"
+#
+
+# # Git executable settings
+# [git]
+# bin_path = "/usr/bin/git"
+
+[[storage]]
+name = "backup1"
+path = "/home/git/repositories"
+
+# # You can optionally configure more storages for this Gitaly instance to serve up
+#
+# [[storage]]
+# name = "other_storage"
+# path = "/mnt/other_storage/repositories"
+#
+
+# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
+# [logging]
+# format = "json"
+# # Additionally exceptions can be reported to Sentry
+# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
+
+# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
+# [prometheus]
+# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
+
+[gitaly-ruby]
+# The directory where gitaly-ruby is installed
+dir = "/srv/gitaly-ruby"
+
+[gitlab-shell]
+# The directory where gitlab-shell is installed
+dir = "/srv/gitlab-shell"
+
+# # You can adjust the concurrency of each RPC endpoint
+# [[concurrency]]
+# rpc = "/gitaly.RepositoryService/GarbageCollect"
+# max_per_repo = 1
diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml
new file mode 100644
index 000000000..1b5ce8d20
--- /dev/null
+++ b/_support/praefect-cluster/gitaly-backup-2.toml
@@ -0,0 +1,49 @@
+# Example Gitaly configuration file
+
+# The directory where Gitaly's executables are stored
+bin_dir = "/usr/local/bin"
+
+# listen on a TCP socket. This is insecure (no authentication)
+listen_addr = "0.0.0.0:9999"
+
+# # Optional: export metrics via Prometheus
+# prometheus_listen_addr = "localhost:9236"
+#
+
+# # Git executable settings
+# [git]
+# bin_path = "/usr/bin/git"
+
+[[storage]]
+name = "backup2"
+path = "/home/git/repositories"
+
+# # You can optionally configure more storages for this Gitaly instance to serve up
+#
+# [[storage]]
+# name = "other_storage"
+# path = "/mnt/other_storage/repositories"
+#
+
+# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
+# [logging]
+# format = "json"
+# # Additionally exceptions can be reported to Sentry
+# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
+
+# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
+# [prometheus]
+# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
+
+[gitaly-ruby]
+# The directory where gitaly-ruby is installed
+dir = "/srv/gitaly-ruby"
+
+[gitlab-shell]
+# The directory where gitlab-shell is installed
+dir = "/srv/gitlab-shell"
+
+# # You can adjust the concurrency of each RPC endpoint
+# [[concurrency]]
+# rpc = "/gitaly.RepositoryService/GarbageCollect"
+# max_per_repo = 1
diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/gitaly-primary.toml
new file mode 100644
index 000000000..2379b6951
--- /dev/null
+++ b/_support/praefect-cluster/gitaly-primary.toml
@@ -0,0 +1,49 @@
+# Example Gitaly configuration file
+
+# The directory where Gitaly's executables are stored
+bin_dir = "/usr/local/bin"
+
+# listen on a TCP socket. This is insecure (no authentication)
+listen_addr = "0.0.0.0:9999"
+
+# # Optional: export metrics via Prometheus
+# prometheus_listen_addr = "localhost:9236"
+#
+
+# # Git executable settings
+# [git]
+# bin_path = "/usr/bin/git"
+
+[[storage]]
+name = "default"
+path = "/home/git/repositories"
+
+# # You can optionally configure more storages for this Gitaly instance to serve up
+#
+# [[storage]]
+# name = "other_storage"
+# path = "/mnt/other_storage/repositories"
+#
+
+# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
+# [logging]
+# format = "json"
+# # Additionally exceptions can be reported to Sentry
+# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
+
+# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
+# [prometheus]
+# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
+
+[gitaly-ruby]
+# The directory where gitaly-ruby is installed
+dir = "/srv/gitaly-ruby"
+
+[gitlab-shell]
+# The directory where gitlab-shell is installed
+dir = "/srv/gitlab-shell"
+
+# # You can adjust the concurrency of each RPC endpoint
+# [[concurrency]]
+# rpc = "/gitaly.RepositoryService/GarbageCollect"
+# max_per_repo = 1
diff --git a/changelogs/unreleased/jc-naive-failover.yml b/changelogs/unreleased/jc-naive-failover.yml
new file mode 100644
index 000000000..752e770a0
--- /dev/null
+++ b/changelogs/unreleased/jc-naive-failover.yml
@@ -0,0 +1,5 @@
+---
+title: Handle failover by catching SIGUSR1 signal
+merge_request: 1346
+author:
+type: other
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 37fdddb22..acf53d2fa 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
+
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/version"
@@ -113,8 +114,6 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
-
allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer)
for _, gitaly := range allBackendServers {
@@ -122,9 +121,13 @@ func run(listeners []net.Listener, conf config.Config) error {
return fmt.Errorf("failed to register %s: %s", gitaly.Name, err)
}
- logger.WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
+ logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
}
+ go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
+
+ go coordinator.FailoverRotation()
+
select {
case s := <-termCh:
logger.WithField("signal", s).Warn("received signal, shutting down gracefully")
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index 59e7563f1..1393de890 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -26,11 +26,14 @@ listen_addr = "127.0.0.1:2305"
[primary_server]
name = "default"
listen_addr = "tcp://gitaly-primary.example.com"
+ token = "abcd1234"
# [[secondary_server]]
-# name = "default"
+# name = "backup-1"
# listen_addr = "tcp://gitaly-backup1.example.com"
+# token = "abcd1234"
# [[secondary_server]]
-# name = "backup"
+# name = "backup-2"
# listen_addr = "tcp://gitaly-backup2.example.com"
+# token = "abcd1234"
diff --git a/go.mod b/go.mod
index 512f62805..a8b29a4a1 100644
--- a/go.mod
+++ b/go.mod
@@ -9,6 +9,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kelseyhightower/envconfig v1.3.0
+ github.com/kr/pretty v0.1.0 // indirect
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47
github.com/prometheus/client_golang v1.0.0
github.com/sirupsen/logrus v1.2.0
@@ -16,10 +17,12 @@ require (
github.com/tinylib/msgp v1.1.0 // indirect
gitlab.com/gitlab-org/gitaly-proto v1.37.0
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c
+ golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 // indirect
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
- golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a
+ golang.org/x/sys v0.0.0-20190422165155-953cdadca894
google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect
google.golang.org/grpc v1.16.0
+ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)
diff --git a/go.sum b/go.sum
index 6dfdc7794..d97d828f7 100644
--- a/go.sum
+++ b/go.sum
@@ -51,6 +51,11 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 h1:HDt7WT3kpXSHq4mlOuLzgXH9LeOK1qlhyFdKIAzxxeM=
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47/go.mod h1:4bKN42efkbNYMZlvDfxGDxzl066GhpvIircZDsm8Y+Y=
github.com/lightstep/lightstep-tracer-go v0.15.6 h1:D0GGa7afJ7GcQvu5as6ssLEEKYXvRgKI5d5cevtz8r4=
@@ -112,11 +117,14 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc=
+golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
@@ -132,6 +140,9 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -147,6 +158,8 @@ gopkg.in/DataDog/dd-trace-go.v1 v1.7.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzw
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index 83341b0c4..4e535a5d6 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -33,3 +33,21 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly
return
}
+
+// InjectGitalyServers injects gitaly-servers metadata into an outgoing context
+func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) {
+
+ gitalyServers := storage.GitalyServers{
+ name: {
+ "address": address,
+ "token": token,
+ },
+ }
+
+ gitalyServersJSON, err := json.Marshal(gitalyServers)
+ if err != nil {
+ return nil, err
+ }
+
+ return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil
+}
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
index a63d309bd..2df2a4823 100644
--- a/internal/praefect/common.go
+++ b/internal/praefect/common.go
@@ -2,13 +2,6 @@ package praefect
import "google.golang.org/grpc"
-// Repository provides all necessary information to address a repository hosted
-// in a specific Gitaly replica
-type Repository struct {
- RelativePath string // relative path of repository
- Storage string // storage location, e.g. default
-}
-
// Node is a wrapper around the grpc client connection for a backend Gitaly node
type Node struct {
Storage string
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 768104ed1..6a2a5b5d5 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -5,7 +5,9 @@ import (
"os"
"github.com/BurntSushi/toml"
+
"gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// Config is a container for everything found in the TOML config file
@@ -13,8 +15,8 @@ type Config struct {
ListenAddr string `toml:"listen_addr"`
SocketPath string `toml:"socket_path"`
- PrimaryServer *GitalyServer `toml:"primary_server"`
- SecondaryServers []*GitalyServer `toml:"secondary_server"`
+ PrimaryServer *models.GitalyServer `toml:"primary_server"`
+ SecondaryServers []*models.GitalyServer `toml:"secondary_server"`
// Whitelist is a list of relative project paths (paths comprised of project
// hashes) that are permitted to use high availability features
@@ -28,6 +30,7 @@ type Config struct {
type GitalyServer struct {
Name string `toml:"name"`
ListenAddr string `toml:"listen_addr" split_words:"true"`
+ Token string `toml:"token"`
}
// FromFile loads the config for the passed file path
@@ -50,7 +53,7 @@ var (
errGitalyWithoutName = errors.New("all gitaly servers must have a name")
)
-var emptyServer = &GitalyServer{}
+var emptyServer = &models.GitalyServer{}
// Validate establishes if the config is valid
func (c Config) Validate() error {
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index 33731b17d..eace5eb2f 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -5,13 +5,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
func TestConfigValidation(t *testing.T) {
- primarySrv := &GitalyServer{"test", "localhost:23456"}
- secondarySrvs := []*GitalyServer{
- {"test1", "localhost:23457"},
- {"test2", "localhost:23458"},
+ primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"}
+ secondarySrvs := []*models.GitalyServer{
+ {"test1", "localhost:23457", "secret-token"},
+ {"test2", "localhost:23458", "secret-token"},
}
testCases := []struct {
@@ -36,7 +37,7 @@ func TestConfigValidation(t *testing.T) {
},
{
desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*GitalyServer{primarySrv}},
+ config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}},
err: errDuplicateGitalyAddr,
},
{
@@ -62,11 +63,11 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
- PrimaryServer: &GitalyServer{
+ PrimaryServer: &models.GitalyServer{
Name: "default",
ListenAddr: "tcp://gitaly-primary.example.com",
},
- SecondaryServers: []*GitalyServer{
+ SecondaryServers: []*models.GitalyServer{
{
Name: "default",
ListenAddr: "tcp://gitaly-backup1.example.com",
@@ -76,10 +77,7 @@ func TestConfigParsing(t *testing.T) {
ListenAddr: "tcp://gitaly-backup2.example.com",
},
},
- Whitelist: []string{
- "abcd1234",
- "edfg5678",
- },
+ Whitelist: []string{"abcd1234", "edfg5678"},
},
},
}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b2e6704d5..8f64022cb 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -3,10 +3,15 @@ package praefect
import (
"context"
"fmt"
+ "os"
+ "os/signal"
"sync"
+ "syscall"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
@@ -22,17 +27,18 @@ import (
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- log *logrus.Logger
- lock sync.RWMutex
+ log *logrus.Logger
+ failoverMutex sync.RWMutex
+ connMutex sync.RWMutex
- datastore PrimaryDatastore
+ datastore Datastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -63,12 +69,15 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
}
// streamDirector determines which downstream servers receive requests
-func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
+func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) {
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- storageName, err := c.datastore.GetPrimary()
+ c.failoverMutex.RLock()
+ defer c.failoverMutex.RUnlock()
+
+ serverConfig, err := c.datastore.GetDefaultPrimary()
if err != nil {
err := status.Error(
codes.FailedPrecondition,
@@ -79,9 +88,14 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, ok := c.getConn(storageName)
+ cc, ok := c.getConn(serverConfig.Name)
if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName)
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name)
+ }
+
+ ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token)
+ if err != nil {
+ return nil, nil, err
}
return ctx, cc, nil
@@ -106,15 +120,81 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
}
func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
- c.lock.Lock()
+ c.connMutex.Lock()
c.nodes[storageName] = conn
- c.lock.Unlock()
+ c.connMutex.Unlock()
}
func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
- c.lock.RLock()
+ c.connMutex.RLock()
cc, ok := c.nodes[storageName]
- c.lock.RUnlock()
+ c.connMutex.RUnlock()
return cc, ok
}
+
+// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
+func (c *Coordinator) FailoverRotation() {
+ c.handleSignalAndRotate()
+}
+
+func (c *Coordinator) handleSignalAndRotate() {
+ failoverChan := make(chan os.Signal, 1)
+ signal.Notify(failoverChan, syscall.SIGUSR1)
+
+ for {
+ <-failoverChan
+
+ c.failoverMutex.Lock()
+ primary, err := c.datastore.GetDefaultPrimary()
+ if err != nil {
+ c.log.Fatalf("error when getting default primary: %v", err)
+ }
+
+ if err := c.rotateSecondaryToPrimary(primary); err != nil {
+ c.log.WithError(err).Error("rotating secondary")
+ }
+ c.failoverMutex.Unlock()
+ }
+}
+
+func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error {
+ repositories, err := c.datastore.GetRepositoriesForPrimary(primary)
+ if err != nil {
+ return err
+ }
+
+ for _, repoPath := range repositories {
+ secondaries, err := c.datastore.GetShardSecondaries(models.Repository{
+ RelativePath: repoPath,
+ })
+ if err != nil {
+ return fmt.Errorf("getting secondaries: %v", err)
+ }
+
+ newPrimary := secondaries[0]
+ secondaries = append(secondaries[1:], primary)
+
+ if err = c.datastore.SetShardPrimary(models.Repository{
+ RelativePath: repoPath,
+ }, newPrimary); err != nil {
+ return fmt.Errorf("setting primary: %v", err)
+ }
+
+ if err = c.datastore.SetShardSecondaries(models.Repository{
+ RelativePath: repoPath,
+ }, secondaries); err != nil {
+ return fmt.Errorf("setting secondaries: %v", err)
+ }
+ }
+
+ // set the new default primary
+ primary, err = c.datastore.GetShardPrimary(models.Repository{
+ RelativePath: repositories[0],
+ })
+ if err != nil {
+ return fmt.Errorf("getting shard primary: %v", err)
+ }
+
+ return c.datastore.SetDefaultPrimary(primary)
+}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
new file mode 100644
index 000000000..50045f8a0
--- /dev/null
+++ b/internal/praefect/coordinator_test.go
@@ -0,0 +1,47 @@
+package praefect
+
+import (
+ "io/ioutil"
+ "testing"
+
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+)
+
+var testLogger = logrus.New()
+
+func init() {
+ testLogger.SetOutput(ioutil.Discard)
+}
+
+func TestSecondaryRotation(t *testing.T) {
+ cfg := config.Config{
+ PrimaryServer: &models.GitalyServer{Name: "primary"},
+ SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}},
+ Whitelist: []string{"/repoA", "/repoB"},
+ }
+ d := NewMemoryDatastore(cfg)
+ c := NewCoordinator(testLogger, d)
+
+ primary, err := d.GetDefaultPrimary()
+ require.NoError(t, err)
+
+ require.NoError(t, c.rotateSecondaryToPrimary(primary))
+
+ primary, err = d.GetDefaultPrimary()
+ require.NoError(t, err)
+ require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary")
+
+ repositories, err := d.GetRepositoriesForPrimary(primary)
+ require.NoError(t, err)
+
+ for _, repository := range repositories {
+ shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository})
+ require.NoError(t, err)
+
+ require.Len(t, shardSecondaries, 2)
+ require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0])
+ }
+}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index eeb9f9728..5678c6a24 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -12,6 +12,7 @@ import (
"sync"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
var (
@@ -41,9 +42,9 @@ const (
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
- ID uint64 // autoincrement ID
- Target string // which storage location to replicate to?
- Source Repository // source for replication
+ ID uint64 // autoincrement ID
+ Target string // which storage location to replicate to?
+ Source models.Repository // source for replication
State JobState
}
@@ -63,15 +64,13 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
type Datastore interface {
ReplJobsDatastore
ReplicasDatastore
- PrimaryDatastore
+ TemporaryDatastore
}
-// PrimaryDatastore manages accessing and setting the primary storage location
-type PrimaryDatastore interface {
- // GetPrimary gets the primary storage location
- GetPrimary() (string, error)
- // SetPrimary sets the primary storage location
- SetPrimary(primary string) error
+// TemporaryDatastore contains methods that will go away once we move to a SQL datastore
+type TemporaryDatastore interface {
+ GetDefaultPrimary() (models.GitalyServer, error)
+ SetDefaultPrimary(primary models.GitalyServer) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
@@ -79,11 +78,18 @@ type PrimaryDatastore interface {
type ReplicasDatastore interface {
// GetSecondaries will retrieve all secondary replica storage locations for
// a primary replica
- GetSecondaries(primary Repository) ([]string, error)
+ GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error)
+
+ GetShardPrimary(repo models.Repository) (models.GitalyServer, error)
// SetSecondaries will set the secondary storage locations for a repository
// in a primary replica.
- SetSecondaries(primary Repository, secondaries []string) error
+ SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error
+
+ SetShardPrimary(repo models.Repository, primary models.GitalyServer) error
+
+ // GetRepositoriesForPrimary returns a list of the repositories (by relative path) for a given primary
+ GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -92,12 +98,12 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, storage string, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, node string, count int) ([]ReplJob, error)
// CreateSecondaryJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(source Repository) ([]uint64, error)
+ CreateSecondaryReplJobs(source models.Repository) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
@@ -105,13 +111,13 @@ type ReplJobsDatastore interface {
// shard is a set of primary and secondary storage replicas for a project
type shard struct {
- primary string
- secondaries []string
+ primary models.GitalyServer
+ secondaries []models.GitalyServer
}
type jobRecord struct {
relativePath string // project's relative path
- target string
+ targetNode string
state JobState
}
@@ -132,7 +138,7 @@ type MemoryDatastore struct {
primary *struct {
sync.RWMutex
- storageName string
+ server models.GitalyServer
}
}
@@ -155,22 +161,26 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
},
primary: &struct {
sync.RWMutex
- storageName string
+ server models.GitalyServer
}{
- storageName: cfg.PrimaryServer.Name,
+ server: models.GitalyServer{
+ Name: cfg.PrimaryServer.Name,
+ ListenAddr: cfg.PrimaryServer.ListenAddr,
+ Token: cfg.PrimaryServer.Token,
+ },
},
}
- secondaries := make([]string, len(cfg.SecondaryServers))
+ secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers))
for i, server := range cfg.SecondaryServers {
- secondaries[i] = server.Name
+ secondaryServers[i] = *server
}
- for _, relativePath := range cfg.Whitelist {
+ for _, repo := range cfg.Whitelist {
// store the configuration file specified shard
- m.replicas.m[relativePath] = shard{
- primary: cfg.PrimaryServer.Name,
- secondaries: secondaries,
+ m.replicas.m[repo] = shard{
+ primary: *cfg.PrimaryServer,
+ secondaries: secondaryServers,
}
// initialize replication job queue to replicate all whitelisted repos
@@ -179,36 +189,70 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
m.jobs.next++
m.jobs.records[m.jobs.next] = jobRecord{
state: JobStateReady,
- target: secondary.Name,
- relativePath: relativePath,
+ targetNode: secondary.Name,
+ relativePath: repo,
}
}
-
}
return m
}
-// GetSecondaries will return the set of secondary storage locations for a
+// GetShardSecondaries will return the set of secondary storage locations for a
// given repository if they exist
-func (md *MemoryDatastore) GetSecondaries(primary Repository) ([]string, error) {
+func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) {
shard, _ := md.getShard(primary.RelativePath)
return shard.secondaries, nil
}
-// SetSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetSecondaries(primary Repository, secondaries []string) error {
+// SetShardSecondaries will replace the set of replicas for a repository
+func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error {
md.replicas.Lock()
- md.replicas.m[primary.RelativePath] = shard{
- primary: primary.Storage,
- secondaries: secondaries,
- }
- md.replicas.Unlock()
+ defer md.replicas.Unlock()
+
+ shard := md.replicas.m[repo.RelativePath]
+ shard.secondaries = secondaries
+ md.replicas.m[repo.RelativePath] = shard
return nil
}
+// SetShardPrimary sets the primary for a repository
+func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error {
+ md.replicas.Lock()
+ defer md.replicas.Unlock()
+
+ shard := md.replicas.m[repo.RelativePath]
+ shard.primary = primary
+ md.replicas.m[repo.RelativePath] = shard
+
+ return nil
+}
+
+// GetShardPrimary gets the primary for a repository (the zero GitalyServer if the repository is unknown)
+func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) {
+ md.replicas.RLock() // read-only lookup, so a shared lock is enough
+ defer md.replicas.RUnlock()
+
+ shard := md.replicas.m[repo.RelativePath]
+ return shard.primary, nil
+}
+
+// GetRepositoriesForPrimary returns the relative paths of all known repositories; the primary argument is currently not used for filtering
+func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) {
+ md.replicas.RLock() // read-only iteration, so a shared lock is enough
+ defer md.replicas.RUnlock()
+
+ repositories := make([]string, 0, len(md.replicas.m))
+
+ for repository := range md.replicas.m {
+ repositories = append(repositories, repository)
+ }
+
+ return repositories, nil
+}
+
func (md *MemoryDatastore) getShard(project string) (shard, bool) {
md.replicas.RLock()
replicas, ok := md.replicas.m[project]
@@ -230,7 +274,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.target == storage {
+ if record.state&state != 0 && record.targetNode == storage {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -261,26 +305,26 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
return ReplJob{
ID: jobID,
- Source: Repository{
+ Source: models.Repository{
RelativePath: record.relativePath,
- Storage: shard.primary,
+ Storage: shard.primary.Name,
},
State: record.state,
- Target: record.target,
+ Target: record.targetNode,
}, nil
}
-// ErrInvalidReplTarget indicates a target repository cannot be chosen because
+// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because
// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("target repository fails preconditions for replication")
+var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication")
// CreateSecondaryReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64, error) {
+func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
- emptyRepo := Repository{}
+ emptyRepo := models.Repository{}
if source == emptyRepo {
return nil, errors.New("invalid source repository")
}
@@ -300,7 +344,7 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64,
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- target: secondary,
+ targetNode: secondary.Name,
state: JobStatePending,
relativePath: source.RelativePath,
}
@@ -333,24 +377,34 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
}
// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary string) error {
+func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error {
md.primary.Lock()
defer md.primary.Unlock()
- md.primary.storageName = primary
+ md.primary.server = primary
return nil
}
-// GetPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetPrimary() (string, error) {
+// GetDefaultPrimary gets the primary datastore location
+func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) {
md.primary.RLock()
defer md.primary.RUnlock()
- storageName := md.primary.storageName
- if storageName == "" {
- return "", ErrPrimaryNotSet
+ primary := md.primary.server
+ if primary == (models.GitalyServer{}) {
+ return primary, ErrPrimaryNotSet
}
- return storageName, nil
+ return primary, nil
+}
+
+// SetDefaultPrimary sets the primary datastore location
+func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error {
+ md.primary.Lock() // exclusive lock: this method writes md.primary.server
+ defer md.primary.Unlock()
+
+ md.primary.server = primary
+
+ return nil
}
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index a306b3ce6..6099a8328 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will
@@ -13,10 +14,10 @@ import (
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
cfg := config.Config{
- PrimaryServer: &config.GitalyServer{
+ PrimaryServer: &models.GitalyServer{
Name: "default",
},
- SecondaryServers: []*config.GitalyServer{
+ SecondaryServers: []*models.GitalyServer{
{
Name: "backup-1",
},
@@ -24,32 +25,30 @@ func TestMemoryDatastoreWhitelist(t *testing.T) {
Name: "backup-2",
},
},
- Whitelist: []string{
- "abcd1234",
- "5678efgh",
- },
+ Whitelist: []string{"abcd1234", "5678efgh"},
}
mds := praefect.NewMemoryDatastore(cfg)
- repo1 := praefect.Repository{
+ repo1 := models.Repository{
RelativePath: cfg.Whitelist[0],
Storage: cfg.PrimaryServer.Name,
}
- repo2 := praefect.Repository{
+
+ repo2 := models.Repository{
RelativePath: cfg.Whitelist[1],
Storage: cfg.PrimaryServer.Name,
}
- expectSecondaries := []string{
- cfg.SecondaryServers[0].Name,
- cfg.SecondaryServers[1].Name,
+ expectSecondaries := []models.GitalyServer{
+ models.GitalyServer{Name: cfg.SecondaryServers[0].Name},
+ models.GitalyServer{Name: cfg.SecondaryServers[1].Name},
}
- for _, repo := range []praefect.Repository{repo1, repo2} {
- actualSecondaries, err := mds.GetSecondaries(repo)
+ for _, repo := range []models.Repository{repo1, repo2} {
+ actualSecondaries, err := mds.GetShardSecondaries(repo)
require.NoError(t, err)
- require.ElementsMatch(t, actualSecondaries, expectSecondaries)
+ require.ElementsMatch(t, expectSecondaries, actualSecondaries)
}
backup1 := cfg.SecondaryServers[0]
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 6534b9d88..417a04be2 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
const (
@@ -15,7 +16,7 @@ const (
)
var (
- repo1Primary = praefect.Repository{
+ repo1Primary = models.Repository{
RelativePath: proj1,
Storage: stor1,
}
@@ -41,9 +42,16 @@ var operations = []struct {
},
},
{
+ desc: "set the primary for the shard",
+ opFn: func(t *testing.T, ds praefect.Datastore) {
+ err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1})
+ require.NoError(t, err)
+ },
+ },
+ {
desc: "associate the replication job target with a primary",
opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetSecondaries(repo1Primary, []string{stor2})
+ err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}})
require.NoError(t, err)
},
},
@@ -93,7 +101,7 @@ var flavors = map[string]func() praefect.Datastore{
"in-memory-datastore": func() praefect.Datastore {
return praefect.NewMemoryDatastore(
config.Config{
- PrimaryServer: &config.GitalyServer{
+ PrimaryServer: &models.GitalyServer{
Name: "default",
},
})
diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go
new file mode 100644
index 000000000..854254d87
--- /dev/null
+++ b/internal/praefect/models/nodes.go
@@ -0,0 +1,8 @@
+package models
+
+// GitalyServer allows configuring the servers that RPCs are proxied to
+type GitalyServer struct {
+ Name string `toml:"name"`
+ ListenAddr string `toml:"listen_addr" split_words:"true"`
+ Token string `toml:"token"`
+}
diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go
new file mode 100644
index 000000000..e11cdbf0a
--- /dev/null
+++ b/internal/praefect/models/repository.go
@@ -0,0 +1,8 @@
+package models
+
+// Repository provides all necessary information to address a repository hosted
+// in a specific Gitaly replica
+type Repository struct {
+ RelativePath string `toml:"relative_path"` // relative path of repository
+ Storage string `toml:"storage"` // storage location, e.g. default
+}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index dce2df103..d3244619d 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -6,24 +6,27 @@ import (
"time"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"github.com/sirupsen/logrus"
)
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, source Repository, target Node) error
+ Replicate(ctx context.Context, source models.Repository, target Node) error
}
type defaultReplicator struct {
log *logrus.Logger
}
-func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, target Node) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
repository := &gitalypb.Repository{
StorageName: target.Storage,
RelativePath: source.RelativePath,
}
+
remoteRepository := &gitalypb.Repository{
StorageName: source.Storage,
RelativePath: source.RelativePath,
@@ -57,9 +60,9 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, ta
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Logger
- jobsStore ReplJobsDatastore
+ dataStore Datastore
coordinator *Coordinator
- storage string // which replica is this replicator responsible for?
+ targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
// whitelist contains the project names of the repos we wish to replicate
@@ -71,13 +74,13 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(storage string, log *logrus.Logger, ds ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
- jobsStore: ds,
+ dataStore: ds,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
- storage: storage,
+ targetNode: targetNode,
coordinator: c,
}
@@ -107,7 +110,7 @@ func WithReplicator(r Replicator) ReplMgrOpt {
// ScheduleReplication will store a replication job in the datastore for later
// execution. It filters out projects that are not whitelisted.
// TODO: add a parameter to delay replication
-func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error {
+func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository) error {
_, ok := r.whitelist[repo.RelativePath]
if !ok {
r.log.WithField(logKeyProjectPath, repo.RelativePath).
@@ -115,14 +118,14 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error
return nil
}
- id, err := r.jobsStore.CreateSecondaryReplJobs(repo)
+ id, err := r.dataStore.CreateSecondaryReplJobs(repo)
if err != nil {
return err
}
r.log.Infof(
- "replication manager for storage %q created replication job with ID %d",
- r.storage,
+ "replication manager for targetNode %q created replication job with ID %d",
+ r.targetNode,
id,
)
@@ -137,13 +140,13 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
for {
- jobs, err := r.jobsStore.GetJobs(JobStatePending|JobStateReady, r.storage, 10)
+ jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10)
if err != nil {
return err
}
if len(jobs) == 0 {
- r.log.Tracef("no jobs for %s, checking again in %s", r.storage, jobFetchInterval)
+ r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
select {
// TODO: exponential backoff when no queries are returned
@@ -161,21 +164,33 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
r.log.WithField(logWithReplJobID, job.ID).
Infof("processing replication job %#v", job)
node, err := r.coordinator.GetStorageNode(job.Target)
+ r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err)
if err != nil {
return err
}
- if err := r.jobsStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
+
+ primary, err := r.dataStore.GetShardPrimary(job.Source)
+ if err != nil {
+ return err
+ }
+
+ ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token)
+ if err != nil {
return err
}
if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
+ r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
return err
}
r.log.WithField(logWithReplJobID, job.ID).
Info("completed replication")
- if err := r.jobsStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
return err
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index ac2dcf6f6..1294bc989 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -1,7 +1,8 @@
-package praefect_test
+package praefect
import (
"context"
+ "io/ioutil"
"log"
"net"
"os"
@@ -13,11 +14,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
+ gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
+ "gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
"gitlab.com/gitlab-org/gitaly/internal/testhelper"
@@ -28,11 +30,11 @@ import (
func TestReplicatorProcessJobsWhitelist(t *testing.T) {
var (
cfg = config.Config{
- PrimaryServer: &config.GitalyServer{
+ PrimaryServer: &models.GitalyServer{
Name: "default",
ListenAddr: "tcp://gitaly-primary.example.com",
},
- SecondaryServers: []*config.GitalyServer{
+ SecondaryServers: []*models.GitalyServer{
{
Name: "backup1",
ListenAddr: "tcp://gitaly-backup1.example.com",
@@ -42,25 +44,22 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
ListenAddr: "tcp://gitaly-backup2.example.com",
},
},
- Whitelist: []string{
- "abcd1234",
- "edfg5678",
- },
+ Whitelist: []string{"abcd1234", "edfg5678"},
}
- datastore = praefect.NewMemoryDatastore(cfg)
- coordinator = praefect.NewCoordinator(logrus.New(), datastore)
+ datastore = NewMemoryDatastore(cfg)
+ coordinator = NewCoordinator(logrus.New(), datastore)
resultsCh = make(chan result)
- replman = praefect.NewReplMgr(
+ replman = NewReplMgr(
cfg.SecondaryServers[1].Name,
logrus.New(),
datastore,
coordinator,
- praefect.WithWhitelist(cfg.Whitelist),
- praefect.WithReplicator(&mockReplicator{resultsCh}),
+ WithWhitelist(cfg.Whitelist),
+ WithReplicator(&mockReplicator{resultsCh}),
)
)
- for _, node := range []*config.GitalyServer{
+ for _, node := range []*models.GitalyServer{
cfg.PrimaryServer,
cfg.SecondaryServers[0],
cfg.SecondaryServers[1],
@@ -85,8 +84,8 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
result := <-resultsCh
assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Equal(t, result.target.Storage, cfg.SecondaryServers[1].Name)
- assert.Equal(t, result.source.Storage, cfg.PrimaryServer.Name)
+ assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage)
+ assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage)
}
cancel()
@@ -107,15 +106,15 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
}
type result struct {
- source praefect.Repository
- target praefect.Node
+ source models.Repository
+ target Node
}
type mockReplicator struct {
resultsCh chan<- result
}
-func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Repository, target praefect.Node) error {
+func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
select {
case mr.resultsCh <- result{source, target}:
@@ -130,33 +129,16 @@ func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Reposit
}
func TestReplicate(t *testing.T) {
+ srv, srvSocketPath := runFullGitalyServer(t)
+ defer srv.Stop()
+
testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t)
defer cleanupFn()
- commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
- Message: "a commit",
- })
+ backupStorageName := "backup"
- defer cleanupFn()
- var (
- cfg = config.Config{
- PrimaryServer: &config.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*config.GitalyServer{
- {
- Name: "backup",
- ListenAddr: "tcp://gitaly-backup1.example.com",
- },
- },
- Whitelist: []string{
- testRepo.GetRelativePath(),
- },
- }
- )
- backupDir := filepath.Join(testhelper.GitlabTestStoragePath(), "backup")
- require.NoError(t, os.Mkdir(backupDir, os.ModeDir|0755))
+ backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName)
+ require.NoError(t, err)
defer func() {
os.RemoveAll(backupDir)
}()
@@ -167,54 +149,40 @@ func TestReplicate(t *testing.T) {
}()
gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{
- Name: "backup",
+ Name: backupStorageName,
Path: backupDir,
- }, gitaly_config.Storage{
- Name: "default",
- Path: testhelper.GitlabTestStoragePath(),
- })
-
- srv, socketPath := runFullGitalyServer(t)
- defer srv.Stop()
-
- datastore := praefect.NewMemoryDatastore(cfg)
- coordinator := praefect.NewCoordinator(logrus.New(), datastore)
-
- coordinator.RegisterNode("backup", socketPath)
- coordinator.RegisterNode("default", socketPath)
-
- replman := praefect.NewReplMgr(
- cfg.SecondaryServers[0].Name,
- logrus.New(),
- datastore,
- coordinator,
- praefect.WithWhitelist([]string{testRepo.GetRelativePath()}),
+ },
+ gitaly_config.Storage{
+ Name: "default",
+ Path: testhelper.GitlabTestStoragePath(),
+ },
)
ctx, cancel := testhelper.Context()
defer cancel()
- md := testhelper.GitalyServersMetadata(t, socketPath)
- ctx = metadata.NewOutgoingContext(ctx, md)
-
- go func() {
- require.Error(t, context.Canceled, replman.ProcessBacklog(ctx))
- }()
- var tries int
- jobs, err := datastore.GetJobs(praefect.JobStateInProgress|praefect.JobStatePending|praefect.JobStateReady, "backup", 1)
+ connOpts := []grpc.DialOption{
+ grpc.WithInsecure(),
+ grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)),
+ }
+ conn, err := grpc.Dial(srvSocketPath, connOpts...)
require.NoError(t, err)
- for len(jobs) > 0 {
- if tries > 20 {
- t.Error("exceeded timeout")
- }
- time.Sleep(1 * time.Second)
- tries++
+ commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{
+ Message: "a commit",
+ })
- jobs, err = datastore.GetJobs(praefect.JobStateInProgress|praefect.JobStatePending|praefect.JobStateReady, "backup", 1)
- require.NoError(t, err)
- }
- cancel()
+ ctx, err = helper.InjectGitalyServers(ctx, "default", srvSocketPath, testhelper.RepositoryAuthToken)
+ require.NoError(t, err)
+
+ var replicator defaultReplicator
+ require.NoError(t, replicator.Replicate(
+ ctx,
+ models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()},
+ Node{
+ cc: conn,
+ Storage: backupStorageName,
+ }))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index b6f2a8fd9..915d7281a 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -14,6 +14,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"google.golang.org/grpc"
)
@@ -51,7 +52,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
)
datastore := praefect.NewMemoryDatastore(config.Config{
- PrimaryServer: &config.GitalyServer{
+ PrimaryServer: &models.GitalyServer{
Name: "default",
},
})