gitlab.com/gitlab-org/gitaly.git
author     John Cai <jcai@gitlab.com>  2019-07-18 01:35:11 +0300
committer  John Cai <jcai@gitlab.com>  2019-07-24 23:09:50 +0300
commit     27c90b5a8306bf18b188fb23674a78a8a5544105 (patch)
tree       c44b3eafc62c3c016126ccffd8413bfc890b76b3
parent     ff2a5c3bbd4cfa244c355c369de2c4c092e5e9ad (diff)

SQL datastore for praefect
-rw-r--r--  .gitignore  1
-rw-r--r--  NOTICE  63
-rw-r--r--  _support/praefect-cluster/.gitignore  6
-rw-r--r--  _support/praefect-cluster/config.gitaly.toml (renamed from _support/praefect-cluster/gitaly-primary.toml)  0
-rw-r--r--  _support/praefect-cluster/config.praefect.toml  15
-rw-r--r--  _support/praefect-cluster/docker-compose.yml  24
-rw-r--r--  _support/praefect-cluster/gitaly-backup-1.toml  49
-rw-r--r--  _support/praefect-cluster/gitaly-backup-2.toml  49
-rw-r--r--  changelogs/unreleased/jc-sql-data-store.yml  5
-rw-r--r--  cmd/praefect/main.go  38
-rw-r--r--  go.mod  14
-rw-r--r--  go.sum  28
-rw-r--r--  internal/helper/storage.go  16
-rw-r--r--  internal/praefect/common.go  8
-rw-r--r--  internal/praefect/config/config.go  28
-rw-r--r--  internal/praefect/config/config_test.go  34
-rw-r--r--  internal/praefect/config/testdata/config.toml  25
-rw-r--r--  internal/praefect/coordinator.go  168
-rw-r--r--  internal/praefect/coordinator_test.go  31
-rw-r--r--  internal/praefect/database/migrations/1_initial_up.sql  19
-rw-r--r--  internal/praefect/database/sql_datastore.go  225
-rw-r--r--  internal/praefect/datastore.go  319
-rw-r--r--  internal/praefect/datastore_memory_test.go  122
-rw-r--r--  internal/praefect/datastore_test.go  92
-rw-r--r--  internal/praefect/mock/mock.pb.go  30
-rw-r--r--  internal/praefect/mock/mock.proto  7
-rw-r--r--  internal/praefect/mocksvc_test.go  4
-rw-r--r--  internal/praefect/models/node.go  18
-rw-r--r--  internal/praefect/models/nodes.go  8
-rw-r--r--  internal/praefect/protoregistry/protoregistry.go  37
-rw-r--r--  internal/praefect/protoregistry/targetrepo_test.go  2
-rw-r--r--  internal/praefect/replicator.go  108
-rw-r--r--  internal/praefect/replicator_test.go  124
-rw-r--r--  internal/praefect/server_test.go  52
34 files changed, 991 insertions, 778 deletions
diff --git a/.gitignore b/.gitignore
index b7fbb0b4c..9fb2c7e44 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,7 @@ cmd/gitaly-remote/gitaly-remote
git-env
/gitaly-debug
/praefect
+/praefect-migrate
gitaly.pid
/vendor/github.com/libgit2/git2go/vendor
/vendor
diff --git a/NOTICE b/NOTICE
index 5a34cfece..dffa2f1bf 100644
--- a/NOTICE
+++ b/NOTICE
@@ -904,6 +904,16 @@ The above copyright notice and this permission notice shall be included in all c
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+LICENSE.md - gitlab.com/gitlab-org/gitaly/vendor/github.com/lib/pq
+Copyright (c) 2011-2013, 'pq' Contributors
+Portions Copyright (C) 2011 Blake Mizerany
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - gitlab.com/gitlab-org/gitaly/vendor/github.com/libgit2/git2go
The MIT License
@@ -2877,59 +2887,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/crypto
-Copyright (c) 2009 The Go Authors. All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
- * Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
- * Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
- * Neither the name of Google Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-PATENTS - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/crypto
-Additional IP Rights Grant (Patents)
-
-"This implementation" means the copyrightable works distributed by
-Google as part of the Go project.
-
-Google hereby grants to You a perpetual, worldwide, non-exclusive,
-no-charge, royalty-free, irrevocable (except as stated in this section)
-patent license to make, have made, use, offer to sell, sell, import,
-transfer and otherwise run, modify and propagate the contents of this
-implementation of Go, where such license applies only to those patent
-claims, both currently owned or controlled by Google and acquired in
-the future, licensable by Google that are necessarily infringed by this
-implementation of Go. This grant does not include claims that would be
-infringed only as a consequence of further modification of this
-implementation. If you or your agent or exclusive licensee institute or
-order or agree to the institution of patent litigation against any
-entity (including a cross-claim or counterclaim in a lawsuit) alleging
-that this implementation of Go or any code incorporated within this
-implementation of Go constitutes direct or contributory patent
-infringement, or inducement of patent infringement, then any patent
-rights granted to you under this License for this implementation of Go
-shall terminate as of the date such litigation is filed.
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/net
Copyright (c) 2009 The Go Authors. All rights reserved.
diff --git a/_support/praefect-cluster/.gitignore b/_support/praefect-cluster/.gitignore
index 06b873206..bd035c2b7 100644
--- a/_support/praefect-cluster/.gitignore
+++ b/_support/praefect-cluster/.gitignore
@@ -1,3 +1,3 @@
-/gitaly-backup-1
-/gitaly-backup-2
-/gitaly-primary
+/gitaly-1
+/gitaly-2
+/gitaly-3
diff --git a/_support/praefect-cluster/gitaly-primary.toml b/_support/praefect-cluster/config.gitaly.toml
index 2379b6951..2379b6951 100644
--- a/_support/praefect-cluster/gitaly-primary.toml
+++ b/_support/praefect-cluster/config.gitaly.toml
diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml
index e0f163178..750faac60 100644
--- a/_support/praefect-cluster/config.praefect.toml
+++ b/_support/praefect-cluster/config.praefect.toml
@@ -24,17 +24,14 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e
# as shard. listen_addr should be unique for all nodes.
# Requires the protocol to be defined, e.g. tcp://host.tld:1234
-[primary_server]
- name = "default"
+[[node]]
# listen_addr = "tcp://gitaly-primary:9999"
- listen_addr = "tcp://127.0.0.1:9999"
+ address = "tcp://127.0.0.1:9999"
-[[secondary_server]]
- name = "backup1"
+[[node]]
# listen_addr = "tcp://gitaly-backup-1:9999"
- listen_addr = "tcp://127.0.0.1:9998"
+ address = "tcp://127.0.0.1:9998"
-[[secondary_server]]
- name = "backup2"
+[[node]]
# listen_addr = "tcp://gitaly-backup-2:9999"
- listen_addr = "tcp://127.0.0.1:9997" \ No newline at end of file
+ address = "tcp://127.0.0.1:9997" \ No newline at end of file
diff --git a/_support/praefect-cluster/docker-compose.yml b/_support/praefect-cluster/docker-compose.yml
index 6eb81be47..09745ea41 100644
--- a/_support/praefect-cluster/docker-compose.yml
+++ b/_support/praefect-cluster/docker-compose.yml
@@ -6,15 +6,15 @@ services:
# dockerfile: Dockerfile.praefect
# image: praefect:latest
# depends_on:
-# - gitaly-primary
-# - gitaly-backup-1
-# - gitaly-backup-2
+# - gitaly-1
+# - gitaly-2
+# - gitaly-3
# command: ["/etc/gitaly/praefect", "-config", "/etc/gitaly/config.praefect.toml"]
# ports:
# - "2305:2305"
# volumes:
# - ./config.praefect.toml:/etc/gitaly/config.praefect.toml
- gitaly-primary:
+ gitaly-1:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
environment:
- GITALY_TESTING_NO_GIT_HOOKS=1
@@ -24,9 +24,9 @@ services:
- "9999:9999"
command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-primary/data:/home/git/repositories
- - ./gitaly-primary.toml:/etc/config/config.toml
- gitaly-backup-1:
+ - ./gitaly-1/data:/home/git/repositories
+ - ./config.gitaly.toml:/etc/config/config.toml
+ gitaly-2:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
environment:
- GITALY_TESTING_NO_GIT_HOOKS=1
@@ -36,9 +36,9 @@ services:
- "9998:9999"
command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-backup-1/data:/home/git/repositories
- - ./gitaly-backup-1.toml:/etc/config/config.toml
- gitaly-backup-2:
+ - ./gitaly-2/data:/home/git/repositories
+ - ./config.gitaly.toml:/etc/config/config.toml
+ gitaly-3:
image: registry.gitlab.com/gitlab-org/build/cng/gitaly:latest
environment:
- GITALY_TESTING_NO_GIT_HOOKS=1
@@ -48,5 +48,5 @@ services:
- "9997:9999"
command: ["/usr/local/bin/gitaly", "/etc/config/config.toml"]
volumes:
- - ./gitaly-backup-2/data:/home/git/repositories
- - ./gitaly-backup-2.toml:/etc/config/config.toml
\ No newline at end of file
+ - ./gitaly-3/data:/home/git/repositories
+ - ./config.gitaly.toml:/etc/config/config.toml
\ No newline at end of file
diff --git a/_support/praefect-cluster/gitaly-backup-1.toml b/_support/praefect-cluster/gitaly-backup-1.toml
deleted file mode 100644
index 89d1884e3..000000000
--- a/_support/praefect-cluster/gitaly-backup-1.toml
+++ /dev/null
@@ -1,49 +0,0 @@
-# Example Gitaly configuration file
-
-# The directory where Gitaly's executables are stored
-bin_dir = "/usr/local/bin"
-
-# listen on a TCP socket. This is insecure (no authentication)
-listen_addr = "0.0.0.0:9999"
-
-# # Optional: export metrics via Prometheus
-# prometheus_listen_addr = "localhost:9236"
-#
-
-# # Git executable settings
-# [git]
-# bin_path = "/usr/bin/git"
-
-[[storage]]
-name = "backup1"
-path = "/home/git/repositories"
-
-# # You can optionally configure more storages for this Gitaly instance to serve up
-#
-# [[storage]]
-# name = "other_storage"
-# path = "/mnt/other_storage/repositories"
-#
-
-# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
-# [logging]
-# format = "json"
-# # Additionally exceptions can be reported to Sentry
-# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
-
-# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
-# [prometheus]
-# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
-
-[gitaly-ruby]
-# The directory where gitaly-ruby is installed
-dir = "/srv/gitaly-ruby"
-
-[gitlab-shell]
-# The directory where gitlab-shell is installed
-dir = "/srv/gitlab-shell"
-
-# # You can adjust the concurrency of each RPC endpoint
-# [[concurrency]]
-# rpc = "/gitaly.RepositoryService/GarbageCollect"
-# max_per_repo = 1
diff --git a/_support/praefect-cluster/gitaly-backup-2.toml b/_support/praefect-cluster/gitaly-backup-2.toml
deleted file mode 100644
index 1b5ce8d20..000000000
--- a/_support/praefect-cluster/gitaly-backup-2.toml
+++ /dev/null
@@ -1,49 +0,0 @@
-# Example Gitaly configuration file
-
-# The directory where Gitaly's executables are stored
-bin_dir = "/usr/local/bin"
-
-# listen on a TCP socket. This is insecure (no authentication)
-listen_addr = "0.0.0.0:9999"
-
-# # Optional: export metrics via Prometheus
-# prometheus_listen_addr = "localhost:9236"
-#
-
-# # Git executable settings
-# [git]
-# bin_path = "/usr/bin/git"
-
-[[storage]]
-name = "backup2"
-path = "/home/git/repositories"
-
-# # You can optionally configure more storages for this Gitaly instance to serve up
-#
-# [[storage]]
-# name = "other_storage"
-# path = "/mnt/other_storage/repositories"
-#
-
-# # You can optionally configure Gitaly to output JSON-formatted log messages to stdout
-# [logging]
-# format = "json"
-# # Additionally exceptions can be reported to Sentry
-# sentry_dsn = "https://<key>:<secret>@sentry.io/<project>"
-
-# # You can optionally configure Gitaly to record histogram latencies on GRPC method calls
-# [prometheus]
-# grpc_latency_buckets = [0.001, 0.005, 0.025, 0.1, 0.5, 1.0, 10.0, 30.0, 60.0, 300.0, 1500.0]
-
-[gitaly-ruby]
-# The directory where gitaly-ruby is installed
-dir = "/srv/gitaly-ruby"
-
-[gitlab-shell]
-# The directory where gitlab-shell is installed
-dir = "/srv/gitlab-shell"
-
-# # You can adjust the concurrency of each RPC endpoint
-# [[concurrency]]
-# rpc = "/gitaly.RepositoryService/GarbageCollect"
-# max_per_repo = 1
diff --git a/changelogs/unreleased/jc-sql-data-store.yml b/changelogs/unreleased/jc-sql-data-store.yml
new file mode 100644
index 000000000..e9ccb210d
--- /dev/null
+++ b/changelogs/unreleased/jc-sql-data-store.yml
@@ -0,0 +1,5 @@
+---
+title: SQL datastore for praefect
+merge_request: 1370
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index acf53d2fa..aaf5e1f20 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -19,6 +19,8 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/database"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"gitlab.com/gitlab-org/gitaly/internal/version"
"gitlab.com/gitlab-org/labkit/tracing"
)
@@ -92,13 +94,22 @@ func configure() (config.Config, error) {
func run(listeners []net.Listener, conf config.Config) error {
+ sqlDatastore, err := database.NewSQLDatastore(
+ os.Getenv("PRAEFECT_PG_USER"),
+ os.Getenv("PRAEFECT_PG_PASSWORD"),
+ os.Getenv("PRAEFECT_PG_ADDRESS"),
+ os.Getenv("PRAEFECT_PG_DATABASE"))
+
+ if err != nil {
+ return fmt.Errorf("failed to create sql datastore: %v", err)
+ }
+
var (
// top level server dependencies
- datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, datastore)
- repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
+ datastore = praefect.NewMemoryDatastore()
+ coordinator = praefect.NewCoordinator(logger, sqlDatastore, protoregistry.GitalyProtoFileDescriptors...)
+ repl = praefect.NewReplMgr("default", logger, sqlDatastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
srv = praefect.NewServer(coordinator, repl, nil, logger)
-
// signal related
signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT}
termCh = make(chan os.Signal, len(signals))
@@ -114,14 +125,23 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- allBackendServers := append(conf.SecondaryServers, conf.PrimaryServer)
+ nodes, err := sqlDatastore.GetStorageNodes()
+ if err != nil {
+ return fmt.Errorf("failed to get storage nodes from database: %v", err)
+ }
- for _, gitaly := range allBackendServers {
- if err := coordinator.RegisterNode(gitaly.Name, gitaly.ListenAddr); err != nil {
- return fmt.Errorf("failed to register %s: %s", gitaly.Name, err)
+ addresses := make(map[string]struct{})
+ for _, node := range nodes {
+ if _, ok := addresses[node.Address]; ok {
+ continue
}
+ if err := coordinator.RegisterNode(node.Address); err != nil {
+ return fmt.Errorf("failed to register %s: %s", node.Address, err)
+ }
+
+ addresses[node.Address] = struct{}{}
- logger.WithField("node_name", gitaly.Name).WithField("gitaly listen addr", gitaly.ListenAddr).Info("registered gitaly node")
+ logger.WithField("node_address", node.Address).Info("registered gitaly node")
}
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
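The hunk above replaces config-driven node registration with nodes read from Postgres via the PRAEFECT_PG_* environment variables. As a quick sanity check, a small hypothetical helper (it would have to live inside the gitaly module, since the packages are internal) could open the same datastore and print the nodes praefect would register at startup:

    package main

    import (
        "fmt"
        "log"
        "os"

        "gitlab.com/gitlab-org/gitaly/internal/praefect/database"
    )

    func main() {
        // Same four environment variables the new run() function reads.
        ds, err := database.NewSQLDatastore(
            os.Getenv("PRAEFECT_PG_USER"),
            os.Getenv("PRAEFECT_PG_PASSWORD"),
            os.Getenv("PRAEFECT_PG_ADDRESS"),
            os.Getenv("PRAEFECT_PG_DATABASE"))
        if err != nil {
            log.Fatalf("failed to create sql datastore: %v", err)
        }

        nodes, err := ds.GetStorageNodes()
        if err != nil {
            log.Fatalf("failed to get storage nodes: %v", err)
        }

        // Praefect registers each distinct address once; duplicates are skipped.
        for _, n := range nodes {
            fmt.Printf("node %d: storage=%q address=%q\n", n.ID, n.Storage, n.Address)
        }
    }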
diff --git a/go.mod b/go.mod
index a8b29a4a1..1a1ac2d4b 100644
--- a/go.mod
+++ b/go.mod
@@ -4,25 +4,25 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/cloudflare/tableflip v0.0.0-20190329062924-8392f1641731
github.com/getsentry/raven-go v0.1.2
- github.com/golang/protobuf v1.3.1
+ github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/kelseyhightower/envconfig v1.3.0
github.com/kr/pretty v0.1.0 // indirect
+ github.com/lib/pq v1.2.0
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47
+ github.com/onsi/ginkgo v1.8.0 // indirect
+ github.com/onsi/gomega v1.5.0 // indirect
github.com/prometheus/client_golang v1.0.0
- github.com/sirupsen/logrus v1.2.0
+ github.com/sirupsen/logrus v1.4.1
github.com/stretchr/testify v1.3.0
github.com/tinylib/msgp v1.1.0 // indirect
- gitlab.com/gitlab-org/gitaly-proto v1.37.0
+ gitlab.com/gitlab-org/gitaly-proto v1.38.0
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c
- golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 // indirect
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
- golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
+ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/sys v0.0.0-20190422165155-953cdadca894
- google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect
google.golang.org/grpc v1.16.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
- gopkg.in/yaml.v2 v2.2.2 // indirect
)
diff --git a/go.sum b/go.sum
index d97d828f7..eb5db3227 100644
--- a/go.sum
+++ b/go.sum
@@ -35,6 +35,8 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
@@ -56,6 +58,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
+github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 h1:HDt7WT3kpXSHq4mlOuLzgXH9LeOK1qlhyFdKIAzxxeM=
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47/go.mod h1:4bKN42efkbNYMZlvDfxGDxzl066GhpvIircZDsm8Y+Y=
github.com/lightstep/lightstep-tracer-go v0.15.6 h1:D0GGa7afJ7GcQvu5as6ssLEEKYXvRgKI5d5cevtz8r4=
@@ -68,8 +72,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
+github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
+github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
+github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
@@ -93,12 +101,15 @@ github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNG
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
+github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU=
github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
@@ -108,8 +119,8 @@ github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
-gitlab.com/gitlab-org/gitaly-proto v1.37.0 h1:cRQXF3kW+AR1eLIYTWfb+Eqa+Wd0PIcTq4FkaSCWvK4=
-gitlab.com/gitlab-org/gitaly-proto v1.37.0/go.mod h1:zNjk/86bjwLVJ4NcvInBcXcLdptdRFQ28sYrdFbrFgY=
+gitlab.com/gitlab-org/gitaly-proto v1.38.0 h1:46jlky1yhAC+WfYA9F/0N328QTYMBxOc1/AAwD7rQs8=
+gitlab.com/gitlab-org/gitaly-proto v1.38.0/go.mod h1:zNjk/86bjwLVJ4NcvInBcXcLdptdRFQ28sYrdFbrFgY=
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c h1:xo48LcGsTCasKcJpQDBCCuZU+aP8uGaboUVvD7Lgm6g=
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c/go.mod h1:rYhLgfrbEcyfinG+R3EvKu6bZSsmwQqcXzLfHWSfUKM=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
@@ -117,14 +128,10 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc=
-golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
-golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
@@ -134,13 +141,14 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI=
+golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
-golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
@@ -148,9 +156,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
-google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 h1:yvw+zsSmSM02Z5H3ZdEV7B7Ql7eFrjQTnmByJvK+3J8=
-google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
gopkg.in/DataDog/dd-trace-go.v1 v1.7.0 h1:7wbMayb6JXcbAS95RN7MI42W3o1BCxCcdIzZfVWBAiE=
@@ -164,7 +171,6 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
-gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index 4e535a5d6..a76b50f52 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -34,6 +34,22 @@ func ExtractGitalyServers(ctx context.Context) (gitalyServersInfo storage.Gitaly
return
}
+// IncomingToOutgoing creates an outgoing context out of an incoming context with the same storage metadata
+func IncomingToOutgoing(ctx context.Context) context.Context {
+ md, ok := metadata.FromIncomingContext(ctx)
+ if !ok {
+ return ctx
+ }
+
+ gitalyServersJSONEncoded := md["gitaly-servers"]
+ if len(gitalyServersJSONEncoded) == 0 {
+ return ctx
+ }
+
+ return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", gitalyServersJSONEncoded[0]))
+
+}
+
// InjectGitalyServers injects gitaly-servers metadata into an outgoing context
func InjectGitalyServers(ctx context.Context, name, address, token string) (context.Context, error) {
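IncomingToOutgoing is what lets the praefect proxy forward the gitaly-servers metadata it received from the client on to the backend Gitaly node. A minimal sketch of its effect, using the standard grpc metadata package (the payload string is only a placeholder):

    import (
        "context"
        "fmt"

        "gitlab.com/gitlab-org/gitaly/internal/helper"
        "google.golang.org/grpc/metadata"
    )

    func example() {
        // Pretend this context arrived from a client carrying gitaly-servers metadata.
        incoming := metadata.NewIncomingContext(context.Background(),
            metadata.Pairs("gitaly-servers", "opaque-payload"))

        // The helper copies the value onto the outgoing side so it survives the proxied call.
        outgoing := helper.IncomingToOutgoing(incoming)

        md, _ := metadata.FromOutgoingContext(outgoing)
        fmt.Println(md["gitaly-servers"]) // ["opaque-payload"]
    }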
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
index 2df2a4823..a09a292ad 100644
--- a/internal/praefect/common.go
+++ b/internal/praefect/common.go
@@ -1,13 +1,5 @@
package praefect
-import "google.golang.org/grpc"
-
-// Node is a wrapper around the grpc client connection for a backend Gitaly node
-type Node struct {
- Storage string
- cc *grpc.ClientConn
-}
-
// logging keys to use with logrus WithField
const (
logKeyProjectPath = "ProjectPath"
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 6a2a5b5d5..eb0fad56b 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -15,8 +15,7 @@ type Config struct {
ListenAddr string `toml:"listen_addr"`
SocketPath string `toml:"socket_path"`
- PrimaryServer *models.GitalyServer `toml:"primary_server"`
- SecondaryServers []*models.GitalyServer `toml:"secondary_server"`
+ StorageNodes []*models.StorageNode `toml:"storage_node"`
// Whitelist is a list of relative project paths (paths comprised of project
// hashes) that are permitted to use high availability features
@@ -26,13 +25,6 @@ type Config struct {
PrometheusListenAddr string `toml:"prometheus_listen_addr"`
}
-// GitalyServer allows configuring the servers that RPCs are proxied to
-type GitalyServer struct {
- Name string `toml:"name"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- Token string `toml:"token"`
-}
-
// FromFile loads the config for the passed file path
func FromFile(filePath string) (Config, error) {
config := &Config{}
@@ -50,32 +42,30 @@ var (
errNoListener = errors.New("no listen address or socket path configured")
errNoGitalyServers = errors.New("no primary gitaly backends configured")
errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique")
- errGitalyWithoutName = errors.New("all gitaly servers must have a name")
+ errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address")
)
-var emptyServer = &models.GitalyServer{}
-
// Validate establishes if the config is valid
func (c Config) Validate() error {
if c.ListenAddr == "" && c.SocketPath == "" {
return errNoListener
}
- if c.PrimaryServer == nil || c.PrimaryServer == emptyServer {
+ if len(c.StorageNodes) == 0 {
return errNoGitalyServers
}
- listenAddrs := make(map[string]bool, len(c.SecondaryServers)+1)
- for _, gitaly := range append(c.SecondaryServers, c.PrimaryServer) {
- if gitaly.Name == "" {
- return errGitalyWithoutName
+ listenAddrs := make(map[string]bool, len(c.StorageNodes))
+ for _, node := range c.StorageNodes {
+ if node.Address == "" {
+ return errGitalyWithoutAddr
}
- if _, found := listenAddrs[gitaly.ListenAddr]; found {
+ if _, found := listenAddrs[node.Address]; found {
return errDuplicateGitalyAddr
}
- listenAddrs[gitaly.ListenAddr] = true
+ listenAddrs[node.Address] = true
}
return nil
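With the node list flattened into [[storage_node]] entries, validation now only checks that a listener is configured, that at least one node exists, and that every node has a unique, non-empty address. A minimal sketch of a config that passes the new checks (addresses are placeholders):

    cfg := config.Config{
        ListenAddr: "localhost:2305",
        StorageNodes: []*models.StorageNode{
            {Storage: "praefect-internal-1", Address: "tcp://gitaly-1:9999"},
            {Storage: "praefect-internal-2", Address: "tcp://gitaly-2:9999"},
        },
    }
    if err := cfg.Validate(); err != nil {
        // err would be errNoListener, errNoGitalyServers, errGitalyWithoutAddr or errDuplicateGitalyAddr
        log.Fatalf("invalid praefect config: %v", err)
    }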
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index eace5eb2f..b89bdd648 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -9,10 +9,10 @@ import (
)
func TestConfigValidation(t *testing.T) {
- primarySrv := &models.GitalyServer{"test", "localhost:23456", "secret-token"}
- secondarySrvs := []*models.GitalyServer{
- {"test1", "localhost:23457", "secret-token"},
- {"test2", "localhost:23458", "secret-token"},
+ nodes := []*models.StorageNode{
+ {ID: 1, Address: "localhost:23456", Token: "secret-token"},
+ {ID: 2, Address: "localhost:23457", Token: "secret-token"},
+ {ID: 3, Address: "localhost:23458", Token: "secret-token"},
}
testCases := []struct {
@@ -22,12 +22,12 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "", StorageNodes: nodes},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes},
err: nil,
},
{
@@ -37,12 +37,12 @@ func TestConfigValidation(t *testing.T) {
},
{
desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: []*models.GitalyServer{primarySrv}},
+ config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address})},
err: errDuplicateGitalyAddr,
},
{
desc: "Valid config",
- config: Config{ListenAddr: "localhost:1234", PrimaryServer: primarySrv, SecondaryServers: secondarySrvs},
+ config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes},
err: nil,
},
}
@@ -63,18 +63,18 @@ func TestConfigParsing(t *testing.T) {
{
filePath: "testdata/config.toml",
expected: Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
+ StorageNodes: []*models.StorageNode{
+ {
+ Address: "tcp://gitaly-internal-1.example.com",
+ Storage: "praefect-internal-1",
+ },
{
- Name: "default",
- ListenAddr: "tcp://gitaly-backup1.example.com",
+ Address: "tcp://gitaly-internal-2.example.com",
+ Storage: "praefect-internal-2",
},
{
- Name: "backup",
- ListenAddr: "tcp://gitaly-backup2.example.com",
+ Address: "tcp://gitaly-internal-3.example.com",
+ Storage: "praefect-internal-3",
},
},
Whitelist: []string{"abcd1234", "edfg5678"},
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 81701a359..247db51a9 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -3,20 +3,21 @@ socket_path = ""
whitelist = ["abcd1234", "edfg5678"]
prometheus_listen_addr = ""
-[primary_server]
- name = "default"
- listen_addr = "tcp://gitaly-primary.example.com"
-
-[[secondary_server]]
- name = "default"
- listen_addr = "tcp://gitaly-backup1.example.com"
-
-[[secondary_server]]
- name = "backup"
- listen_addr = "tcp://gitaly-backup2.example.com"
-
[logging]
format = ""
sentry_dsn = ""
ruby_sentry_dsn = ""
level = ""
+
+[[storage_node]]
+ address = "tcp://gitaly-internal-1.example.com"
+ storage = "praefect-internal-1"
+
+[[storage_node]]
+ address = "tcp://gitaly-internal-2.example.com"
+ storage = "praefect-internal-2"
+
+[[storage_node]]
+ address = "tcp://gitaly-internal-3.example.com"
+ storage = "praefect-internal-3"
+
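The test fixture above pairs with the FromFile helper in config.go; a short sketch of loading it and walking the parsed nodes:

    cfg, err := config.FromFile("testdata/config.toml")
    if err != nil {
        log.Fatalf("parsing praefect config: %v", err)
    }
    for _, node := range cfg.StorageNodes {
        fmt.Printf("storage %q is served at %s\n", node.Storage, node.Address)
    }
    // Prints the three praefect-internal-N storages defined in the fixture.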
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 8f64022cb..86a254b5a 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -2,12 +2,15 @@ package praefect
import (
"context"
+ "database/sql"
+ "errors"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
+ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
@@ -19,8 +22,6 @@ import (
"gitlab.com/gitlab-org/gitaly/client"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
// Coordinator takes care of directing client requests to the appropriate
@@ -31,14 +32,14 @@ type Coordinator struct {
failoverMutex sync.RWMutex
connMutex sync.RWMutex
- datastore Datastore
+ datastore ReplicasDatastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore Datastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore ReplicasDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -55,17 +56,18 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
-// GetStorageNode returns the registered node for the given storage location
-func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
- cc, ok := c.getConn(storage)
- if !ok {
- return Node{}, fmt.Errorf("no node registered for storage location %q", storage)
+func targetRepo(mi protoregistry.MethodInfo, frame []byte) (*gitalypb.Repository, error) {
+ m, err := mi.UnmarshalRequestProto(frame)
+ if err != nil {
+ return nil, err
}
- return Node{
- Storage: storage,
- cc: cc,
- }, nil
+ targetRepo, err := mi.TargetRepo(m)
+ if err != nil {
+ return nil, err
+ }
+
+ return targetRepo, nil
}
// streamDirector determines which downstream servers receive requests
@@ -77,34 +79,77 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
c.failoverMutex.RLock()
defer c.failoverMutex.RUnlock()
- serverConfig, err := c.datastore.GetDefaultPrimary()
+ frames, err := peeker.Peek(ctx, 1)
if err != nil {
- err := status.Error(
- codes.FailedPrecondition,
- "no downstream node registered",
- )
return nil, nil, err
}
- // We only need the primary node, as there's only one primary storage
- // location per praefect at this time
- cc, ok := c.getConn(serverConfig.Name)
- if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", serverConfig.Name)
+ mi, err := c.registry.LookupMethod(fullMethodName)
+ if err != nil {
+ return nil, nil, err
}
- ctx, err = helper.InjectGitalyServers(ctx, serverConfig.Name, serverConfig.ListenAddr, serverConfig.Token)
+ var primary *models.StorageNode
+
+ if mi.Scope == protoregistry.ScopeRepository {
+ targetRepo, err := targetRepo(mi, frames[0])
+ if err != nil {
+ return nil, nil, err
+ }
+
+ primary, err = c.datastore.GetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath())
+
+ if err != nil {
+ if err != sql.ErrNoRows {
+ return nil, nil, err
+ }
+ // if there are no primaries for this repository, pick one
+ nodes, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if len(nodes) == 0 {
+ return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+
+ }
+ //newPrimary := nodes[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodes))]
+ newPrimary := nodes[0]
+
+ // set the primary
+ if err = c.datastore.SetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ return nil, nil, err
+ }
+
+ primary = &newPrimary
+ }
+ } else {
+ //TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
+ // proxy requests that are not repository scoped
+ node, err := c.datastore.GetStorageNodes()
+ if err != nil {
+ return nil, nil, err
+ }
+ if len(node) == 0 {
+ return nil, nil, errors.New("no node storages found")
+ }
+ primary = &node[0]
+ }
+
+ // We only need the primary node, as there's only one primary storage
+ // location per praefect at this time
+ cc, err := c.GetConnection(primary.Address)
if err != nil {
- return nil, nil, err
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Address)
}
- return ctx, cc, nil
+ return helper.IncomingToOutgoing(ctx), cc, nil
}
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
- conn, err := client.Dial(listenAddr,
+func (c *Coordinator) RegisterNode(address string) error {
+ conn, err := client.Dial(address,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentials(gitalyconfig.Config.Auth.Token)),
@@ -114,23 +159,28 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
return err
}
- c.setConn(storageName, conn)
+ c.setConn(address, conn)
return nil
}
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(address string, conn *grpc.ClientConn) {
c.connMutex.Lock()
- c.nodes[storageName] = conn
+ c.nodes[address] = conn
c.connMutex.Unlock()
}
-func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
+// GetConnection gets the grpc client connection based on an address
+func (c *Coordinator) GetConnection(address string) (*grpc.ClientConn, error) {
c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
+ cc, ok := c.nodes[address]
c.connMutex.RUnlock()
+ if !ok {
+ return nil, errors.New("client connection not found")
+ }
+
+ return cc, nil
- return cc, ok
}
// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
@@ -146,55 +196,7 @@ func (c *Coordinator) handleSignalAndRotate() {
<-failoverChan
c.failoverMutex.Lock()
- primary, err := c.datastore.GetDefaultPrimary()
- if err != nil {
- c.log.Fatalf("error when getting default primary: %v", err)
- }
-
- if err := c.rotateSecondaryToPrimary(primary); err != nil {
- c.log.WithError(err).Error("rotating secondary")
- }
+ c.log.Info("failover happens")
c.failoverMutex.Unlock()
}
}
-
-func (c *Coordinator) rotateSecondaryToPrimary(primary models.GitalyServer) error {
- repositories, err := c.datastore.GetRepositoriesForPrimary(primary)
- if err != nil {
- return err
- }
-
- for _, repoPath := range repositories {
- secondaries, err := c.datastore.GetShardSecondaries(models.Repository{
- RelativePath: repoPath,
- })
- if err != nil {
- return fmt.Errorf("getting secondaries: %v", err)
- }
-
- newPrimary := secondaries[0]
- secondaries = append(secondaries[1:], primary)
-
- if err = c.datastore.SetShardPrimary(models.Repository{
- RelativePath: repoPath,
- }, newPrimary); err != nil {
- return fmt.Errorf("setting primary: %v", err)
- }
-
- if err = c.datastore.SetShardSecondaries(models.Repository{
- RelativePath: repoPath,
- }, secondaries); err != nil {
- return fmt.Errorf("setting secondaries: %v", err)
- }
- }
-
- // set the new default primary
- primary, err = c.datastore.GetShardPrimary(models.Repository{
- RelativePath: repositories[0],
- })
- if err != nil {
- return fmt.Errorf("getting shard primary: %v", err)
- }
-
- return c.datastore.SetDefaultPrimary(primary)
-}
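The core of the new streamDirector is the primary lookup: for repository-scoped RPCs it asks the datastore for the shard's primary and, if the shard has never been seen (sql.ErrNoRows), elects a node and persists the choice. The same logic, pulled out as a standalone sketch against the ReplicasDatastore interface (names other than the interface methods are illustrative):

    func pickPrimary(ds ReplicasDatastore, storage, relativePath string) (*models.StorageNode, error) {
        primary, err := ds.GetPrimary(storage, relativePath)
        if err == nil {
            return primary, nil
        }
        if err != sql.ErrNoRows {
            return nil, err
        }

        // No shard recorded yet: pick a node and remember it.
        nodes, err := ds.GetStorageNodes()
        if err != nil {
            return nil, err
        }
        if len(nodes) == 0 {
            return nil, fmt.Errorf("no nodes serve storage %s", storage)
        }

        // The commented-out rand call in the diff hints at random selection later;
        // for now the first node wins, as in the committed code.
        newPrimary := nodes[0]
        if err := ds.SetPrimary(storage, relativePath, newPrimary.ID); err != nil {
            return nil, err
        }
        return &newPrimary, nil
    }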
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 50045f8a0..0275c6048 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,9 +5,6 @@ import (
"testing"
"github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
var testLogger = logrus.New()
@@ -17,31 +14,5 @@ func init() {
}
func TestSecondaryRotation(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{Name: "primary"},
- SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}},
- Whitelist: []string{"/repoA", "/repoB"},
- }
- d := NewMemoryDatastore(cfg)
- c := NewCoordinator(testLogger, d)
-
- primary, err := d.GetDefaultPrimary()
- require.NoError(t, err)
-
- require.NoError(t, c.rotateSecondaryToPrimary(primary))
-
- primary, err = d.GetDefaultPrimary()
- require.NoError(t, err)
- require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary")
-
- repositories, err := d.GetRepositoriesForPrimary(primary)
- require.NoError(t, err)
-
- for _, repository := range repositories {
- shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository})
- require.NoError(t, err)
-
- require.Len(t, shardSecondaries, 2)
- require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0])
- }
+ t.Skip("secondary rotation will change with the new data model")
}
diff --git a/internal/praefect/database/migrations/1_initial_up.sql b/internal/praefect/database/migrations/1_initial_up.sql
new file mode 100644
index 000000000..774d4ff42
--- /dev/null
+++ b/internal/praefect/database/migrations/1_initial_up.sql
@@ -0,0 +1,19 @@
+CREATE TABLE IF NOT EXISTS storage_nodes (
+ id SERIAL PRIMARY KEY,
+ storage TEXT NOT NULL,
+ address TEXT NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS shards (
+ id SERIAL PRIMARY KEY,
+ storage TEXT NOT NULL,
+ relative_path TEXT NOT NULL,
+ "primary" INTEGER REFERENCES storage_nodes (id),
+ UNIQUE (storage, relative_path)
+);
+
+CREATE TABLE IF NOT EXISTS shard_secondaries (
+ shard_id INTEGER REFERENCES shards(id),
+ storage_node_id INTEGER REFERENCES storage_nodes(id),
+ PRIMARY KEY(shard_id, storage_node_id)
+);
\ No newline at end of file
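The commit also adds /praefect-migrate to .gitignore, but that tool is not part of this diff, so here is a hedged sketch of applying the migration directly with database/sql and lib/pq (the connection string format follows NewSQLDatastore below):

    // Requires: _ "github.com/lib/pq" imported for the "postgres" driver.
    db, err := sql.Open("postgres",
        fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable",
            os.Getenv("PRAEFECT_PG_USER"),
            os.Getenv("PRAEFECT_PG_PASSWORD"),
            os.Getenv("PRAEFECT_PG_ADDRESS"),
            os.Getenv("PRAEFECT_PG_DATABASE")))
    if err != nil {
        log.Fatalf("open database: %v", err)
    }

    migration, err := ioutil.ReadFile("internal/praefect/database/migrations/1_initial_up.sql")
    if err != nil {
        log.Fatalf("read migration: %v", err)
    }

    // The statements are idempotent (CREATE TABLE IF NOT EXISTS), so re-running is safe.
    if _, err := db.Exec(string(migration)); err != nil {
        log.Fatalf("apply migration: %v", err)
    }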
diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go
new file mode 100644
index 000000000..bd64a2ef9
--- /dev/null
+++ b/internal/praefect/database/sql_datastore.go
@@ -0,0 +1,225 @@
+package database
+
+import (
+ "errors"
+ "fmt"
+
+ "database/sql"
+
+ // the lib/pg package provides postgres bindings for the sql package
+ _ "github.com/lib/pq"
+
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+)
+
+// SQLDatastore is a sql based datastore that conforms to the ReplicasDatastore interface
+type SQLDatastore struct {
+ db *sql.DB
+}
+
+// NewSQLDatastore instantiates a new sql datastore with environment variables
+func NewSQLDatastore(user, password, address, database string) (*SQLDatastore, error) {
+ connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", user, password, address, database)
+ db, err := sql.Open("postgres", connStr)
+ if err != nil {
+ return nil, err
+ }
+
+ return &SQLDatastore{db: db}, nil
+}
+
+// GetSecondaries gets the secondaries for a shard based on the relative path
+func (sd *SQLDatastore) GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) {
+ var secondaries []models.StorageNode
+
+ rows, err := sd.db.Query(`
+ SELECT storage_nodes.id, storage_nodes.address FROM shards
+ INNER JOIN shard_secondaries ON shards.id = shard_secondaries.shard_id
+ INNER JOIN storage_nodes ON storage_nodes.id = shard_secondaries.storage_node_id WHERE shards.storage = $1 AND shards.relative_path = $2`, storage, relativePath)
+
+ if err != nil {
+ return nil, err
+ }
+
+ for rows.Next() {
+ var s models.StorageNode
+ err = rows.Scan(&s.ID, &s.Address)
+ if err != nil {
+ return nil, err
+ }
+ secondaries = append(secondaries, s)
+ }
+
+ return secondaries, nil
+}
+
+// GetStorageNode gets all storage storage_nodes
+func (sd *SQLDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) {
+ var node models.StorageNode
+
+ row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM storage_nodes WHERE storage_nodes.id = $1", nodeID)
+
+ err := row.Scan(&node.ID, &node.Address, &node.Storage)
+ if err != nil {
+ return node, err
+ }
+
+ return node, nil
+
+}
+
+// GetStorageNodes gets all storage storage_nodes
+func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) {
+ var nodeStorages []models.StorageNode
+
+ rows, err := sd.db.Query("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM storage_nodes")
+
+ if err != nil {
+ return nil, err
+ }
+
+ for rows.Next() {
+ var nodeStorage models.StorageNode
+ err = rows.Scan(&nodeStorage.ID, &nodeStorage.Address, &nodeStorage.Storage)
+ if err != nil {
+ return nil, err
+ }
+ nodeStorages = append(nodeStorages, nodeStorage)
+ }
+
+ return nodeStorages, nil
+
+}
+
+// GetPrimary gets the primary storage node for a shard of a repository relative path
+func (sd *SQLDatastore) GetPrimary(storage, relativePath string) (*models.StorageNode, error) {
+
+ row := sd.db.QueryRow(`
+ SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM shards
+ INNER JOIN storage_nodes ON shards.primary = storage_nodes.id
+ WHERE shards.storage = $1 AND shards.relative_path = $2
+ `, storage, relativePath)
+
+ var s models.StorageNode
+ if err := row.Scan(&s.ID, &s.Address, &s.Storage); err != nil {
+ return nil, err
+ }
+
+ return &s, nil
+}
+
+// SetPrimary sets the primary storagee node for a shard of a repository relative path
+func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID int) error {
+ res, err := sd.db.Exec(`UPDATE shards SET "primary" = $1 WHERE storage = $2 AND relative_path = $3`, storageNodeID, storage, relativePath)
+ if err != nil {
+ return err
+ }
+
+ if n, err := res.RowsAffected(); err != nil {
+ return err
+ } else if n == 0 {
+ res, err = sd.db.Exec(`INSERT INTO shards (storage, relative_path, "primary") VALUES ($1, $2, $3)`, storage, relativePath, storageNodeID)
+ if err != nil {
+ return err
+ }
+ if n, err := res.RowsAffected(); err != nil {
+ return err
+ } else if n == 0 {
+ return errors.New("failed to set primary")
+ }
+ }
+
+ return nil
+}
+
+// AddSecondary adds a secondary to a shard of a repository relative path
+func (sd *SQLDatastore) AddSecondary(storage, relativePath string, storageNodeID int) error {
+ res, err := sd.db.Exec(`
+ INSERT INTO shard_secondaries (shard_id, storage_node_id)
+ VALUES (SELECT id, $1 FROM shards WHERE storage = $2 AND relative_path = $3)`, storageNodeID, storage, relativePath)
+ if err != nil {
+ return err
+ }
+
+ if n, err := res.RowsAffected(); err != nil {
+ return err
+ } else if n == 0 {
+ return errors.New("secondary already exists")
+ }
+
+ return nil
+}
+
+// RemoveSecondary removes a secondary from a shard of a repository relative path
+func (sd *SQLDatastore) RemoveSecondary(storage, relativePath string, storageNodeID int) error {
+ res, err := sd.db.Exec(`
+ DELETE FROM shard_secondaries (shard_relative_path, node_storage_id)
+ WHERE shard_id = (SELECT id FROM shard where storage = $1 AND relative_path = $2) AND storage_node_id = $3`, storage, relativePath, storageNodeID)
+ if err != nil {
+ return err
+ }
+
+ if n, err := res.RowsAffected(); err != nil {
+ return err
+ } else if n == 0 {
+ return errors.New("secondary did not exist")
+ }
+
+ return nil
+}
+
+// GetShard gets the shard for a repository relative path
+func (sd *SQLDatastore) GetShard(storage, relativePath string) (*models.Shard, error) {
+ primary, err := sd.GetPrimary(storage, relativePath)
+ if err != nil {
+ return nil, fmt.Errorf("getting primary: %v", err)
+ }
+
+ secondaries, err := sd.GetSecondaries(storage, relativePath)
+ if err != nil {
+ return nil, fmt.Errorf("getting secondaries: %v", err)
+ }
+
+ return &models.Shard{RelativePath: relativePath, Primary: *primary, Secondaries: secondaries}, nil
+}
+
+// RotatePrimary rotates a primary out of being primary, and picks a secondary of each shard at random to promote to the new primary
+func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error {
+
+ // Add the primary as a secondary
+ res, err := sd.db.Exec(`
+ INSERT INTO shard_secondaries (shard_id, node_storage_id) VALUES (SELECT shards.id, shards.primary FROM shards WHERE shards.primary = $1)
+ `, primaryNodeStorageID)
+ if err != nil {
+ return err
+ }
+
+ affected, err := res.RowsAffected()
+ if err != nil {
+ return err
+ }
+
+ if affected == 0 {
+ return fmt.Errorf("no shards with primary %d found", primaryNodeStorageID)
+ }
+
+ // Choose a new secondary
+ res, err = sd.db.Exec(`UPDATE shards SET "primary" =
+ (SELECT shard_secondaries.storage_node_id FROM shard_secondaries
+ INNER JOIN shards ON shard_secondaries.shard_id = shards.id
+ WHERE shards.primary = $1 AND shards.primary != shard_secondaries.storage_node_id LIMIT 1)
+ `, primaryNodeStorageID)
+ if err != nil {
+ return err
+ }
+
+ affected, err = res.RowsAffected()
+ if err != nil {
+ return err
+ }
+
+ if affected == 0 {
+ return errors.New("no secondaries available to rotate")
+ }
+ return nil
+}
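Put together, the datastore exposes a small shard-management API. A round-trip sketch (node IDs and the repository path are placeholders; the storage_nodes rows they refer to must already exist):

    ds, err := database.NewSQLDatastore(
        os.Getenv("PRAEFECT_PG_USER"),
        os.Getenv("PRAEFECT_PG_PASSWORD"),
        os.Getenv("PRAEFECT_PG_ADDRESS"),
        os.Getenv("PRAEFECT_PG_DATABASE"))
    if err != nil {
        log.Fatal(err)
    }

    const (
        storage      = "praefect-internal-1"
        relativePath = "@hashed/ab/cd/abcd1234.git" // placeholder repository path
    )

    // Elect node 1 as primary for the shard, then attach node 2 as a secondary.
    if err := ds.SetPrimary(storage, relativePath, 1); err != nil {
        log.Fatalf("set primary: %v", err)
    }
    if err := ds.AddSecondary(storage, relativePath, 2); err != nil {
        log.Fatalf("add secondary: %v", err)
    }

    // Read the shard back: primary plus its secondaries.
    shard, err := ds.GetShard(storage, relativePath)
    if err != nil {
        log.Fatalf("get shard: %v", err)
    }
    fmt.Printf("primary=%d, %d secondaries\n", shard.Primary.ID, len(shard.Secondaries))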
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 5678c6a24..497f6ba03 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -11,7 +11,6 @@ import (
"sort"
"sync"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
@@ -42,10 +41,10 @@ const (
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
- ID uint64 // autoincrement ID
- Target string // which storage location to replicate to?
- Source models.Repository // source for replication
- State JobState
+ ID uint64 // autoincrement ID
+ TargetNodeID int // which node to replicate to?
+ Source models.Repository // source for replication
+ State JobState
}
// replJobs provides sort manipulation behavior
@@ -64,32 +63,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
type Datastore interface {
ReplJobsDatastore
ReplicasDatastore
- TemporaryDatastore
-}
-
-// TemporaryDatastore contains methods that will go away once we move to a SQL datastore
-type TemporaryDatastore interface {
- GetDefaultPrimary() (models.GitalyServer, error)
- SetDefaultPrimary(primary models.GitalyServer) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- // GetSecondaries will retrieve all secondary replica storage locations for
- // a primary replica
- GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error)
+ GetSecondaries(storage, relativePath string) ([]models.StorageNode, error)
+
+ GetStorageNode(nodeID int) (models.StorageNode, error)
+
+ GetStorageNodes() ([]models.StorageNode, error)
+
+ GetPrimary(storage, relativePath string) (*models.StorageNode, error)
- GetShardPrimary(repo models.Repository) (models.GitalyServer, error)
+ SetPrimary(storage, relativePath string, storageNodeID int) error
- // SetSecondaries will set the secondary storage locations for a repository
- // in a primary replica.
- SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error
+ AddSecondary(storage, relativePath string, storageNodeID int) error
- SetShardPrimary(repo models.Repository, primary models.GitalyServer) error
+ RemoveSecondary(storage, relativePath string, storageNodeID int) error
- // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary
- GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error)
+ GetShard(storage, relativePath string) (*models.Shard, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -98,26 +91,20 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, node string, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error)
// CreateSecondaryJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(source models.Repository) ([]uint64, error)
+ CreateSecondaryReplJobs(storage, relativePath string) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
}
-// shard is a set of primary and secondary storage replicas for a project
-type shard struct {
- primary models.GitalyServer
- secondaries []models.GitalyServer
-}
-
type jobRecord struct {
relativePath string // project's relative path
- targetNode string
+ targetNodeID int
state JobState
}
@@ -125,31 +112,31 @@ type jobRecord struct {
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- replicas *struct {
- sync.RWMutex
- m map[string]shard // keyed by project's relative path
- }
-
jobs *struct {
sync.RWMutex
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
- primary *struct {
+ storageNodes *struct {
sync.RWMutex
- server models.GitalyServer
+ m map[int]models.StorageNode
+ }
+
+ shards *struct {
+ sync.RWMutex
+ m map[string]models.Shard // keyed by storage name + relative path
}
}
// NewMemoryDatastore returns an initialized in-memory datastore
-func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
- m := &MemoryDatastore{
- replicas: &struct {
+func NewMemoryDatastore() *MemoryDatastore {
+ return &MemoryDatastore{
+ storageNodes: &struct {
sync.RWMutex
- m map[string]shard
+ m map[int]models.StorageNode
}{
- m: map[string]shard{},
+ m: map[int]models.StorageNode{},
},
jobs: &struct {
sync.RWMutex
@@ -159,106 +146,149 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- primary: &struct {
+ shards: &struct {
sync.RWMutex
- server models.GitalyServer
+ m map[string]models.Shard
}{
- server: models.GitalyServer{
- Name: cfg.PrimaryServer.Name,
- ListenAddr: cfg.PrimaryServer.ListenAddr,
- Token: cfg.PrimaryServer.Token,
- },
+ m: map[string]models.Shard{},
},
}
+}
- secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers))
- for i, server := range cfg.SecondaryServers {
- secondaryServers[i] = *server
+// GetSecondaries gets the secondaries of the shard identified by the storage name and relative path
+func (md *MemoryDatastore) GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) {
+ md.shards.RLock()
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+ defer md.shards.RUnlock()
+
+ shard, ok := md.shards.m[storage+relativePath]
+ if !ok {
+ return nil, errors.New("shard not found")
}
- for _, repo := range cfg.Whitelist {
- // store the configuration file specified shard
- m.replicas.m[repo] = shard{
- primary: *cfg.PrimaryServer,
- secondaries: secondaryServers,
- }
+ return shard.Secondaries, nil
+}
- // initialize replication job queue to replicate all whitelisted repos
- // to every secondary server
- for _, secondary := range cfg.SecondaryServers {
- m.jobs.next++
- m.jobs.records[m.jobs.next] = jobRecord{
- state: JobStateReady,
- targetNode: secondary.Name,
- relativePath: repo,
- }
- }
+// GetStorageNode gets a single storage node by its ID
+func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+
+ node, ok := md.storageNodes.m[nodeID]
+ if !ok {
+ return models.StorageNode{}, errors.New("node not found")
}
- return m
+ return node, nil
}
-// GetShardSecondaries will return the set of secondary storage locations for a
-// given repository if they exist
-func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) {
- shard, _ := md.getShard(primary.RelativePath)
+// GetStorageNodes gets all storage nodes
+func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- return shard.secondaries, nil
+ var storageNodes []models.StorageNode
+ for _, storageNode := range md.storageNodes.m {
+ storageNodes = append(storageNodes, storageNode)
+ }
+
+ return storageNodes, nil
}
-// SetShardSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetPrimary gets the primary storage node of the shard identified by the storage name and relative path
+func (md *MemoryDatastore) GetPrimary(storage, relativePath string) (*models.StorageNode, error) {
+ md.shards.RLock()
+ defer md.shards.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.secondaries = secondaries
- md.replicas.m[repo.RelativePath] = shard
+ shard, ok := md.shards.m[storage+relativePath]
+ if !ok {
+ return nil, errors.New("shard not found")
+ }
+
+ storageNode, ok := md.storageNodes.m[shard.Primary.ID]
+ if !ok {
+ return nil, errors.New("node storage not found")
+ }
+ return &storageNode, nil
- return nil
}
-// SetShardPrimary sets the primary for a repository
-func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// SetPrimary sets the primary storage node of the shard identified by the storage name and relative path
+func (md *MemoryDatastore) SetPrimary(storage, relativePath string, storageNodeID int) error {
+ md.shards.Lock()
+ defer md.shards.Unlock()
+
+ shard, ok := md.shards.m[storage+relativePath]
+ if !ok {
+ return errors.New("shard not found")
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
- shard := md.replicas.m[repo.RelativePath]
- shard.primary = primary
- md.replicas.m[repo.RelativePath] = shard
+ shard.Primary = storageNode
+ md.shards.m[storage+relativePath] = shard
return nil
}
-// GetShardPrimary gets the primary for a repository
-func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// AddSecondary adds a secondary to the shard identified by the storage name and relative path
+func (md *MemoryDatastore) AddSecondary(storage, relativePath string, storageNodeID int) error {
+ md.shards.Lock()
+ defer md.shards.Unlock()
- shard := md.replicas.m[repo.RelativePath]
- return shard.primary, nil
+ shard, ok := md.shards.m[storage+relativePath]
+ if !ok {
+ return errors.New("shard not found")
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+
+ shard.Secondaries = append(shard.Secondaries, storageNode)
+
+ md.shards.m[storage+relativePath] = shard
+ return nil
}
-// GetRepositoriesForPrimary gets all repositories
-func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// RemoveSecondary removes a secondary from the shard identified by the storage name and relative path
+func (md *MemoryDatastore) RemoveSecondary(storage, relativePath string, storageNodeID int) error {
+ md.shards.Lock()
+ defer md.shards.Unlock()
- repositories := make([]string, 0, len(md.replicas.m))
+ shard, ok := md.shards.m[storage+relativePath]
+ if !ok {
+ return errors.New("shard not found")
+ }
- for repository := range md.replicas.m {
- repositories = append(repositories, repository)
+ var secondaries []models.StorageNode
+ for _, secondary := range shard.Secondaries {
+ if secondary.ID != storageNodeID {
+ secondaries = append(secondaries, secondary)
+ }
}
- return repositories, nil
+ shard.Secondaries = secondaries
+ md.shards.m[storage+relativePath] = shard
+ return nil
}
-func (md *MemoryDatastore) getShard(project string) (shard, bool) {
- md.replicas.RLock()
- replicas, ok := md.replicas.m[project]
- md.replicas.RUnlock()
+// GetShard gets the shard identified by the storage name and relative path
+func (md *MemoryDatastore) GetShard(storage, relativePath string) (*models.Shard, error) {
+ md.shards.RLock()
+ defer md.shards.RUnlock()
- return replicas, ok
+ shard, ok := md.shards.m[storage+relativePath]
+ if !ok {
+ return nil, errors.New("shard not found")
+ }
+
+ return &shard, nil
}
// ErrSecondariesMissing indicates the repository does not have any backup
@@ -266,7 +296,7 @@ func (md *MemoryDatastore) getShard(project string) (shard, bool) {
var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) {
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -274,7 +304,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNode == storage {
+ if record.state&state != 0 && record.targetNodeID == targetNodeID {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -295,58 +325,48 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
// replJobFromRecord constructs a replication job from a record and by cross
// referencing the current shard for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- shard, ok := md.getShard(record.relativePath)
- if !ok {
- return ReplJob{}, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- record.relativePath,
- )
- }
-
return ReplJob{
ID: jobID,
Source: models.Repository{
RelativePath: record.relativePath,
- Storage: shard.primary.Name,
},
- State: record.state,
- Target: record.targetNode,
+ State: record.state,
+ TargetNodeID: record.targetNodeID,
}, nil
}
-// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because
+// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because
// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication")
+var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
// CreateSecondaryReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) {
+func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
- emptyRepo := models.Repository{}
- if source == emptyRepo {
+ if relativePath == "" {
return nil, errors.New("invalid source repository")
}
- shard, ok := md.getShard(source.RelativePath)
- if !ok {
+ shard, err := md.GetShard(storage, relativePath)
+ if err != nil {
return nil, fmt.Errorf(
"unable to find shard for project at relative path %q",
- source.RelativePath,
+ relativePath,
)
}
var jobIDs []uint64
- for _, secondary := range shard.secondaries {
+ for _, secondary := range shard.Secondaries {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- targetNode: secondary.Name,
+ targetNodeID: secondary.ID,
state: JobStatePending,
- relativePath: source.RelativePath,
+ relativePath: relativePath,
}
jobIDs = append(jobIDs, nextID)
@@ -375,36 +395,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
-
-// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error {
- md.primary.Lock()
- defer md.primary.Unlock()
-
- md.primary.server = primary
-
- return nil
-}
-
-// GetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- primary := md.primary.server
- if primary == (models.GitalyServer{}) {
- return primary, ErrPrimaryNotSet
- }
-
- return primary, nil
-}
-
-// SetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- md.primary.server = primary
-
- return nil
-}
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index 6099a8328..d3da4aeff 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -1,11 +1,9 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
@@ -13,81 +11,99 @@ import (
// populate itself with the correct replication jobs and shards when initialized
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup-1",
- },
- {
- Name: "backup-2",
- },
- },
- Whitelist: []string{"abcd1234", "5678efgh"},
+ repo1 := models.Repository{
+ RelativePath: "abcd1234",
+ Storage: "default",
}
- mds := praefect.NewMemoryDatastore(cfg)
+ repo2 := models.Repository{
+ RelativePath: "5678efgh",
+ Storage: "default",
+ }
+ mds := NewMemoryDatastore()
- repo1 := models.Repository{
- RelativePath: cfg.Whitelist[0],
- Storage: cfg.PrimaryServer.Name,
+ mds.storageNodes.m[1] = models.StorageNode{
+ ID: 1,
+ Address: "tcp://default",
+ Storage: "praefect-internal-1",
+ }
+ mds.storageNodes.m[2] = models.StorageNode{
+ ID: 2,
+ Address: "tcp://backup-1",
+ Storage: "praefect-internal-2",
+ }
+ mds.storageNodes.m[3] = models.StorageNode{
+ ID: 3,
+ Address: "tcp://backup-2",
+ Storage: "praefect-internal-3",
+ }
+ mds.shards.m[repo1.Storage+repo1.RelativePath] = models.Shard{
+ Storage: repo1.Storage,
+ RelativePath: repo1.RelativePath,
+ Primary: mds.storageNodes.m[1],
+ Secondaries: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]},
+ }
+ mds.shards.m[repo2.Storage+repo2.RelativePath] = models.Shard{
+ Storage: repo2.Storage,
+ RelativePath: repo2.RelativePath,
+ Primary: mds.storageNodes.m[1],
+ Secondaries: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]},
}
- repo2 := models.Repository{
- RelativePath: cfg.Whitelist[1],
- Storage: cfg.PrimaryServer.Name,
+ for _, repo := range []models.Repository{repo1, repo2} {
+ jobIDs, err := mds.CreateSecondaryReplJobs(repo.Storage, repo.RelativePath)
+ require.NoError(t, err)
+ require.Len(t, jobIDs, 2)
}
- expectSecondaries := []models.GitalyServer{
- models.GitalyServer{Name: cfg.SecondaryServers[0].Name},
- models.GitalyServer{Name: cfg.SecondaryServers[1].Name},
+ expectSecondaries := []models.StorageNode{
+ mds.storageNodes.m[2],
+ mds.storageNodes.m[3],
}
for _, repo := range []models.Repository{repo1, repo2} {
- actualSecondaries, err := mds.GetShardSecondaries(repo)
+ actualSecondaries, err := mds.GetSecondaries(repo.Storage, repo.RelativePath)
require.NoError(t, err)
require.ElementsMatch(t, expectSecondaries, actualSecondaries)
}
- backup1 := cfg.SecondaryServers[0]
- backup2 := cfg.SecondaryServers[1]
+ backup1 := mds.storageNodes.m[2]
+ backup2 := mds.storageNodes.m[3]
- backup1ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
- ID: 1,
- Target: backup1.Name,
- Source: repo1,
- State: praefect.JobStateReady,
+ backup1ExpectedJobs := []ReplJob{
+ ReplJob{
+ ID: 1,
+ TargetNodeID: backup1.ID,
+ Source: models.Repository{RelativePath: repo1.RelativePath},
+ State: JobStatePending,
},
- praefect.ReplJob{
- ID: 3,
- Target: backup1.Name,
- Source: repo2,
- State: praefect.JobStateReady,
+ ReplJob{
+ ID: 3,
+ TargetNodeID: backup1.ID,
+ Source: models.Repository{RelativePath: repo2.RelativePath},
+ State: JobStatePending,
},
}
- backup2ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
- ID: 2,
- Target: backup2.Name,
- Source: repo1,
- State: praefect.JobStateReady,
+ backup2ExpectedJobs := []ReplJob{
+ ReplJob{
+ ID: 2,
+ TargetNodeID: backup2.ID,
+ Source: models.Repository{RelativePath: repo1.RelativePath},
+ State: JobStatePending,
},
- praefect.ReplJob{
- ID: 4,
- Target: backup2.Name,
- Source: repo2,
- State: praefect.JobStateReady,
+ ReplJob{
+ ID: 4,
+ TargetNodeID: backup2.ID,
+ Source: models.Repository{RelativePath: repo2.RelativePath},
+ State: JobStatePending,
},
}
- backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10)
+ backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.ID, 10)
require.NoError(t, err)
require.Equal(t, backup1ExpectedJobs, backup1ActualJobs)
- backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10)
+ backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.ID, 10)
require.NoError(t, err)
require.Equal(t, backup2ActualJobs, backup2ExpectedJobs)
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 417a04be2..743ce192d 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -1,95 +1,104 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
-const (
- stor1 = "default" // usually the primary storage location
- stor2 = "backup-1" // usually the seoncary storage location
+var (
+ stor1 = models.StorageNode{
+ ID: 1,
+ Address: "tcp://address-1",
+ Storage: "praefect-storage-1",
+ }
+ stor2 = models.StorageNode{
+ ID: 2,
+ Address: "tcp://address-2",
+ Storage: "praefect-storage-2",
+ }
proj1 = "abcd1234" // imagine this is a legit project hash
)
var (
- repo1Primary = models.Repository{
+ repo1Shard = models.Shard{
+ Storage: "storage1",
RelativePath: proj1,
- Storage: stor1,
}
)
var operations = []struct {
desc string
- opFn func(*testing.T, praefect.Datastore)
+ opFn func(*testing.T, Datastore)
}{
{
desc: "query an empty datastore",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
},
{
- desc: "insert first replication job before secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- _, err := ds.CreateSecondaryReplJobs(repo1Primary)
- require.Error(t, err, praefect.ErrInvalidReplTarget)
+ desc: "creating replication jobs before secondaries are added results in no jobs added",
+ opFn: func(t *testing.T, ds Datastore) {
+ jobIDs, err := ds.CreateSecondaryReplJobs(repo1Shard.Storage, repo1Shard.RelativePath)
+ require.NoError(t, err)
+ require.Empty(t, jobIDs)
},
},
{
desc: "set the primary for the shard",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1})
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.SetPrimary(repo1Shard.Storage, repo1Shard.RelativePath, stor1.ID)
require.NoError(t, err)
},
},
{
- desc: "associate the replication job target with a primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}})
+ desc: "add a secondary replica for the shard",
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.AddSecondary(repo1Shard.Storage, repo1Shard.RelativePath, stor2.ID)
require.NoError(t, err)
},
},
{
desc: "insert first replication job after secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- ids, err := ds.CreateSecondaryReplJobs(repo1Primary)
+ opFn: func(t *testing.T, ds Datastore) {
+ ids, err := ds.CreateSecondaryReplJobs(repo1Shard.Storage, repo1Shard.RelativePath)
require.NoError(t, err)
require.Equal(t, []uint64{1}, ids)
},
},
{
desc: "fetch inserted replication jobs after primary mapped",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.ID, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- expectedJob := praefect.ReplJob{
- ID: 1,
- Source: repo1Primary,
- Target: stor2,
- State: praefect.JobStatePending,
+ expectedJob := ReplJob{
+ ID: 1,
+ Source: models.Repository{
+ RelativePath: repo1Shard.RelativePath,
+ },
+ TargetNodeID: stor2.ID,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
},
{
desc: "mark replication job done",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.UpdateReplJob(1, praefect.JobStateComplete)
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.UpdateReplJob(1, JobStateComplete)
require.NoError(t, err)
},
},
{
desc: "try fetching completed replication job",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.ID, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -97,14 +106,15 @@ var operations = []struct {
}
// TODO: add SQL datastore flavor
-var flavors = map[string]func() praefect.Datastore{
- "in-memory-datastore": func() praefect.Datastore {
- return praefect.NewMemoryDatastore(
- config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- })
+var flavors = map[string]func() Datastore{
+ "in-memory-datastore": func() Datastore {
+ ds := NewMemoryDatastore()
+
+ ds.shards.m[repo1Shard.Storage+repo1Shard.RelativePath] = repo1Shard
+ ds.storageNodes.m[stor1.ID] = stor1
+ ds.storageNodes.m[stor2.ID] = stor2
+
+ return ds
},
}
diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go
index b8a8afb01..f6d19f296 100644
--- a/internal/praefect/mock/mock.pb.go
+++ b/internal/praefect/mock/mock.pb.go
@@ -9,7 +9,10 @@ import (
math "math"
proto "github.com/golang/protobuf/proto"
+ _ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
)
// Reference imports to suppress errors if they are not otherwise used.
@@ -109,16 +112,17 @@ func init() {
func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) }
var fileDescriptor_5ed43251284e3118 = []byte{
- // 139 bytes of a gzipped FileDescriptorProto
+ // 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce,
- 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17,
- 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08,
- 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3,
- 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4,
- 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00,
- 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf,
- 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00,
- 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00,
+ 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71,
+ 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27,
+ 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34,
+ 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b,
+ 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38,
+ 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54,
+ 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b,
+ 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03,
+ 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -160,6 +164,14 @@ type SimpleServiceServer interface {
SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error)
}
+// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedSimpleServiceServer struct {
+}
+
+func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented")
+}
+
func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) {
s.RegisterService(&_SimpleService_serviceDesc, srv)
}
diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto
index aa6ec842a..59e79d3b9 100644
--- a/internal/praefect/mock/mock.proto
+++ b/internal/praefect/mock/mock.proto
@@ -7,6 +7,8 @@ syntax = "proto3";
package mock;
+import "shared.proto";
+
message SimpleRequest {
int32 value = 1;
}
@@ -17,7 +19,10 @@ message SimpleResponse {
service SimpleService {
// SimpleUnaryUnary is a simple unary request with unary response
rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) {
- option (gitaly.op_type).op = ACCESSOR;
+ option (gitaly.op_type) = {
+ op: ACCESSOR
+ scope_level: SERVER
+ };
}
}
diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go
index f6e01811b..adcf7a65e 100644
--- a/internal/praefect/mocksvc_test.go
+++ b/internal/praefect/mocksvc_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock.
// mockSvc is an implementation of mock.SimpleServer for testing purposes. The
// gRPC stub can be updated via go generate:
//
-//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto
+//go:generate protoc --go_out=plugins=grpc:. -I../../proto -I./ mock/mock.proto
//go:generate goimports -w mock/mock.pb.go
type mockSvc struct {
simpleUnaryUnary simpleUnaryUnaryCallback
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
new file mode 100644
index 000000000..0e1079441
--- /dev/null
+++ b/internal/praefect/models/node.go
@@ -0,0 +1,18 @@
+package models
+
+// StorageNode describes an address that serves a storage
+type StorageNode struct {
+ ID int
+ Storage string `toml:"storage"`
+ Address string `toml:"address"`
+ Token string `toml:"token"`
+}
+
+// Shard describes a repository's relative path and its primary and list of secondaries
+type Shard struct {
+ ID int
+ Storage string
+ RelativePath string
+ Primary StorageNode
+ Secondaries []StorageNode
+}
diff --git a/internal/praefect/models/nodes.go b/internal/praefect/models/nodes.go
deleted file mode 100644
index 854254d87..000000000
--- a/internal/praefect/models/nodes.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package models
-
-// GitalyServer allows configuring the servers that RPCs are proxied to
-type GitalyServer struct {
- Name string `toml:"name"`
- ListenAddr string `toml:"listen_addr" split_words:"true"`
- Token string `toml:"token"`
-}
diff --git a/internal/praefect/protoregistry/protoregistry.go b/internal/praefect/protoregistry/protoregistry.go
index 909b2e2b2..9817904dc 100644
--- a/internal/praefect/protoregistry/protoregistry.go
+++ b/internal/praefect/protoregistry/protoregistry.go
@@ -43,11 +43,24 @@ const (
OpMutator
)
+// Scope represents the scope for an RPC method
+type Scope int
+
+const (
+ // ScopeRepository = repository scope
+ ScopeRepository = iota
+ // ScopeStorage = storage scope
+ ScopeStorage
+ // ScopeServer = server scope
+ ScopeServer
+)
+
// MethodInfo contains metadata about the RPC method. Refer to documentation
// for message type "OperationMsg" shared.proto in gitlab-org/gitaly-proto for
// more documentation.
type MethodInfo struct {
Operation OpType
+ Scope Scope
targetRepo []int
requestName string // protobuf message name for input type
requestFactory protoFactory
@@ -55,13 +68,6 @@ type MethodInfo struct {
// TargetRepo returns the target repository for a protobuf message if it exists
func (mi MethodInfo) TargetRepo(msg proto.Message) (*gitalypb.Repository, error) {
- if mi.requestName != proto.MessageName(msg) {
- return nil, fmt.Errorf(
- "proto message %s does not match expected RPC request message %s",
- proto.MessageName(msg), mi.requestName,
- )
- }
-
return reflectFindRepoTarget(msg, mi.targetRepo)
}
@@ -179,6 +185,11 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
return MethodInfo{}, err
}
+ scope, err := parseScope(opMsg.GetScopeLevel())
+ if err != nil {
+ return MethodInfo{}, err
+ }
+
// for some reason, the protobuf descriptor contains an extra dot in front
// of the request name that the generated code does not. This trimming keeps
// the two copies consistent for comparisons.
@@ -194,9 +205,21 @@ func parseMethodInfo(methodDesc *descriptor.MethodDescriptorProto) (MethodInfo,
targetRepo: targetRepo,
requestName: requestName,
requestFactory: reqFactory,
+ Scope: scope,
}, nil
}
+func parseScope(scope gitalypb.OperationMsg_Scope) (Scope, error) {
+ switch scope {
+ case gitalypb.OperationMsg_REPOSITORY:
+ return ScopeRepository, nil
+ case gitalypb.OperationMsg_SERVER:
+ return ScopeServer, nil
+ }
+
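+ // Storage-scoped operations are not handled yet; any unrecognized scope results in an error.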
+ return ScopeRepository, errors.New("scope not found")
+}
+
// parses a string like "1.1" and returns a slice of ints
func parseOID(rawFieldOID string) ([]int, error) {
var fieldNos []int
diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go
index f2c1f394e..7112d99c0 100644
--- a/internal/praefect/protoregistry/targetrepo_test.go
+++ b/internal/praefect/protoregistry/targetrepo_test.go
@@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
svc: "RepositoryService",
method: "RepackIncremental",
pbMsg: &gitalypb.RepackIncrementalResponse{},
- expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"),
+ expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"),
},
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index d3244619d..66b28b7bf 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -8,22 +8,23 @@ import (
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/internal/helper"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "google.golang.org/grpc"
"github.com/sirupsen/logrus"
)
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, source models.Repository, target Node) error
+ Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Logger
}
-func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error {
repository := &gitalypb.Repository{
- StorageName: target.Storage,
+ StorageName: targetStorage,
RelativePath: source.RelativePath,
}
@@ -32,8 +33,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit
RelativePath: source.RelativePath,
}
- repositoryClient := gitalypb.NewRepositoryServiceClient(target.cc)
- remoteClient := gitalypb.NewRemoteServiceClient(target.cc)
+ repositoryClient := gitalypb.NewRepositoryServiceClient(target)
+ remoteClient := gitalypb.NewRemoteServiceClient(target)
// CreateRepository is idempotent
if _, err := repositoryClient.CreateRepository(ctx, &gitalypb.CreateRepositoryRequest{
@@ -60,7 +61,8 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source models.Reposit
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Logger
- dataStore Datastore
+ replicasDS ReplicasDatastore
+ replJobsDS ReplJobsDatastore
coordinator *Coordinator
targetNode string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
@@ -74,10 +76,11 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(targetNode string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(targetNode string, log *logrus.Logger, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
- dataStore: ds,
+ replicasDS: replicasDS,
+ replJobsDS: jobsDS,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
targetNode: targetNode,
@@ -118,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.dataStore.CreateSecondaryReplJobs(repo)
+ id, err := r.replJobsDS.CreateSecondaryReplJobs(repo.Storage, repo.RelativePath)
if err != nil {
return err
}
@@ -140,58 +143,63 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
for {
- jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10)
+ nodes, err := r.replicasDS.GetStorageNodes()
if err != nil {
return err
}
- if len(jobs) == 0 {
- r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
-
- select {
- // TODO: exponential backoff when no queries are returned
- case <-time.After(jobFetchInterval):
- continue
-
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-
- r.log.Debugf("fetched replication jobs: %#v", jobs)
-
- for _, job := range jobs {
- r.log.WithField(logWithReplJobID, job.ID).
- Infof("processing replication job %#v", job)
- node, err := r.coordinator.GetStorageNode(job.Target)
- r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err)
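+ // Walk every known storage node and drain its pending replication jobs.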
+ for _, node := range nodes {
+ jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.ID, 10)
if err != nil {
return err
}
- if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
- return err
- }
+ if len(jobs) == 0 {
+ r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
- primary, err := r.dataStore.GetShardPrimary(job.Source)
- if err != nil {
- return err
- }
-
- ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, primary.ListenAddr, primary.Token)
- if err != nil {
- return err
- }
+ select {
+ // TODO: exponential backoff when no queries are returned
+ case <-time.After(jobFetchInterval):
+ continue
- if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
- r.log.WithField(logWithReplJobID, job.ID).
- Info("completed replication")
- if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
- return err
+ r.log.WithField("node", node).Debugf("fetched replication jobs: %#v", jobs)
+
+ for _, job := range jobs {
+ r.log.WithField(logWithReplJobID, job.ID).
+ Infof("processing replication job %#v", job)
+ node, err := r.replicasDS.GetStorageNode(job.TargetNodeID)
+ if err != nil {
+ return err
+ }
+ r.log.WithField(logWithReplJobID, job.ID).WithField("storage", node).Info("got storage")
+
+ if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
+
+ ctx, err = helper.InjectGitalyServers(ctx, "default", node.Address, "")
+ if err != nil {
+ return err
+ }
+
+ cc, err := r.coordinator.GetConnection(node.Address)
+ if err != nil {
+ return err
+ }
+
+ if err := r.replicator.Replicate(ctx, job.Source, node.Storage, cc); err != nil {
+ r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
+ return err
+ }
+
+ if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ return err
+ }
}
}
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 1294bc989..684b2f61a 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -18,7 +18,6 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -28,43 +27,55 @@ import (
// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for
// all whitelisted repos
func TestReplicatorProcessJobsWhitelist(t *testing.T) {
- var (
- cfg = config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup1",
- ListenAddr: "tcp://gitaly-backup1.example.com",
- },
- {
- Name: "backup2",
- ListenAddr: "tcp://gitaly-backup2.example.com",
- },
- },
- Whitelist: []string{"abcd1234", "edfg5678"},
- }
- datastore = NewMemoryDatastore(cfg)
- coordinator = NewCoordinator(logrus.New(), datastore)
- resultsCh = make(chan result)
- replman = NewReplMgr(
- cfg.SecondaryServers[1].Name,
- logrus.New(),
- datastore,
- coordinator,
- WithWhitelist(cfg.Whitelist),
- WithReplicator(&mockReplicator{resultsCh}),
- )
+ datastore := NewMemoryDatastore()
+ datastore.storageNodes.m[1] = models.StorageNode{
+ ID: 1,
+ Address: "tcp://gitaly-primary.example.com",
+ Storage: "praefect-internal-1",
+ }
+ datastore.storageNodes.m[2] = models.StorageNode{
+ ID: 2,
+ Address: "tcp://gitaly-backup1.example.com",
+ Storage: "praefect-internal-2",
+ }
+ datastore.storageNodes.m[3] = models.StorageNode{
+ ID: 3,
+ Address: "tcp://gitaly-backup2.example.com",
+ Storage: "praefect-internal-3",
+ }
+
+ datastore.shards.m["default"+"abcd1234"] = models.Shard{
+ Storage: "default",
+ RelativePath: "abcd1234",
+ Primary: datastore.storageNodes.m[1],
+ Secondaries: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]},
+ }
+ datastore.shards.m["default"+"edfg5678"] = models.Shard{
+ Storage: "default",
+ RelativePath: "edfg5678",
+ Primary: datastore.storageNodes.m[1],
+ Secondaries: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]},
+ }
+
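+ // Seed the job queue: one replication job per whitelisted repository for each secondary.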
+ for _, repo := range []string{"abcd1234", "edfg5678"} {
+ jobIDs, err := datastore.CreateSecondaryReplJobs("default", repo)
+ require.NoError(t, err)
+ require.Len(t, jobIDs, 2)
+ }
+
+ coordinator := NewCoordinator(logrus.New(), datastore)
+ resultsCh := make(chan result)
+ replman := NewReplMgr(
+ "default",
+ logrus.New(),
+ datastore,
+ datastore,
+ coordinator,
+ WithReplicator(&mockReplicator{resultsCh}),
)
- for _, node := range []*models.GitalyServer{
- cfg.PrimaryServer,
- cfg.SecondaryServers[0],
- cfg.SecondaryServers[1],
- } {
- err := coordinator.RegisterNode(node.Name, node.ListenAddr)
+ for _, node := range datastore.storageNodes.m {
+ err := coordinator.RegisterNode(node.Address)
require.NoError(t, err)
}
@@ -78,14 +89,27 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
success := make(chan struct{})
+ var expectedResults []result
+ // we expect one job per whitelisted repo with each backend server
+ for _, shard := range datastore.shards.m {
+ for _, secondary := range shard.Secondaries {
+ cc, err := coordinator.GetConnection(secondary.Address)
+ require.NoError(t, err)
+ expectedResults = append(expectedResults,
+ result{source: models.Repository{RelativePath: shard.RelativePath},
+ targetStorage: secondary.Storage,
+ targetCC: cc,
+ })
+ }
+ }
+
go func() {
// we expect one job per whitelisted repo with each backend server
- for i := 0; i < len(cfg.Whitelist); i++ {
- result := <-resultsCh
-
- assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage)
- assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage)
+ for _, shard := range datastore.shards.m {
+ for range shard.Secondaries {
+ result := <-resultsCh
+ assert.Contains(t, expectedResults, result)
+ }
}
cancel()
@@ -106,18 +130,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
}
type result struct {
- source models.Repository
- target Node
+ source models.Repository
+ targetStorage string
+ targetCC *grpc.ClientConn
}
type mockReplicator struct {
resultsCh chan<- result
}
-func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error {
select {
- case mr.resultsCh <- result{source, target}:
+ case mr.resultsCh <- result{source, targetStorage, target}:
return nil
case <-ctx.Done():
@@ -179,10 +204,9 @@ func TestReplicate(t *testing.T) {
require.NoError(t, replicator.Replicate(
ctx,
models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()},
- Node{
- cc: conn,
- Storage: backupStorageName,
- }))
+ backupStorageName,
+ conn,
+ ))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
testhelper.MustRunCommand(t, nil, "git", "-C", replicatedPath, "cat-file", "-e", commitID)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 915d7281a..282c61051 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -7,14 +7,14 @@ import (
"testing"
"time"
+ "github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"google.golang.org/grpc"
)
@@ -44,26 +44,47 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
},
}
+ gz := proto.FileDescriptor("mock/mock.proto")
+ fd, err := protoregistry.ExtractFileDescriptor(gz)
+ if err != nil {
+ panic(err)
+ }
+
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
const (
storagePrimary = "default"
- storageBackup = "backup"
)
- datastore := praefect.NewMemoryDatastore(config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- })
- coordinator := praefect.NewCoordinator(logrus.New(), datastore)
- replmgr := praefect.NewReplMgr(
+ datastore := NewMemoryDatastore()
+ datastore.storageNodes.m[1] = models.StorageNode{
+ ID: 1,
+ Storage: "praefect-internal-1",
+ }
+ datastore.storageNodes.m[2] = models.StorageNode{
+ ID: 2,
+ Storage: "praefect-internal-2",
+ }
+
+ coordinator := NewCoordinator(logrus.New(), datastore, fd)
+
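+ // Back each storage node with an in-process mock Gitaly backend and record its listen address in the datastore.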
+ for id, nodeStorage := range datastore.storageNodes.m {
+ backend, cleanup := newMockDownstream(t, tt.callback)
+ defer cleanup() // clean up mock downstream server resources
+
+ coordinator.RegisterNode(backend)
+ nodeStorage.Address = backend
+ datastore.storageNodes.m[id] = nodeStorage
+ }
+
+ replmgr := NewReplMgr(
storagePrimary,
logrus.New(),
datastore,
+ datastore,
coordinator,
)
- prf := praefect.NewServer(
+ prf := NewServer(
coordinator,
replmgr,
nil,
@@ -85,13 +106,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
defer cc.Close()
cli := mock.NewSimpleServiceClient(cc)
- for _, replica := range []string{storagePrimary, storageBackup} {
- backend, cleanup := newMockDownstream(t, tt.callback)
- defer cleanup() // clean up mock downstream server resources
-
- coordinator.RegisterNode(replica, backend)
- }
-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()