diff options
author | John Cai <jcai@gitlab.com> | 2019-07-19 08:22:14 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-19 08:22:14 +0300 |
commit | b399a9c974df029d8e4a519d0fba1ae0b44eb8a9 (patch) | |
tree | ce7682b699582627d18c4c568e295add3346f782 | |
parent | a3047689e849690c7c36b85d6092433df6732422 (diff) |
Handle failover by catching SIGUSR1 signal AND other refactors
25 files changed, 627 insertions, 261 deletions
diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml index f0b3e8f0a..e0f163178 100644 --- a/_support/praefect-cluster/config.praefect.toml +++ b/_support/praefect-cluster/config.praefect.toml @@ -6,13 +6,14 @@ listen_addr = ":2305" # # Praefect can listen on a socket when placed on the same machine as all clients #socket_path = "/etc/gitaly/praefect/socket" # # Praefect will only replicate whitelisted repositories -# whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git"] +whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90ef06fdefb64a3.git"] # # Optional: export metrics via Prometheus # prometheus_listen_addr = "127.0.01:10101" # # You can optionally configure Praefect to output JSON-formatted log messages to stdout -# [logging] +[logging] # format = "json" + level = "info" # # Optional: Set log level to only log entries with that severity or above # # One of, in order: debug, info, warn, errror, fatal, panic # # Defaults to "info" @@ -25,12 +26,15 @@ listen_addr = ":2305" [primary_server] name = "default" - listen_addr = "tcp://gitaly-primary:9999" +# listen_addr = "tcp://gitaly-primary:9999" + listen_addr = "tcp://127.0.0.1:9999" [[secondary_server]] name = "backup1" - listen_addr = "tcp://gitaly-backup-1:9999" +# listen_addr = "tcp://gitaly-backup-1:9999" + listen_addr = "tcp://127.0.0.1:9998" [[secondary_server]] name = "backup2" - listen_addr = "tcp://gitaly-backup-2:9999"
\ No newline at end of file +# listen_addr = "tcp://gitaly-backup-2:9999" + listen_addr = "tcp://127.0.0.1:9997"
\ No newline at end of file diff --git a/_support/praefect-cluster/config.toml b/_support/praefect-cluster/config.toml deleted file mode 100644 index 3c1ff277f..000000000 --- a/_support/praefect-cluster/config.toml +++ /dev/null @@ -1,27 +0,0 @@ -listen_addr = ":9999" -prometheus_listen_addr = ":9236" -bin_dir = "/app/_build/bin" - -# # Git executable settings -[git] -bin_path = "/usr/local/bin/git" - -[[storage]] -name = "default" -path = "/var/opt/gitlab/git-data" - -# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls -[prometheus] -grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] - -[gitaly-ruby] -# The directory where gitaly-ruby is installed -dir = "/app/ruby" - -[gitlab-shell] -# The directory where gitlab-shell is installed -dir = "/app/gitlab-shell" - -[[concurrency]] -rpc = "/gitaly.RepositoryService/GarbageCollect" -max_per_repo = 1 diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml index e115ebd55..6eb81be47 100644 --- a/_support/praefect-cluster/docker-compose.yml +++ b/_support/praefect-cluster/docker-compose.yml @@ -1,37 +1,52 @@ version: "3.5" services: - praefect: - build: - context: ../../ - dockerfile: Dockerfile.praefect - image: praefect:latest - depends_on: - - gitaly-primary - - gitaly-backup-1 - - gitaly-backup-2 - command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"] - ports: - - "2305:2305" - volumes: - - ./config.praefect.toml:/etc/gitaly/config.praefect.toml +# praefect: +# build: +# context: ../../ +# dockerfile: Dockerfile.praefect +# image: praefect:latest +# depends_on: +# - gitaly-primary +# - gitaly-backup-1 +# - gitaly-backup-2 +# command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"] +# ports: +# - "2305:2305" +# volumes: +# - ./config.praefect.toml:/etc/gitaly/config.praefect.toml gitaly-primary: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest + environment: + - GITALY_TESTING_NO_GIT_HOOKS=1 expose: - "9999" + ports: + - "9999:9999" + command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-primary/data:/var/opt/gitlab/git-data - - ./config.toml:/etc/config/config.toml + - ./gitaly-primary/data:/home/git/repositories + - ./gitaly-primary.toml:/etc/config/config.toml gitaly-backup-1: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest + environment: + - GITALY_TESTING_NO_GIT_HOOKS=1 expose: - "9999" + ports: + - "9998:9999" + command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-1/data:/var/opt/gitlab/git-data - - ./config.toml:/etc/config/config.toml + - ./gitaly-backup-1/data:/home/git/repositories + - ./gitaly-backup-1.toml:/etc/config/config.toml gitaly-backup-2: image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest + environment: + - GITALY_TESTING_NO_GIT_HOOKS=1 expose: - "9999" + ports: + - "9997:9999" + command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"] volumes: - - ./gitaly-backup-2/data:/var/opt/gitlab/git-data - - ./config.toml:/etc/config/config.toml
\ No newline at end of file + - ./gitaly-backup-2/data:/home/git/repositories + - ./gitaly-backup-2.toml:/etc/config/config.toml
\ No newline at end of file diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml new file mode 100644 index 000000000..89d1884e3 --- /dev/null +++ b/_support/praefect-cluster/gitaly-backup-1.toml @@ -0,0 +1,49 @@ +# Example Gitaly configuration file + +# The directory where Gitaly's executables are stored +bin_dir = "/usr/local/bin" + +# listen on a TCP socket. This is insecure (no authentication) +listen_addr = "0.0.0.0:9999" + +# # Optional: export metrics via Prometheus +# prometheus_listen_addr = "localhost:9236" +# + +# # Git executable settings +# [git] +# bin_path = "/usr/bin/git" + +[[storage]] +name = "backup1" +path = "/home/git/repositories" + +# # You can optionally configure more storages for this Gitaly instance to serve up +# +# [[storage]] +# name = "other_storage" +# path = "/mnt/other_storage/repositories" +# + +# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout +# [logging] +# format = "json" +# # Additionally exceptions can be reported to Sentry +# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>" + +# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls +# [prometheus] +# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] + +[gitaly-ruby] +# The directory where gitaly-ruby is installed +dir = "/srv/gitaly-ruby" + +[gitlab-shell] +# The directory where gitlab-shell is installed +dir = "/srv/gitlab-shell" + +# # You can adjust the concurrency of each RPC endpoint +# [[concurrency]] +# rpc = "/gitaly.RepositoryService/GarbageCollect" +# max_per_repo = 1 diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml new file mode 100644 index 000000000..1b5ce8d20 --- /dev/null +++ b/_support/praefect-cluster/gitaly-backup-2.toml @@ -0,0 +1,49 @@ +# Example Gitaly configuration file + +# The directory where Gitaly's executables are stored +bin_dir = "/usr/local/bin" + +# listen on a TCP socket. This is insecure (no authentication) +listen_addr = "0.0.0.0:9999" + +# # Optional: export metrics via Prometheus +# prometheus_listen_addr = "localhost:9236" +# + +# # Git executable settings +# [git] +# bin_path = "/usr/bin/git" + +[[storage]] +name = "backup2" +path = "/home/git/repositories" + +# # You can optionally configure more storages for this Gitaly instance to serve up +# +# [[storage]] +# name = "other_storage" +# path = "/mnt/other_storage/repositories" +# + +# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout +# [logging] +# format = "json" +# # Additionally exceptions can be reported to Sentry +# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>" + +# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls +# [prometheus] +# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] + +[gitaly-ruby] +# The directory where gitaly-ruby is installed +dir = "/srv/gitaly-ruby" + +[gitlab-shell] +# The directory where gitlab-shell is installed +dir = "/srv/gitlab-shell" + +# # You can adjust the concurrency of each RPC endpoint +# [[concurrency]] +# rpc = "/gitaly.RepositoryService/GarbageCollect" +# max_per_repo = 1 diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/gitaly-primary.toml new file mode 100644 index 000000000..2379b6951 --- /dev/null +++ b/_support/praefect-cluster/gitaly-primary.toml @@ -0,0 +1,49 @@ +# Example Gitaly configuration file + +# The directory where Gitaly's executables are stored +bin_dir = "/usr/local/bin" + +# listen on a TCP socket. This is insecure (no authentication) +listen_addr = "0.0.0.0:9999" + +# # Optional: export metrics via Prometheus +# prometheus_listen_addr = "localhost:9236" +# + +# # Git executable settings +# [git] +# bin_path = "/usr/bin/git" + +[[storage]] +name = "default" +path = "/home/git/repositories" + +# # You can optionally configure more storages for this Gitaly instance to serve up +# +# [[storage]] +# name = "other_storage" +# path = "/mnt/other_storage/repositories" +# + +# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout +# [logging] +# format = "json" +# # Additionally exceptions can be reported to Sentry +# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>" + +# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls +# [prometheus] +# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0] + +[gitaly-ruby] +# The directory where gitaly-ruby is installed +dir = "/srv/gitaly-ruby" + +[gitlab-shell] +# The directory where gitlab-shell is installed +dir = "/srv/gitlab-shell" + +# # You can adjust the concurrency of each RPC endpoint +# [[concurrency]] +# rpc = "/gitaly.RepositoryService/GarbageCollect" +# max_per_repo = 1 diff --git a/changelogs/unreleased/jc-naive-failover.yml b/changelogs/unreleased/jc-naive-failover.yml new file mode 100644 index 000000000..752e770a0 --- /dev/null +++ b/changelogs/unreleased/jc-naive-failover.yml @@ -0,0 +1,5 @@ +--- +title: Handle failover by catching SIGUSR1 signal +merge_request: 1346 +author: +type: other diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 37fdddb22..acf53d2fa 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/version" @@ -113,8 +114,6 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go func() { serverErrors <- repl.ProcessBacklog(ctx) }() - allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer) for _, gitaly := range allBackendServers { @@ -122,9 +121,13 @@ func run(listeners []net.Listener, conf config.Config) error { return fmt.Errorf("failed to register %s: %s", gitaly.Name, err) } - logger.WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node") + logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node") } + go func() { serverErrors <- repl.ProcessBacklog(ctx) }() + + go coordinator.FailoverRotation() + select { case s := <-termCh: logger.WithField("signal", s).Warn("received signal, shutting down gracefully") diff --git a/config.praefect.toml.example b/config.praefect.toml.example index 59e7563f1..1393de890 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -26,11 +26,14 @@ listen_addr = "127.0.0.1:2305" [primary_server] name = "default" listen_addr = "tcp://gitaly-primary.example.com" + token = "abcd1234" # [[secondary_server]] -# name = "default" +# name = "backup-1" # listen_addr = "tcp://gitaly-backup1.example.com" +# token = "abcd1234" # [[secondary_server]] -# name = "backup" +# name = "backup-2" # listen_addr = "tcp://gitaly-backup2.example.com" +# token = "abcd1234" @@ -9,6 +9,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/kelseyhightower/envconfig v1.3.0 + github.com/kr/pretty v0.1.0 // indirect github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 github.com/prometheus/client_golang v1.0.0 github.com/sirupsen/logrus v1.2.0 @@ -16,10 +17,12 @@ require ( github.com/tinylib/msgp v1.1.0 // indirect gitlab.com/gitlab-org/gitaly-proto v1.37.0 gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c + golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 // indirect golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 - golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a + golang.org/x/sys v0.0.0-20190422165155-953cdadca894 google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect google.golang.org/grpc v1.16.0 + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect ) @@ -51,6 +51,11 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 h1:HDt7WT3kpXSHq4mlOuLzgXH9LeOK1qlhyFdKIAzxxeM= github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47/go.mod h1:4bKN42efkbNYMZlvDfxGDxzl066GhpvIircZDsm8Y+Y= github.com/lightstep/lightstep-tracer-go v0.15.6 h1:D0GGa7afJ7GcQvu5as6ssLEEKYXvRgKI5d5cevtz8r4= @@ -112,11 +117,14 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= @@ -132,6 +140,9 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -147,6 +158,8 @@ gopkg.in/DataDog/dd-trace-go.v1 v1.7.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzw gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= diff --git a/internal/helper/storage.go b/internal/helper/storage.go index 83341b0c4..4e535a5d6 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -33,3 +33,21 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly return } + +// InjectGitalyServers injects gitaly-servers metadata into an outgoing context +func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) { + + gitalyServers := storage.GitalyServers{ + name: { + "address": address, + "token": token, + }, + } + + gitalyServersJSON, err := json.Marshal(gitalyServers) + if err != nil { + return nil, err + } + + return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))), nil +} diff --git a/internal/praefect/common.go b/internal/praefect/common.go index a63d309bd..2df2a4823 100644 --- a/internal/praefect/common.go +++ b/internal/praefect/common.go @@ -2,13 +2,6 @@ package praefect import "google.golang.org/grpc" -// Repository provides all necessary information to address a repository hosted -// in a specific Gitaly replica -type Repository struct { - RelativePath string // relative path of repository - Storage string // storage location, e.g. default -} - // Node is a wrapper around the grpc client connection for a backend Gitaly node type Node struct { Storage string diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 768104ed1..6a2a5b5d5 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -5,7 +5,9 @@ import ( "os" "github.com/BurntSushi/toml" + "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) // Config is a container for everything found in the TOML config file @@ -13,8 +15,8 @@ type Config struct { ListenAddr string `toml:"listen_addr"` SocketPath string `toml:"socket_path"` - PrimaryServer *GitalyServer `toml:"primary_server"` - SecondaryServers []*GitalyServer `toml:"secondary_server"` + PrimaryServer *models.GitalyServer `toml:"primary_server"` + SecondaryServers []*models.GitalyServer `toml:"secondary_server"` // Whitelist is a list of relative project paths (paths comprised of project // hashes) that are permitted to use high availability features @@ -28,6 +30,7 @@ type Config struct { type GitalyServer struct { Name string `toml:"name"` ListenAddr string `toml:"listen_addr" split_words:"true"` + Token string `toml:"token"` } // FromFile loads the config for the passed file path @@ -50,7 +53,7 @@ var ( errGitalyWithoutName = errors.New("all gitaly servers must have a name") ) -var emptyServer = &GitalyServer{} +var emptyServer = &models.GitalyServer{} // Validate establishes if the config is valid func (c Config) Validate() error { diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 33731b17d..eace5eb2f 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -5,13 +5,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) func TestConfigValidation(t *testing.T) { - primarySrv := &GitalyServer{"test", "localhost:23456"} - secondarySrvs := []*GitalyServer{ - {"test1", "localhost:23457"}, - {"test2", "localhost:23458"}, + primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"} + secondarySrvs := []*models.GitalyServer{ + {"test1", "localhost:23457", "secret-token"}, + {"test2", "localhost:23458", "secret-token"}, } testCases := []struct { @@ -36,7 +37,7 @@ func TestConfigValidation(t *testing.T) { }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*GitalyServer{primarySrv}}, + config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}}, err: errDuplicateGitalyAddr, }, { @@ -62,11 +63,11 @@ func TestConfigParsing(t *testing.T) { { filePath: "testdata/config.toml", expected: Config{ - PrimaryServer: &GitalyServer{ + PrimaryServer: &models.GitalyServer{ Name: "default", ListenAddr: "tcp://gitaly-primary.example.com", }, - SecondaryServers: []*GitalyServer{ + SecondaryServers: []*models.GitalyServer{ { Name: "default", ListenAddr: "tcp://gitaly-backup1.example.com", @@ -76,10 +77,7 @@ func TestConfigParsing(t *testing.T) { ListenAddr: "tcp://gitaly-backup2.example.com", }, }, - Whitelist: []string{ - "abcd1234", - "edfg5678", - }, + Whitelist: []string{"abcd1234", "edfg5678"}, }, }, } diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b2e6704d5..8f64022cb 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -3,10 +3,15 @@ package praefect import ( "context" "fmt" + "os" + "os/signal" "sync" + "syscall" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "github.com/golang/protobuf/protoc-gen-go/descriptor" @@ -22,17 +27,18 @@ import ( // downstream server. The coordinator is thread safe; concurrent calls to // register nodes are safe. type Coordinator struct { - log *logrus.Logger - lock sync.RWMutex + log *logrus.Logger + failoverMutex sync.RWMutex + connMutex sync.RWMutex - datastore PrimaryDatastore + datastore Datastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -63,12 +69,15 @@ func (c *Coordinator) GetStorageNode(storage string) (Node, error) { } // streamDirector determines which downstream servers receive requests -func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { +func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamPeeker) (context.Context, *grpc.ClientConn, error) { // For phase 1, we need to route messages based on the storage location // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) - storageName, err := c.datastore.GetPrimary() + c.failoverMutex.RLock() + defer c.failoverMutex.RUnlock() + + serverConfig, err := c.datastore.GetDefaultPrimary() if err != nil { err := status.Error( codes.FailedPrecondition, @@ -79,9 +88,14 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, ok := c.getConn(storageName) + cc, ok := c.getConn(serverConfig.Name) if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName) + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name) + } + + ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token) + if err != nil { + return nil, nil, err } return ctx, cc, nil @@ -106,15 +120,81 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { } func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { - c.lock.Lock() + c.connMutex.Lock() c.nodes[storageName] = conn - c.lock.Unlock() + c.connMutex.Unlock() } func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) { - c.lock.RLock() + c.connMutex.RLock() cc, ok := c.nodes[storageName] - c.lock.RUnlock() + c.connMutex.RUnlock() return cc, ok } + +// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary +func (c *Coordinator) FailoverRotation() { + c.handleSignalAndRotate() +} + +func (c *Coordinator) handleSignalAndRotate() { + failoverChan := make(chan os.Signal, 1) + signal.Notify(failoverChan, syscall.SIGUSR1) + + for { + <-failoverChan + + c.failoverMutex.Lock() + primary, err := c.datastore.GetDefaultPrimary() + if err != nil { + c.log.Fatalf("error when getting default primary: %v", err) + } + + if err := c.rotateSecondaryToPrimary(primary); err != nil { + c.log.WithError(err).Error("rotating secondary") + } + c.failoverMutex.Unlock() + } +} + +func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error { + repositories, err := c.datastore.GetRepositoriesForPrimary(primary) + if err != nil { + return err + } + + for _, repoPath := range repositories { + secondaries, err := c.datastore.GetShardSecondaries(models.Repository{ + RelativePath: repoPath, + }) + if err != nil { + return fmt.Errorf("getting secondaries: %v", err) + } + + newPrimary := secondaries[0] + secondaries = append(secondaries[1:], primary) + + if err = c.datastore.SetShardPrimary(models.Repository{ + RelativePath: repoPath, + }, newPrimary); err != nil { + return fmt.Errorf("setting primary: %v", err) + } + + if err = c.datastore.SetShardSecondaries(models.Repository{ + RelativePath: repoPath, + }, secondaries); err != nil { + return fmt.Errorf("setting secondaries: %v", err) + } + } + + // set the new default primary + primary, err = c.datastore.GetShardPrimary(models.Repository{ + RelativePath: repositories[0], + }) + if err != nil { + return fmt.Errorf("getting shard primary: %v", err) + } + + return c.datastore.SetDefaultPrimary(primary) +} diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go new file mode 100644 index 000000000..50045f8a0 --- /dev/null +++ b/internal/praefect/coordinator_test.go @@ -0,0 +1,47 @@ +package praefect + +import ( + "io/ioutil" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" +) + +var testLogger = logrus.New() + +func init() { + testLogger.SetOutput(ioutil.Discard) +} + +func TestSecondaryRotation(t *testing.T) { + cfg := config.Config{ + PrimaryServer: &models.GitalyServer{Name: "primary"}, + SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}}, + Whitelist: []string{"/repoA", "/repoB"}, + } + d := NewMemoryDatastore(cfg) + c := NewCoordinator(testLogger, d) + + primary, err := d.GetDefaultPrimary() + require.NoError(t, err) + + require.NoError(t, c.rotateSecondaryToPrimary(primary)) + + primary, err = d.GetDefaultPrimary() + require.NoError(t, err) + require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary") + + repositories, err := d.GetRepositoriesForPrimary(primary) + require.NoError(t, err) + + for _, repository := range repositories { + shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository}) + require.NoError(t, err) + + require.Len(t, shardSecondaries, 2) + require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0]) + } +} diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index eeb9f9728..5678c6a24 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -12,6 +12,7 @@ import ( "sync" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) var ( @@ -41,9 +42,9 @@ const ( // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. type ReplJob struct { - ID uint64 // autoincrement ID - Target string // which storage location to replicate to? - Source Repository // source for replication + ID uint64 // autoincrement ID + Target string // which storage location to replicate to? + Source models.Repository // source for replication State JobState } @@ -63,15 +64,13 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I type Datastore interface { ReplJobsDatastore ReplicasDatastore - PrimaryDatastore + TemporaryDatastore } -// PrimaryDatastore manages accessing and setting the primary storage location -type PrimaryDatastore interface { - // GetPrimary gets the primary storage location - GetPrimary() (string, error) - // SetPrimary sets the primary storage location - SetPrimary(primary string) error +// TemporaryDatastore contains methods that will go away once we move to a SQL datastore +type TemporaryDatastore interface { + GetDefaultPrimary() (models.GitalyServer, error) + SetDefaultPrimary(primary models.GitalyServer) error } // ReplicasDatastore manages accessing and setting which secondary replicas @@ -79,11 +78,18 @@ type PrimaryDatastore interface { type ReplicasDatastore interface { // GetSecondaries will retrieve all secondary replica storage locations for // a primary replica - GetSecondaries(primary Repository) ([]string, error) + GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error) + + GetShardPrimary(repo models.Repository) (models.GitalyServer, error) // SetSecondaries will set the secondary storage locations for a repository // in a primary replica. - SetSecondaries(primary Repository, secondaries []string) error + SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error + + SetShardPrimary(repo models.Repository, primary models.GitalyServer) error + + // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary + GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -92,12 +98,12 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, storage string, count int) ([]ReplJob, error) + GetJobs(flag JobState, node string, count int) ([]ReplJob, error) // CreateSecondaryJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(source Repository) ([]uint64, error) + CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error @@ -105,13 +111,13 @@ type ReplJobsDatastore interface { // shard is a set of primary and secondary storage replicas for a project type shard struct { - primary string - secondaries []string + primary models.GitalyServer + secondaries []models.GitalyServer } type jobRecord struct { relativePath string // project's relative path - target string + targetNode string state JobState } @@ -132,7 +138,7 @@ type MemoryDatastore struct { primary *struct { sync.RWMutex - storageName string + server models.GitalyServer } } @@ -155,22 +161,26 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { }, primary: &struct { sync.RWMutex - storageName string + server models.GitalyServer }{ - storageName: cfg.PrimaryServer.Name, + server: models.GitalyServer{ + Name: cfg.PrimaryServer.Name, + ListenAddr: cfg.PrimaryServer.ListenAddr, + Token: cfg.PrimaryServer.Token, + }, }, } - secondaries := make([]string, len(cfg.SecondaryServers)) + secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers)) for i, server := range cfg.SecondaryServers { - secondaries[i] = server.Name + secondaryServers[i] = *server } - for _, relativePath := range cfg.Whitelist { + for _, repo := range cfg.Whitelist { // store the configuration file specified shard - m.replicas.m[relativePath] = shard{ - primary: cfg.PrimaryServer.Name, - secondaries: secondaries, + m.replicas.m[repo] = shard{ + primary: *cfg.PrimaryServer, + secondaries: secondaryServers, } // initialize replication job queue to replicate all whitelisted repos @@ -179,36 +189,70 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { m.jobs.next++ m.jobs.records[m.jobs.next] = jobRecord{ state: JobStateReady, - target: secondary.Name, - relativePath: relativePath, + targetNode: secondary.Name, + relativePath: repo, } } - } return m } -// GetSecondaries will return the set of secondary storage locations for a +// GetShardSecondaries will return the set of secondary storage locations for a // given repository if they exist -func (md *MemoryDatastore) GetSecondaries(primary Repository) ([]string, error) { +func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) { shard, _ := md.getShard(primary.RelativePath) return shard.secondaries, nil } -// SetSecondaries will replace the set of replicas for a repository -func (md *MemoryDatastore) SetSecondaries(primary Repository, secondaries []string) error { +// SetShardSecondaries will replace the set of replicas for a repository +func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error { md.replicas.Lock() - md.replicas.m[primary.RelativePath] = shard{ - primary: primary.Storage, - secondaries: secondaries, - } - md.replicas.Unlock() + defer md.replicas.Unlock() + + shard := md.replicas.m[repo.RelativePath] + shard.secondaries = secondaries + md.replicas.m[repo.RelativePath] = shard return nil } +// SetShardPrimary sets the primary for a repository +func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error { + md.replicas.Lock() + defer md.replicas.Unlock() + + shard := md.replicas.m[repo.RelativePath] + shard.primary = primary + md.replicas.m[repo.RelativePath] = shard + + return nil +} + +// GetShardPrimary gets the primary for a repository +func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) { + md.replicas.Lock() + defer md.replicas.Unlock() + + shard := md.replicas.m[repo.RelativePath] + return shard.primary, nil +} + +// GetRepositoriesForPrimary gets all repositories +func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) { + md.replicas.Lock() + defer md.replicas.Unlock() + + repositories := make([]string, 0, len(md.replicas.m)) + + for repository := range md.replicas.m { + repositories = append(repositories, repository) + } + + return repositories, nil +} + func (md *MemoryDatastore) getShard(project string) (shard, bool) { md.replicas.RLock() replicas, ok := md.replicas.m[project] @@ -230,7 +274,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.target == storage { + if record.state&state != 0 && record.targetNode == storage { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -261,26 +305,26 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re return ReplJob{ ID: jobID, - Source: Repository{ + Source: models.Repository{ RelativePath: record.relativePath, - Storage: shard.primary, + Storage: shard.primary.Name, }, State: record.state, - Target: record.target, + Target: record.targetNode, }, nil } -// ErrInvalidReplTarget indicates a target repository cannot be chosen because +// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because // it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("target repository fails preconditions for replication") +var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication") // CreateSecondaryReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64, error) { +func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() - emptyRepo := Repository{} + emptyRepo := models.Repository{} if source == emptyRepo { return nil, errors.New("invalid source repository") } @@ -300,7 +344,7 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(source Repository) ([]uint64, md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - target: secondary, + targetNode: secondary.Name, state: JobStatePending, relativePath: source.RelativePath, } @@ -333,24 +377,34 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error } // SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary string) error { +func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error { md.primary.Lock() defer md.primary.Unlock() - md.primary.storageName = primary + md.primary.server = primary return nil } -// GetPrimary gets the primary datastore location -func (md *MemoryDatastore) GetPrimary() (string, error) { +// GetDefaultPrimary gets the primary datastore location +func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) { md.primary.RLock() defer md.primary.RUnlock() - storageName := md.primary.storageName - if storageName == "" { - return "", ErrPrimaryNotSet + primary := md.primary.server + if primary == (models.GitalyServer{}) { + return primary, ErrPrimaryNotSet } - return storageName, nil + return primary, nil +} + +// SetDefaultPrimary gets the primary datastore location +func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error { + md.primary.RLock() + defer md.primary.RUnlock() + + md.primary.server = primary + + return nil } diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index a306b3ce6..6099a8328 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) // TestMemoryDatastoreWhitelist verifies that the in-memory datastore will @@ -13,10 +14,10 @@ import ( // with a configuration file specifying the shard and whitelisted repositories. func TestMemoryDatastoreWhitelist(t *testing.T) { cfg := config.Config{ - PrimaryServer: &config.GitalyServer{ + PrimaryServer: &models.GitalyServer{ Name: "default", }, - SecondaryServers: []*config.GitalyServer{ + SecondaryServers: []*models.GitalyServer{ { Name: "backup-1", }, @@ -24,32 +25,30 @@ func TestMemoryDatastoreWhitelist(t *testing.T) { Name: "backup-2", }, }, - Whitelist: []string{ - "abcd1234", - "5678efgh", - }, + Whitelist: []string{"abcd1234", "5678efgh"}, } mds := praefect.NewMemoryDatastore(cfg) - repo1 := praefect.Repository{ + repo1 := models.Repository{ RelativePath: cfg.Whitelist[0], Storage: cfg.PrimaryServer.Name, } - repo2 := praefect.Repository{ + + repo2 := models.Repository{ RelativePath: cfg.Whitelist[1], Storage: cfg.PrimaryServer.Name, } - expectSecondaries := []string{ - cfg.SecondaryServers[0].Name, - cfg.SecondaryServers[1].Name, + expectSecondaries := []models.GitalyServer{ + models.GitalyServer{Name: cfg.SecondaryServers[0].Name}, + models.GitalyServer{Name: cfg.SecondaryServers[1].Name}, } - for _, repo := range []praefect.Repository{repo1, repo2} { - actualSecondaries, err := mds.GetSecondaries(repo) + for _, repo := range []models.Repository{repo1, repo2} { + actualSecondaries, err := mds.GetShardSecondaries(repo) require.NoError(t, err) - require.ElementsMatch(t, actualSecondaries, expectSecondaries) + require.ElementsMatch(t, expectSecondaries, actualSecondaries) } backup1 := cfg.SecondaryServers[0] diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 6534b9d88..417a04be2 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) const ( @@ -15,7 +16,7 @@ const ( ) var ( - repo1Primary = praefect.Repository{ + repo1Primary = models.Repository{ RelativePath: proj1, Storage: stor1, } @@ -41,9 +42,16 @@ var operations = []struct { }, }, { + desc: "set the primary for the shard", + opFn: func(t *testing.T, ds praefect.Datastore) { + err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1}) + require.NoError(t, err) + }, + }, + { desc: "associate the replication job target with a primary", opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetSecondaries(repo1Primary, []string{stor2}) + err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}}) require.NoError(t, err) }, }, @@ -93,7 +101,7 @@ var flavors = map[string]func() praefect.Datastore{ "in-memory-datastore": func() praefect.Datastore { return praefect.NewMemoryDatastore( config.Config{ - PrimaryServer: &config.GitalyServer{ + PrimaryServer: &models.GitalyServer{ Name: "default", }, }) diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go new file mode 100644 index 000000000..854254d87 --- /dev/null +++ b/internal/praefect/models/nodes.go @@ -0,0 +1,8 @@ +package models + +// GitalyServer allows configuring the servers that RPCs are proxied to +type GitalyServer struct { + Name string `toml:"name"` + ListenAddr string `toml:"listen_addr" split_words:"true"` + Token string `toml:"token"` +} diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go new file mode 100644 index 000000000..e11cdbf0a --- /dev/null +++ b/internal/praefect/models/repository.go @@ -0,0 +1,8 @@ +package models + +// Repository provides all necessary information to address a repository hosted +// in a specific Gitaly replica +type Repository struct { + RelativePath string `toml:"relative_path"` // relative path of repository + Storage string `toml:"storage"` // storage location, e.g. default +} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index dce2df103..d3244619d 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -6,24 +6,27 @@ import ( "time" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "github.com/sirupsen/logrus" ) // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, source Repository, target Node) error + Replicate(ctx context.Context, source models.Repository, target Node) error } type defaultReplicator struct { log *logrus.Logger } -func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, target Node) error { +func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { repository := &gitalypb.Repository{ StorageName: target.Storage, RelativePath: source.RelativePath, } + remoteRepository := &gitalypb.Repository{ StorageName: source.Storage, RelativePath: source.RelativePath, @@ -57,9 +60,9 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, ta // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Logger - jobsStore ReplJobsDatastore + dataStore Datastore coordinator *Coordinator - storage string // which replica is this replicator responsible for? + targetNode string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic // whitelist contains the project names of the repos we wish to replicate @@ -71,13 +74,13 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(storage string, log *logrus.Logger, ds ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, - jobsStore: ds, + dataStore: ds, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, - storage: storage, + targetNode: targetNode, coordinator: c, } @@ -107,7 +110,7 @@ func WithReplicator(r Replicator) ReplMgrOpt { // ScheduleReplication will store a replication job in the datastore for later // execution. It filters out projects that are not whitelisted. // TODO: add a parameter to delay replication -func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error { +func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository) error { _, ok := r.whitelist[repo.RelativePath] if !ok { r.log.WithField(logKeyProjectPath, repo.RelativePath). @@ -115,14 +118,14 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error return nil } - id, err := r.jobsStore.CreateSecondaryReplJobs(repo) + id, err := r.dataStore.CreateSecondaryReplJobs(repo) if err != nil { return err } r.log.Infof( - "replication manager for storage %q created replication job with ID %d", - r.storage, + "replication manager for targetNode %q created replication job with ID %d", + r.targetNode, id, ) @@ -137,13 +140,13 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { for { - jobs, err := r.jobsStore.GetJobs(JobStatePending|JobStateReady, r.storage, 10) + jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10) if err != nil { return err } if len(jobs) == 0 { - r.log.Tracef("no jobs for %s, checking again in %s", r.storage, jobFetchInterval) + r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) select { // TODO: exponential backoff when no queries are returned @@ -161,21 +164,33 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { r.log.WithField(logWithReplJobID, job.ID). Infof("processing replication job %#v", job) node, err := r.coordinator.GetStorageNode(job.Target) + r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err) if err != nil { return err } - if err := r.jobsStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } + + primary, err := r.dataStore.GetShardPrimary(job.Source) + if err != nil { + return err + } + + ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token) + if err != nil { return err } if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") return err } r.log.WithField(logWithReplJobID, job.ID). Info("completed replication") - if err := r.jobsStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { + if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { return err } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index ac2dcf6f6..1294bc989 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -1,7 +1,8 @@ -package praefect_test +package praefect import ( "context" + "io/ioutil" "log" "net" "os" @@ -13,11 +14,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" + gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" "gitlab.com/gitlab-org/gitaly/internal/testhelper" @@ -28,11 +30,11 @@ import ( func TestReplicatorProcessJobsWhitelist(t *testing.T) { var ( cfg = config.Config{ - PrimaryServer: &config.GitalyServer{ + PrimaryServer: &models.GitalyServer{ Name: "default", ListenAddr: "tcp://gitaly-primary.example.com", }, - SecondaryServers: []*config.GitalyServer{ + SecondaryServers: []*models.GitalyServer{ { Name: "backup1", ListenAddr: "tcp://gitaly-backup1.example.com", @@ -42,25 +44,22 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { ListenAddr: "tcp://gitaly-backup2.example.com", }, }, - Whitelist: []string{ - "abcd1234", - "edfg5678", - }, + Whitelist: []string{"abcd1234", "edfg5678"}, } - datastore = praefect.NewMemoryDatastore(cfg) - coordinator = praefect.NewCoordinator(logrus.New(), datastore) + datastore = NewMemoryDatastore(cfg) + coordinator = NewCoordinator(logrus.New(), datastore) resultsCh = make(chan result) - replman = praefect.NewReplMgr( + replman = NewReplMgr( cfg.SecondaryServers[1].Name, logrus.New(), datastore, coordinator, - praefect.WithWhitelist(cfg.Whitelist), - praefect.WithReplicator(&mockReplicator{resultsCh}), + WithWhitelist(cfg.Whitelist), + WithReplicator(&mockReplicator{resultsCh}), ) ) - for _, node := range []*config.GitalyServer{ + for _, node := range []*models.GitalyServer{ cfg.PrimaryServer, cfg.SecondaryServers[0], cfg.SecondaryServers[1], @@ -85,8 +84,8 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { result := <-resultsCh assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Equal(t, result.target.Storage, cfg.SecondaryServers[1].Name) - assert.Equal(t, result.source.Storage, cfg.PrimaryServer.Name) + assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage) + assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage) } cancel() @@ -107,15 +106,15 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { } type result struct { - source praefect.Repository - target praefect.Node + source models.Repository + target Node } type mockReplicator struct { resultsCh chan<- result } -func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Repository, target praefect.Node) error { +func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { select { case mr.resultsCh <- result{source, target}: @@ -130,33 +129,16 @@ func (mr *mockReplicator) Replicate(ctx context.Context, source praefect.Reposit } func TestReplicate(t *testing.T) { + srv, srvSocketPath := runFullGitalyServer(t) + defer srv.Stop() + testRepo, testRepoPath, cleanupFn := testhelper.NewTestRepo(t) defer cleanupFn() - commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{ - Message: "a commit", - }) + backupStorageName := "backup" - defer cleanupFn() - var ( - cfg = config.Config{ - PrimaryServer: &config.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*config.GitalyServer{ - { - Name: "backup", - ListenAddr: "tcp://gitaly-backup1.example.com", - }, - }, - Whitelist: []string{ - testRepo.GetRelativePath(), - }, - } - ) - backupDir := filepath.Join(testhelper.GitlabTestStoragePath(), "backup") - require.NoError(t, os.Mkdir(backupDir, os.ModeDir|0755)) + backupDir, err := ioutil.TempDir(testhelper.GitlabTestStoragePath(), backupStorageName) + require.NoError(t, err) defer func() { os.RemoveAll(backupDir) }() @@ -167,54 +149,40 @@ func TestReplicate(t *testing.T) { }() gitaly_config.Config.Storages = append(gitaly_config.Config.Storages, gitaly_config.Storage{ - Name: "backup", + Name: backupStorageName, Path: backupDir, - }, gitaly_config.Storage{ - Name: "default", - Path: testhelper.GitlabTestStoragePath(), - }) - - srv, socketPath := runFullGitalyServer(t) - defer srv.Stop() - - datastore := praefect.NewMemoryDatastore(cfg) - coordinator := praefect.NewCoordinator(logrus.New(), datastore) - - coordinator.RegisterNode("backup", socketPath) - coordinator.RegisterNode("default", socketPath) - - replman := praefect.NewReplMgr( - cfg.SecondaryServers[0].Name, - logrus.New(), - datastore, - coordinator, - praefect.WithWhitelist([]string{testRepo.GetRelativePath()}), + }, + gitaly_config.Storage{ + Name: "default", + Path: testhelper.GitlabTestStoragePath(), + }, ) ctx, cancel := testhelper.Context() defer cancel() - md := testhelper.GitalyServersMetadata(t, socketPath) - ctx = metadata.NewOutgoingContext(ctx, md) - - go func() { - require.Error(t, context.Canceled, replman.ProcessBacklog(ctx)) - }() - var tries int - jobs, err := datastore.GetJobs(praefect.JobStateInProgress|praefect.JobStatePending|praefect.JobStateReady, "backup", 1) + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(testhelper.RepositoryAuthToken)), + } + conn, err := grpc.Dial(srvSocketPath, connOpts...) require.NoError(t, err) - for len(jobs) > 0 { - if tries > 20 { - t.Error("exceeded timeout") - } - time.Sleep(1 * time.Second) - tries++ + commitID := testhelper.CreateCommit(t, testRepoPath, "master", &testhelper.CreateCommitOpts{ + Message: "a commit", + }) - jobs, err = datastore.GetJobs(praefect.JobStateInProgress|praefect.JobStatePending|praefect.JobStateReady, "backup", 1) - require.NoError(t, err) - } - cancel() + ctx, err = helper.InjectGitalyServers(ctx, "default", srvSocketPath, testhelper.RepositoryAuthToken) + require.NoError(t, err) + + var replicator defaultReplicator + require.NoError(t, replicator.Replicate( + ctx, + models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()}, + Node{ + cc: conn, + Storage: backupStorageName, + })) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index b6f2a8fd9..915d7281a 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -14,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "google.golang.org/grpc" ) @@ -51,7 +52,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { ) datastore := praefect.NewMemoryDatastore(config.Config{ - PrimaryServer: &config.GitalyServer{ + PrimaryServer: &models.GitalyServer{ Name: "default", }, }) |