Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-07-29 23:37:12 +0300
committerJohn Cai <jcai@gitlab.com>2019-07-31 03:16:43 +0300
commit96b3d426161a04297fa92045467d51291aaed751 (patch)
tree4004251692d16733bfa4a32225e44141192e941f
parent4cb8fc3e6175f4dce685f9183b2a6403162ee5c1 (diff)
Refactor per mr commentsjc-sql-data-store
-rw-r--r--NOTICE53
-rw-r--r--_support/praefect-cluster/config.praefect.toml16
-rw-r--r--cmd/praefect/main.go28
-rw-r--r--go.mod13
-rw-r--r--go.sum26
-rw-r--r--internal/helper/storage.go7
-rw-r--r--internal/praefect/config/config.go15
-rw-r--r--internal/praefect/config/config_test.go21
-rw-r--r--internal/praefect/config/testdata/config.toml5
-rw-r--r--internal/praefect/coordinator.go20
-rw-r--r--internal/praefect/database/migrations/1_initial_up.sql13
-rw-r--r--internal/praefect/database/sql_datastore.go123
-rw-r--r--internal/praefect/datastore.go136
-rw-r--r--internal/praefect/datastore_memory_test.go58
-rw-r--r--internal/praefect/datastore_test.go20
-rw-r--r--internal/praefect/models/node.go7
-rw-r--r--internal/praefect/models/repository.go8
-rw-r--r--internal/praefect/replicator.go12
-rw-r--r--internal/praefect/replicator_test.go29
-rw-r--r--internal/praefect/server_test.go2
20 files changed, 352 insertions, 260 deletions
diff --git a/NOTICE b/NOTICE
index dffa2f1bf..9c6f4d540 100644
--- a/NOTICE
+++ b/NOTICE
@@ -2887,6 +2887,59 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/crypto
+Copyright (c) 2009 The Go Authors. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+PATENTS - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/crypto
+Additional IP Rights Grant (Patents)
+
+"This implementation" means the copyrightable works distributed by
+Google as part of the Go project.
+
+Google hereby grants to You a perpetual, worldwide, non-exclusive,
+no-charge, royalty-free, irrevocable (except as stated in this section)
+patent license to make, have made, use, offer to sell, sell, import,
+transfer and otherwise run, modify and propagate the contents of this
+implementation of Go, where such license applies only to those patent
+claims, both currently owned or controlled by Google and acquired in
+the future, licensable by Google that are necessarily infringed by this
+implementation of Go. This grant does not include claims that would be
+infringed only as a consequence of further modification of this
+implementation. If you or your agent or exclusive licensee institute or
+order or agree to the institution of patent litigation against any
+entity (including a cross-claim or counterclaim in a lawsuit) alleging
+that this implementation of Go or any code incorporated within this
+implementation of Go constitutes direct or contributory patent
+infringement, or inducement of patent infringement, then any patent
+rights granted to you under this License for this implementation of Go
+shall terminate as of the date such litigation is filed.
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/net
Copyright (c) 2009 The Go Authors. All rights reserved.
diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml
index 750faac60..2a4297248 100644
--- a/_support/praefect-cluster/config.praefect.toml
+++ b/_support/praefect-cluster/config.praefect.toml
@@ -24,14 +24,22 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e
# as shard. listen_addr should be unique for all nodes.
# Requires the protocol to be defined, e.g. tcp://host.tld:1234
-[[node]]
+[[storage_node]]
# listen_addr = "tcp://gitaly-primary:9999"
+ storage = "praefect-internal-1"
address = "tcp://127.0.0.1:9999"
-[[node]]
+[[storage_node]]
# listen_addr = "tcp://gitaly-backup-1:9999"
+ storage = "praefect-internal-2"
address = "tcp://127.0.0.1:9998"
-[[node]]
+[[storage_node]]
# listen_addr = "tcp://gitaly-backup-2:9999"
- address = "tcp://127.0.0.1:9997" \ No newline at end of file
+ storage = "praefect-internal-3"
+ address = "tcp://127.0.0.1:9997"
+
+[postgres]
+ user = "johncai"
+ address = "127.0.0.1:5432"
+ database = "praefect_test" \ No newline at end of file
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index aaf5e1f20..d4ac47f18 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -95,10 +95,11 @@ func configure() (config.Config, error) {
func run(listeners []net.Listener, conf config.Config) error {
sqlDatastore, err := database.NewSQLDatastore(
- os.Getenv("PRAEFECT_PG_USER"),
- os.Getenv("PRAEFECT_PG_PASSWORD"),
- os.Getenv("PRAEFECT_PG_ADDRESS"),
- os.Getenv("PRAEFECT_PG_DATABASE"))
+ conf.Postgres.User,
+ conf.Postgres.Password,
+ conf.Postgres.Address,
+ conf.Postgres.Database,
+ )
if err != nil {
return fmt.Errorf("failed to create sql datastore: %v", err)
@@ -116,6 +117,10 @@ func run(listeners []net.Listener, conf config.Config) error {
serverErrors = make(chan error, 1)
)
+ if err := sqlDatastore.LoadFromConfig(conf); err != nil {
+ return fmt.Errorf("loading config for database: %v", err)
+ }
+
signal.Notify(termCh, signals...)
for _, l := range listeners {
@@ -125,22 +130,11 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- nodes, err := sqlDatastore.GetStorageNodes()
- if err != nil {
- return fmt.Errorf("failed to get storage nodes from database: %v", err)
- }
-
- addresses := make(map[string]struct{})
- for _, node := range nodes {
- if _, ok := addresses[node.Address]; ok {
- continue
- }
- if err := coordinator.RegisterNode(node.Address); err != nil {
+ for _, node := range conf.StorageNodes {
+ if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil {
return fmt.Errorf("failed to register %s: %s", node.Address, err)
}
- addresses[node.Address] = struct{}{}
-
logger.WithField("node_address", node.Address).Info("registered gitaly node")
}
diff --git a/go.mod b/go.mod
index 1a1ac2d4b..de8b013fd 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/cloudflare/tableflip v0.0.0-20190329062924-8392f1641731
github.com/getsentry/raven-go v0.1.2
- github.com/golang/protobuf v1.3.2
+ github.com/golang/protobuf v1.3.1
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
@@ -12,17 +12,18 @@ require (
github.com/kr/pretty v0.1.0 // indirect
github.com/lib/pq v1.2.0
github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47
- github.com/onsi/ginkgo v1.8.0 // indirect
- github.com/onsi/gomega v1.5.0 // indirect
github.com/prometheus/client_golang v1.0.0
- github.com/sirupsen/logrus v1.4.1
+ github.com/sirupsen/logrus v1.2.0
github.com/stretchr/testify v1.3.0
github.com/tinylib/msgp v1.1.0 // indirect
- gitlab.com/gitlab-org/gitaly-proto v1.38.0
+ gitlab.com/gitlab-org/gitaly-proto v1.37.0
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c
+ golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 // indirect
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980
- golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
+ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sys v0.0.0-20190422165155-953cdadca894
+ google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect
google.golang.org/grpc v1.16.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
+ gopkg.in/yaml.v2 v2.2.2 // indirect
)
diff --git a/go.sum b/go.sum
index eb5db3227..91aab83da 100644
--- a/go.sum
+++ b/go.sum
@@ -35,8 +35,6 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
-github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
-github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
@@ -72,12 +70,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
-github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
-github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
-github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
@@ -101,15 +95,12 @@ github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNG
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
-github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
-github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
-github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU=
github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
@@ -119,8 +110,8 @@ github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo=
github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
-gitlab.com/gitlab-org/gitaly-proto v1.38.0 h1:46jlky1yhAC+WfYA9F/0N328QTYMBxOc1/AAwD7rQs8=
-gitlab.com/gitlab-org/gitaly-proto v1.38.0/go.mod h1:zNjk/86bjwLVJ4NcvInBcXcLdptdRFQ28sYrdFbrFgY=
+gitlab.com/gitlab-org/gitaly-proto v1.37.0 h1:cRQXF3kW+AR1eLIYTWfb+Eqa+Wd0PIcTq4FkaSCWvK4=
+gitlab.com/gitlab-org/gitaly-proto v1.37.0/go.mod h1:zNjk/86bjwLVJ4NcvInBcXcLdptdRFQ28sYrdFbrFgY=
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c h1:xo48LcGsTCasKcJpQDBCCuZU+aP8uGaboUVvD7Lgm6g=
gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c/go.mod h1:rYhLgfrbEcyfinG+R3EvKu6bZSsmwQqcXzLfHWSfUKM=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
@@ -128,10 +119,14 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc=
+golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
@@ -141,14 +136,13 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI=
-golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
@@ -156,8 +150,9 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
-google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 h1:yvw+zsSmSM02Z5H3ZdEV7B7Ql7eFrjQTnmByJvK+3J8=
+google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg=
google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
gopkg.in/DataDog/dd-trace-go.v1 v1.7.0 h1:7wbMayb6JXcbAS95RN7MI42W3o1BCxCcdIzZfVWBAiE=
@@ -171,6 +166,7 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
-gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git a/internal/helper/storage.go b/internal/helper/storage.go
index a76b50f52..f3f0d0ba0 100644
--- a/internal/helper/storage.go
+++ b/internal/helper/storage.go
@@ -41,12 +41,7 @@ func IncomingToOutgoing(ctx context.Context) context.Context {
return ctx
}
- gitalyServersJSONEncoded := md["gitaly-servers"]
- if len(gitalyServersJSONEncoded) == 0 {
- return ctx
- }
-
- return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", gitalyServersJSONEncoded[0]))
+ return metadata.NewOutgoingContext(ctx, md)
}
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index eb0fad56b..eb044d308 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -23,6 +23,16 @@ type Config struct {
Logging config.Logging `toml:"logging"`
PrometheusListenAddr string `toml:"prometheus_listen_addr"`
+
+ Postgres *Postgres `toml:"postgres"`
+}
+
+// Postgres contains details for connecting to a postgres database
+type Postgres struct {
+ User string `toml:"user"`
+ Password string `toml:"password"`
+ Address string `toml:"address"`
+ Database string `toml:"database"`
}
// FromFile loads the config for the passed file path
@@ -43,6 +53,7 @@ var (
errNoGitalyServers = errors.New("no primary gitaly backends configured")
errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique")
errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address")
+ errNoPostgres = errors.New("postgres configuration missing")
)
// Validate establishes if the config is valid
@@ -68,5 +79,9 @@ func (c Config) Validate() error {
listenAddrs[node.Address] = true
}
+ if c.Postgres == nil {
+ return errNoPostgres
+ }
+
return nil
}
diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go
index b89bdd648..c6de879fc 100644
--- a/internal/praefect/config/config_test.go
+++ b/internal/praefect/config/config_test.go
@@ -22,27 +22,32 @@ func TestConfigValidation(t *testing.T) {
}{
{
desc: "No ListenAddr or SocketPath",
- config: Config{ListenAddr: "", StorageNodes: nodes},
+ config: Config{ListenAddr: "", StorageNodes: nodes, Postgres: &Postgres{}},
err: errNoListener,
},
{
desc: "Only a SocketPath",
- config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes},
+ config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes, Postgres: &Postgres{}},
err: nil,
},
{
desc: "No servers",
- config: Config{ListenAddr: "localhost:1234"},
+ config: Config{ListenAddr: "localhost:1234", Postgres: &Postgres{}},
err: errNoGitalyServers,
},
{
desc: "duplicate address",
- config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address})},
+ config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address}), Postgres: &Postgres{}},
err: errDuplicateGitalyAddr,
},
{
- desc: "Valid config",
+ desc: "Missing Postgres",
config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes},
+ err: errNoPostgres,
+ },
+ {
+ desc: "Valid config",
+ config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes, Postgres: &Postgres{}},
err: nil,
},
}
@@ -78,6 +83,12 @@ func TestConfigParsing(t *testing.T) {
},
},
Whitelist: []string{"abcd1234", "edfg5678"},
+ Postgres: &Postgres{
+ User: "pg_user",
+ Password: "password",
+ Address: "/tmp/postgres.socket",
+ Database: "praefect_test",
+ },
},
},
}
diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml
index 247db51a9..4464c5663 100644
--- a/internal/praefect/config/testdata/config.toml
+++ b/internal/praefect/config/testdata/config.toml
@@ -21,3 +21,8 @@ prometheus_listen_addr = ""
address = "tcp://gitaly-internal-3.example.com"
storage = "praefect-internal-3"
+[postgres]
+ user = "pg_user"
+ password = "password"
+ address = "/tmp/postgres.socket"
+ database = "praefect_test"
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 86a254b5a..053e178cb 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -97,7 +97,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
return nil, nil, err
}
- primary, err = c.datastore.GetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath())
+ primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
if err != nil {
if err != sql.ErrNoRows {
@@ -117,7 +117,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
newPrimary := nodes[0]
// set the primary
- if err = c.datastore.SetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
return nil, nil, err
}
@@ -138,9 +138,9 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, err := c.GetConnection(primary.Address)
+ cc, err := c.GetConnection(primary.Storage)
if err != nil {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Address)
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage)
}
return helper.IncomingToOutgoing(ctx), cc, nil
@@ -148,7 +148,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(address string) error {
+func (c *Coordinator) RegisterNode(storage, address string) error {
conn, err := client.Dial(address,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
@@ -159,21 +159,21 @@ func (c *Coordinator) RegisterNode(address string) error {
return err
}
- c.setConn(address, conn)
+ c.setConn(storage, conn)
return nil
}
-func (c *Coordinator) setConn(address string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(storage string, conn *grpc.ClientConn) {
c.connMutex.Lock()
- c.nodes[address] = conn
+ c.nodes[storage] = conn
c.connMutex.Unlock()
}
// GetConnection gets the grpc client connection based on an address
-func (c *Coordinator) GetConnection(address string) (*grpc.ClientConn, error) {
+func (c *Coordinator) GetConnection(storage string) (*grpc.ClientConn, error) {
c.connMutex.RLock()
- cc, ok := c.nodes[address]
+ cc, ok := c.nodes[storage]
c.connMutex.RUnlock()
if !ok {
return nil, errors.New("client connection not found")
diff --git a/internal/praefect/database/migrations/1_initial_up.sql b/internal/praefect/database/migrations/1_initial_up.sql
index 774d4ff42..9a9437a62 100644
--- a/internal/praefect/database/migrations/1_initial_up.sql
+++ b/internal/praefect/database/migrations/1_initial_up.sql
@@ -1,19 +1,16 @@
CREATE TABLE IF NOT EXISTS storage_nodes (
id SERIAL PRIMARY KEY,
- storage TEXT NOT NULL,
- address TEXT NOT NULL
+ storage TEXT NOT NULL
);
-CREATE TABLE IF NOT EXISTS shards (
+CREATE TABLE IF NOT EXISTS repositories (
id SERIAL PRIMARY KEY,
- storage TEXT NOT NULL,
relative_path TEXT NOT NULL,
"primary" INTEGER REFERENCES storage_nodes (id),
- UNIQUE (storage, relative_path)
);
-CREATE TABLE IF NOT EXISTS shard_secondaries (
- shard_id INTEGER REFERENCES shards(id),
+CREATE TABLE IF NOT EXISTS repository_replicas (
+ repository_id INTEGER REFERENCES repositories(id),
storage_node_id INTEGER REFERENCES storage_nodes(id),
- PRIMARY KEY(shard_id, storage_node_id)
+ PRIMARY KEY(repository_id, storage_node_id)
); \ No newline at end of file
diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go
index bd64a2ef9..a6cc902a6 100644
--- a/internal/praefect/database/sql_datastore.go
+++ b/internal/praefect/database/sql_datastore.go
@@ -3,12 +3,14 @@ package database
import (
"errors"
"fmt"
+ "strings"
"database/sql"
// the lib/pg package provides postgres bindings for the sql package
_ "github.com/lib/pq"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
@@ -28,14 +30,35 @@ func NewSQLDatastore(user, password, address, database string) (*SQLDatastore, e
return &SQLDatastore{db: db}, nil
}
-// GetSecondaries gets the secondaries for a shard based on the relative path
-func (sd *SQLDatastore) GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) {
- var secondaries []models.StorageNode
+// LoadFromConfig loads the config into the database
+func (sd *SQLDatastore) LoadFromConfig(cfg config.Config) error {
+ _, err := sd.db.Exec(insertStorageNodesQuery(cfg.StorageNodes))
+ if err != nil {
+ return fmt.Errorf("loading StorageNodes: %v", err)
+ }
+
+ return nil
+}
+
+func insertStorageNodesQuery(storageNodes []*models.StorageNode) string {
+ q := `INSERT INTO storage_nodes (storage) VALUES %s ON CONFLICT (storage) DO NOTHING`
+
+ var values []string
+ for _, storageNode := range storageNodes {
+ values = append(values, fmt.Sprintf(`('%s')`, storageNode.Storage))
+ }
+
+ return fmt.Sprintf(q, strings.Join(values, ","))
+}
+
+// GetReplicas gets the replicas for a repository based on the relative path
+func (sd *SQLDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) {
+ var replicas []models.StorageNode
rows, err := sd.db.Query(`
- SELECT storage_nodes.id, storage_nodes.address FROM shards
- INNER JOIN shard_secondaries ON shards.id = shard_secondaries.shard_id
- INNER JOIN storage_nodes ON storage_nodes.id = shard_secondaries.storage_node_id WHERE shards.storage = $1 AND shards.relative_path = $2`, storage, relativePath)
+ SELECT storage_nodes.id FROM repositories
+ INNER JOIN repository_replicas ON repositories.id = repository_replicas.repository_id
+ INNER JOIN storage_nodes ON storage_nodes.id = repository_replicas.storage_node_id WHERE repositories.relative_path = $1`, relativePath)
if err != nil {
return nil, err
@@ -43,23 +66,23 @@ func (sd *SQLDatastore) GetSecondaries(storage, relativePath string) ([]models.S
for rows.Next() {
var s models.StorageNode
- err = rows.Scan(&s.ID, &s.Address)
+ err = rows.Scan(&s.ID)
if err != nil {
return nil, err
}
- secondaries = append(secondaries, s)
+ replicas = append(replicas, s)
}
- return secondaries, nil
+ return replicas, nil
}
// GetStorageNode gets all storage storage_nodes
func (sd *SQLDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) {
var node models.StorageNode
- row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM storage_nodes WHERE storage_nodes.id = $1", nodeID)
+ row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.storage FROM storage_nodes WHERE storage_nodes.id = $1", nodeID)
- err := row.Scan(&node.ID, &node.Address, &node.Storage)
+ err := row.Scan(&node.ID, &node.Storage)
if err != nil {
return node, err
}
@@ -72,7 +95,7 @@ func (sd *SQLDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) {
func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) {
var nodeStorages []models.StorageNode
- rows, err := sd.db.Query("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM storage_nodes")
+ rows, err := sd.db.Query("SELECT storage_nodes.id, storage_nodes.storage FROM storage_nodes")
if err != nil {
return nil, err
@@ -80,7 +103,7 @@ func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) {
for rows.Next() {
var nodeStorage models.StorageNode
- err = rows.Scan(&nodeStorage.ID, &nodeStorage.Address, &nodeStorage.Storage)
+ err = rows.Scan(&nodeStorage.ID, &nodeStorage.Storage)
if err != nil {
return nil, err
}
@@ -91,26 +114,26 @@ func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) {
}
-// GetPrimary gets the primary storage node for a shard of a repository relative path
-func (sd *SQLDatastore) GetPrimary(storage, relativePath string) (*models.StorageNode, error) {
+// GetPrimary gets the primary storage node for a repository of a repository relative path
+func (sd *SQLDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) {
row := sd.db.QueryRow(`
- SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM shards
- INNER JOIN storage_nodes ON shards.primary = storage_nodes.id
- WHERE shards.storage = $1 AND shards.relative_path = $2
- `, storage, relativePath)
+ SELECT storage_nodes.id, storage_nodes.storage FROM repositories
+ INNER JOIN storage_nodes ON repositories.primary = storage_nodes.id
+ WHERE repositories.relative_path = $1
+ `, relativePath)
var s models.StorageNode
- if err := row.Scan(&s.ID, &s.Address, &s.Storage); err != nil {
+ if err := row.Scan(&s.ID, &s.Storage); err != nil {
return nil, err
}
return &s, nil
}
-// SetPrimary sets the primary storagee node for a shard of a repository relative path
-func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID int) error {
- res, err := sd.db.Exec(`UPDATE shards SET "primary" = $1 WHERE storage = $2 AND relative_path = $3`, storageNodeID, storage, relativePath)
+// SetPrimary sets the primary storagee node for a repository of a repository relative path
+func (sd *SQLDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+ res, err := sd.db.Exec(`UPDATE repositories SET "primary" = $1 WHERE relative_path = $2`, storageNodeID, relativePath)
if err != nil {
return err
}
@@ -118,7 +141,7 @@ func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID i
if n, err := res.RowsAffected(); err != nil {
return err
} else if n == 0 {
- res, err = sd.db.Exec(`INSERT INTO shards (storage, relative_path, "primary") VALUES ($1, $2, $3)`, storage, relativePath, storageNodeID)
+ res, err = sd.db.Exec(`INSERT INTO repositories (storage, relative_path, "primary") VALUES ($1, $2)`, relativePath, storageNodeID)
if err != nil {
return err
}
@@ -132,11 +155,11 @@ func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID i
return nil
}
-// AddSecondary adds a secondary to a shard of a repository relative path
-func (sd *SQLDatastore) AddSecondary(storage, relativePath string, storageNodeID int) error {
+// AddReplica adds a replica to a repository of a repository relative path
+func (sd *SQLDatastore) AddReplica(relativePath string, storageNodeID int) error {
res, err := sd.db.Exec(`
- INSERT INTO shard_secondaries (shard_id, storage_node_id)
- VALUES (SELECT id, $1 FROM shards WHERE storage = $2 AND relative_path = $3)`, storageNodeID, storage, relativePath)
+ INSERT INTO repository_replicas (repository_id, storage_node_id)
+ VALUES (SELECT id, $1 FROM repositories WHERE relative_path = $2)`, storageNodeID, relativePath)
if err != nil {
return err
}
@@ -144,17 +167,17 @@ func (sd *SQLDatastore) AddSecondary(storage, relativePath string, storageNodeID
if n, err := res.RowsAffected(); err != nil {
return err
} else if n == 0 {
- return errors.New("secondary already exists")
+ return errors.New("replica already exists")
}
return nil
}
-// RemoveSecondary removes a secondary from a shard of a repository relative path
-func (sd *SQLDatastore) RemoveSecondary(storage, relativePath string, storageNodeID int) error {
+// RemoveReplica removes a replica from a repository of a repository relative path
+func (sd *SQLDatastore) RemoveReplica(relativePath string, storageNodeID int) error {
res, err := sd.db.Exec(`
- DELETE FROM shard_secondaries (shard_relative_path, node_storage_id)
- WHERE shard_id = (SELECT id FROM shard where storage = $1 AND relative_path = $2) AND storage_node_id = $3`, storage, relativePath, storageNodeID)
+ DELETE FROM repository_replicas (repository_relative_path, node_storage_id)
+ WHERE repository_id = (SELECT id FROM repository WHERE relative_path = $1) AND storage_node_id = $2`, relativePath, storageNodeID)
if err != nil {
return err
}
@@ -162,33 +185,33 @@ func (sd *SQLDatastore) RemoveSecondary(storage, relativePath string, storageNod
if n, err := res.RowsAffected(); err != nil {
return err
} else if n == 0 {
- return errors.New("secondary did not exist")
+ return errors.New("replica did not exist")
}
return nil
}
-// GetShard gets the shard for a repository relative path
-func (sd *SQLDatastore) GetShard(storage, relativePath string) (*models.Shard, error) {
- primary, err := sd.GetPrimary(storage, relativePath)
+// GetRepository gets the repository for a repository relative path
+func (sd *SQLDatastore) GetRepository(relativePath string) (*models.Repository, error) {
+ primary, err := sd.GetPrimary(relativePath)
if err != nil {
return nil, fmt.Errorf("getting primary: %v", err)
}
- secondaries, err := sd.GetSecondaries(storage, relativePath)
+ replicas, err := sd.GetReplicas(relativePath)
if err != nil {
- return nil, fmt.Errorf("getting secondaries: %v", err)
+ return nil, fmt.Errorf("getting replicas: %v", err)
}
- return &models.Shard{RelativePath: relativePath, Primary: *primary, Secondaries: secondaries}, nil
+ return &models.Repository{RelativePath: relativePath, Primary: *primary, Replicas: replicas}, nil
}
-// RotatePrimary rotates a primary out of being primary, and picks a secondary of each shard at random to promote to the new primary
+// RotatePrimary rotates a primary out of being primary, and picks a replica of each repository at random to promote to the new primary
func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error {
- // Add the primary as a secondary
+ // Add the primary as a replica
res, err := sd.db.Exec(`
- INSERT INTO shard_secondaries (shard_id, node_storage_id) VALUES (SELECT shards.id, shards.primary FROM shards WHERE shards.primary = $1)
+ INSERT INTO repository_replicas (repository_id, node_storage_id) VALUES (SELECT repositories.id, repositories.primary FROM repositories WHERE repositories.primary = $1)
`, primaryNodeStorageID)
if err != nil {
return err
@@ -200,14 +223,14 @@ func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error {
}
if affected == 0 {
- return fmt.Errorf("no shards with primary %d found", primaryNodeStorageID)
+ return fmt.Errorf("no repositories with primary %d found", primaryNodeStorageID)
}
- // Choose a new secondary
- res, err = sd.db.Exec(`UPDATE shards SET "primary" =
- (SELECT shard_secondaries.storage_node_id FROM shard_secondaries
- INNER JOIN shards ON shard_secondaries.shard_id = shards.id
- WHERE shards.primary = $1 AND shards.primary != shard_secondaries.storage_node_id LIMIT 1)
+ // Choose a new replica
+ res, err = sd.db.Exec(`UPDATE repositories SET "primary" =
+ (SELECT repository_replicas.storage_node_id FROM repository_replicas
+ INNER JOIN repositories ON repository_replicas.repository_id = repositories.id
+ WHERE repositories.primary = $1 AND repositories.primary != repository_replicas.storage_node_id LIMIT 1)
`, primaryNodeStorageID)
if err != nil {
return err
@@ -219,7 +242,7 @@ func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error {
}
if affected == 0 {
- return errors.New("no secondaries available to rotate")
+ return errors.New("no replicas available to rotate")
}
return nil
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 497f6ba03..6730ca1e4 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -41,10 +41,11 @@ const (
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
- ID uint64 // autoincrement ID
- TargetNodeID int // which node to replicate to?
- Source models.Repository // source for replication
- State JobState
+ ID uint64 // autoincrement ID
+ TargetNodeID int // which node to replicate to?
+ SourceStorage string
+ Source models.Repository // source for replication
+ State JobState
}
// replJobs provides sort manipulation behavior
@@ -68,21 +69,21 @@ type Datastore interface {
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- GetSecondaries(storage, relativePath string) ([]models.StorageNode, error)
+ GetReplicas(relativePath string) ([]models.StorageNode, error)
GetStorageNode(nodeID int) (models.StorageNode, error)
GetStorageNodes() ([]models.StorageNode, error)
- GetPrimary(storage, relativePath string) (*models.StorageNode, error)
+ GetPrimary(relativePath string) (*models.StorageNode, error)
- SetPrimary(storage, relativePath string, storageNodeID int) error
+ SetPrimary(relativePath string, storageNodeID int) error
- AddSecondary(storage, relativePath string, storageNodeID int) error
+ AddReplica(relativePath string, storageNodeID int) error
- RemoveSecondary(storage, relativePath string, storageNodeID int) error
+ RemoveReplica(relativePath string, storageNodeID int) error
- GetShard(storage, relativePath string) (*models.Shard, error)
+ GetRepository(relativePath string) (*models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -93,19 +94,20 @@ type ReplJobsDatastore interface {
// count-length.
GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error)
- // CreateSecondaryJobs will create replication jobs for each secondary
+ // CreateReplicaJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(storge, relativePath string) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
}
type jobRecord struct {
- relativePath string // project's relative path
- targetNodeID int
- state JobState
+ sourceStorage string
+ relativePath string // project's relative path
+ targetNodeID int
+ state JobState
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
@@ -123,9 +125,9 @@ type MemoryDatastore struct {
m map[int]models.StorageNode
}
- shards *struct {
+ repositories *struct {
sync.RWMutex
- m map[string]models.Shard
+ m map[string]models.Repository
}
}
@@ -146,28 +148,28 @@ func NewMemoryDatastore() *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- shards: &struct {
+ repositories: &struct {
sync.RWMutex
- m map[string]models.Shard
+ m map[string]models.Repository
}{
- m: map[string]models.Shard{},
+ m: map[string]models.Repository{},
},
}
}
-// GetSecondaries gets the secondaries for a shard based on the relative path
-func (md *MemoryDatastore) GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) {
- md.shards.RLock()
+// GetReplicas gets the secondaries for a shard based on the relative path
+func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) {
+ md.repositories.RLock()
md.storageNodes.RLock()
defer md.storageNodes.RUnlock()
- defer md.shards.RUnlock()
+ defer md.repositories.RUnlock()
- shard, ok := md.shards.m[storage+relativePath]
+ shard, ok := md.repositories.m[relativePath]
if !ok {
return nil, errors.New("shard not found")
}
- return shard.Secondaries, nil
+ return shard.Replicas, nil
}
// GetStorageNode gets all storage nodes
@@ -197,11 +199,11 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) {
}
// GetPrimary gets the primary storage node for a shard of a repository relative path
-func (md *MemoryDatastore) GetPrimary(storage, relativePath string) (*models.StorageNode, error) {
- md.shards.RLock()
- defer md.shards.RUnlock()
+func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) {
+ md.repositories.RLock()
+ defer md.repositories.RUnlock()
- shard, ok := md.shards.m[storage+relativePath]
+ shard, ok := md.repositories.m[relativePath]
if !ok {
return nil, errors.New("shard not found")
}
@@ -215,11 +217,11 @@ func (md *MemoryDatastore) GetPrimary(storage, relativePath string) (*models.Sto
}
// SetPrimary sets the primary storagee node for a shard of a repository relative path
-func (md *MemoryDatastore) SetPrimary(storage, relativePath string, storageNodeID int) error {
- md.shards.Lock()
- defer md.shards.Unlock()
+func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- shard, ok := md.shards.m[storage+relativePath]
+ shard, ok := md.repositories.m[relativePath]
if !ok {
return errors.New("shard not found")
}
@@ -231,16 +233,16 @@ func (md *MemoryDatastore) SetPrimary(storage, relativePath string, storageNodeI
shard.Primary = storageNode
- md.shards.m[storage+relativePath] = shard
+ md.repositories.m[relativePath] = shard
return nil
}
-// AddSecondary adds a secondary to a shard of a repository relative path
-func (md *MemoryDatastore) AddSecondary(storage, relativePath string, storageNodeID int) error {
- md.shards.Lock()
- defer md.shards.Unlock()
+// AddReplica adds a secondary to a shard of a repository relative path
+func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- shard, ok := md.shards.m[storage+relativePath]
+ shard, ok := md.repositories.m[relativePath]
if !ok {
return errors.New("shard not found")
}
@@ -250,40 +252,40 @@ func (md *MemoryDatastore) AddSecondary(storage, relativePath string, storageNod
return errors.New("node storage not found")
}
- shard.Secondaries = append(shard.Secondaries, storageNode)
+ shard.Replicas = append(shard.Replicas, storageNode)
- md.shards.m[storage+relativePath] = shard
+ md.repositories.m[relativePath] = shard
return nil
}
-// RemoveSecondary removes a secondary from a shard of a repository relative path
-func (md *MemoryDatastore) RemoveSecondary(storage, relativePath string, storageNodeID int) error {
- md.shards.Lock()
- defer md.shards.Unlock()
+// RemoveReplica removes a secondary from a shard of a repository relative path
+func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- shard, ok := md.shards.m[storage+relativePath]
+ shard, ok := md.repositories.m[relativePath]
if !ok {
return errors.New("shard not found")
}
var secondaries []models.StorageNode
- for _, secondary := range shard.Secondaries {
+ for _, secondary := range shard.Replicas {
if secondary.ID != storageNodeID {
secondaries = append(secondaries, secondary)
}
}
- shard.Secondaries = secondaries
- md.shards.m[storage+relativePath] = shard
+ shard.Replicas = secondaries
+ md.repositories.m[relativePath] = shard
return nil
}
-// GetShard gets the shard for a repository relative path
-func (md *MemoryDatastore) GetShard(storage, relativePath string) (*models.Shard, error) {
- md.shards.Lock()
- defer md.shards.Unlock()
+// GetRepository gets the shard for a repository relative path
+func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- shard, ok := md.shards.m[storage+relativePath]
+ shard, ok := md.repositories.m[relativePath]
if !ok {
return nil, errors.New("shard not found")
}
@@ -291,9 +293,9 @@ func (md *MemoryDatastore) GetShard(storage, relativePath string) (*models.Shard
return &shard, nil
}
-// ErrSecondariesMissing indicates the repository does not have any backup
+// ErrReplicasMissing indicates the repository does not have any backup
// replicas
-var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
+var ErrReplicasMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) {
@@ -330,8 +332,9 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
Source: models.Repository{
RelativePath: record.relativePath,
},
- State: record.state,
- TargetNodeID: record.targetNodeID,
+ SourceStorage: record.sourceStorage,
+ State: record.state,
+ TargetNodeID: record.targetNodeID,
}, nil
}
@@ -339,9 +342,9 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
// it fails preconditions for being replicatable
var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
-// CreateSecondaryReplJobs creates a replication job for each secondary that
+// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
@@ -349,7 +352,7 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string)
return nil, errors.New("invalid source repository")
}
- shard, err := md.GetShard(storage, relativePath)
+ shard, err := md.GetRepository(relativePath)
if err != nil {
return nil, fmt.Errorf(
"unable to find shard for project at relative path %q",
@@ -359,14 +362,15 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string)
var jobIDs []uint64
- for _, secondary := range shard.Secondaries {
+ for _, secondary := range shard.Replicas {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- targetNodeID: secondary.ID,
- state: JobStatePending,
- relativePath: relativePath,
+ targetNodeID: secondary.ID,
+ state: JobStatePending,
+ relativePath: relativePath,
+ sourceStorage: shard.Primary.Storage,
}
jobIDs = append(jobIDs, nextID)
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index d3da4aeff..a337b9995 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -8,17 +8,15 @@ import (
)
// TestMemoryDatastoreWhitelist verifies that the in-memory datastore will
-// populate itself with the correct replication jobs and shards when initialized
+// populate itself with the correct replication jobs and repositories when initialized
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
repo1 := models.Repository{
RelativePath: "abcd1234",
- Storage: "default",
}
repo2 := models.Repository{
RelativePath: "5678efgh",
- Storage: "default",
}
mds := NewMemoryDatastore()
@@ -37,34 +35,32 @@ func TestMemoryDatastoreWhitelist(t *testing.T) {
Address: "tcp://backup-2",
Storage: "praefect-internal-3",
}
- mds.shards.m[repo1.Storage+repo1.RelativePath] = models.Shard{
- Storage: repo1.Storage,
+ mds.repositories.m[repo1.RelativePath] = models.Repository{
RelativePath: repo1.RelativePath,
Primary: mds.storageNodes.m[1],
- Secondaries: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]},
+ Replicas: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]},
}
- mds.shards.m[repo2.Storage+repo2.RelativePath] = models.Shard{
- Storage: repo2.Storage,
+ mds.repositories.m[repo2.RelativePath] = models.Repository{
RelativePath: repo2.RelativePath,
Primary: mds.storageNodes.m[1],
- Secondaries: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]},
+ Replicas: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]},
}
for _, repo := range []models.Repository{repo1, repo2} {
- jobIDs, err := mds.CreateSecondaryReplJobs(repo.Storage, repo.RelativePath)
+ jobIDs, err := mds.CreateReplicaReplJobs(repo.RelativePath)
require.NoError(t, err)
require.Len(t, jobIDs, 2)
}
- expectSecondaries := []models.StorageNode{
+ expectReplicas := []models.StorageNode{
mds.storageNodes.m[2],
mds.storageNodes.m[3],
}
for _, repo := range []models.Repository{repo1, repo2} {
- actualSecondaries, err := mds.GetSecondaries(repo.Storage, repo.RelativePath)
+ actualReplicas, err := mds.GetReplicas(repo.RelativePath)
require.NoError(t, err)
- require.ElementsMatch(t, expectSecondaries, actualSecondaries)
+ require.ElementsMatch(t, expectReplicas, actualReplicas)
}
backup1 := mds.storageNodes.m[2]
@@ -72,30 +68,34 @@ func TestMemoryDatastoreWhitelist(t *testing.T) {
backup1ExpectedJobs := []ReplJob{
ReplJob{
- ID: 1,
- TargetNodeID: backup1.ID,
- Source: models.Repository{RelativePath: repo1.RelativePath},
- State: JobStatePending,
+ ID: 1,
+ TargetNodeID: backup1.ID,
+ Source: models.Repository{RelativePath: repo1.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStatePending,
},
ReplJob{
- ID: 3,
- TargetNodeID: backup1.ID,
- Source: models.Repository{RelativePath: repo2.RelativePath},
- State: JobStatePending,
+ ID: 3,
+ TargetNodeID: backup1.ID,
+ Source: models.Repository{RelativePath: repo2.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStatePending,
},
}
backup2ExpectedJobs := []ReplJob{
ReplJob{
- ID: 2,
- TargetNodeID: backup2.ID,
- Source: models.Repository{RelativePath: repo1.RelativePath},
- State: JobStatePending,
+ ID: 2,
+ TargetNodeID: backup2.ID,
+ Source: models.Repository{RelativePath: repo1.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStatePending,
},
ReplJob{
- ID: 4,
- TargetNodeID: backup2.ID,
- Source: models.Repository{RelativePath: repo2.RelativePath},
- State: JobStatePending,
+ ID: 4,
+ TargetNodeID: backup2.ID,
+ Source: models.Repository{RelativePath: repo2.RelativePath},
+ SourceStorage: "praefect-internal-1",
+ State: JobStatePending,
},
}
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 743ce192d..320a40103 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -22,8 +22,7 @@ var (
)
var (
- repo1Shard = models.Shard{
- Storage: "storage1",
+ repo1Repository = models.Repository{
RelativePath: proj1,
}
)
@@ -43,7 +42,7 @@ var operations = []struct {
{
desc: "creating replication jobs before secondaries are added results in no jobs added",
opFn: func(t *testing.T, ds Datastore) {
- jobIDs, err := ds.CreateSecondaryReplJobs(repo1Shard.Storage, repo1Shard.RelativePath)
+ jobIDs, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
require.NoError(t, err)
require.Empty(t, jobIDs)
},
@@ -51,21 +50,21 @@ var operations = []struct {
{
desc: "set the primary for the shard",
opFn: func(t *testing.T, ds Datastore) {
- err := ds.SetPrimary(repo1Shard.Storage, repo1Shard.RelativePath, stor1.ID)
+ err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID)
require.NoError(t, err)
},
},
{
desc: "add a secondary replica for the shard",
opFn: func(t *testing.T, ds Datastore) {
- err := ds.AddSecondary(repo1Shard.Storage, repo1Shard.RelativePath, stor2.ID)
+ err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID)
require.NoError(t, err)
},
},
{
desc: "insert first replication job after secondary mapped to primary",
opFn: func(t *testing.T, ds Datastore) {
- ids, err := ds.CreateSecondaryReplJobs(repo1Shard.Storage, repo1Shard.RelativePath)
+ ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath)
require.NoError(t, err)
require.Equal(t, []uint64{1}, ids)
},
@@ -80,10 +79,11 @@ var operations = []struct {
expectedJob := ReplJob{
ID: 1,
Source: models.Repository{
- RelativePath: repo1Shard.RelativePath,
+ RelativePath: repo1Repository.RelativePath,
},
- TargetNodeID: stor2.ID,
- State: JobStatePending,
+ SourceStorage: "praefect-storage-1",
+ TargetNodeID: stor2.ID,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
@@ -110,7 +110,7 @@ var flavors = map[string]func() Datastore{
"in-memory-datastore": func() Datastore {
ds := NewMemoryDatastore()
- ds.shards.m[repo1Shard.Storage+repo1Shard.RelativePath] = repo1Shard
+ ds.repositories.m[repo1Repository.RelativePath] = repo1Repository
ds.storageNodes.m[stor1.ID] = stor1
ds.storageNodes.m[stor2.ID] = stor2
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
index 0e1079441..db3224c4c 100644
--- a/internal/praefect/models/node.go
+++ b/internal/praefect/models/node.go
@@ -8,11 +8,10 @@ type StorageNode struct {
Token string `toml:"token"`
}
-// Shard describes a repository's relative path and its primary and list of secondaries
-type Shard struct {
+// Repository describes a repository's relative path and its primary and list of secondaries
+type Repository struct {
ID int
- Storage string
RelativePath string
Primary StorageNode
- Secondaries []StorageNode
+ Replicas []StorageNode
}
diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go
deleted file mode 100644
index e11cdbf0a..000000000
--- a/internal/praefect/models/repository.go
+++ /dev/null
@@ -1,8 +0,0 @@
-package models
-
-// Repository provides all necessary information to address a repository hosted
-// in a specific Gitaly replica
-type Repository struct {
- RelativePath string `toml:"relative_path"` // relative path of repository
- Storage string `toml:"storage"` // storage location, e.g. default
-}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index 66b28b7bf..c854d8ac2 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -15,21 +15,21 @@ import (
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error
+ Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error
}
type defaultReplicator struct {
log *logrus.Logger
}
-func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error {
repository := &gitalypb.Repository{
StorageName: targetStorage,
RelativePath: source.RelativePath,
}
remoteRepository := &gitalypb.Repository{
- StorageName: source.Storage,
+ StorageName: sourceStorage,
RelativePath: source.RelativePath,
}
@@ -121,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.replJobsDS.CreateSecondaryReplJobs(repo.Storage, repo.RelativePath)
+ id, err := r.replJobsDS.CreateReplicaReplJobs(repo.RelativePath)
if err != nil {
return err
}
@@ -187,12 +187,12 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
return err
}
- cc, err := r.coordinator.GetConnection(node.Address)
+ cc, err := r.coordinator.GetConnection(node.Storage)
if err != nil {
return err
}
- if err := r.replicator.Replicate(ctx, job.Source, node.Storage, cc); err != nil {
+ if err := r.replicator.Replicate(ctx, job.Source, job.SourceStorage, node.Storage, cc); err != nil {
r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
return err
}
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 684b2f61a..0e8a75a47 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -44,21 +44,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
Storage: "praefect-internal-3",
}
- datastore.shards.m["default"+"abcd1234"] = models.Shard{
- Storage: "default",
+ datastore.repositories.m["abcd1234"] = models.Repository{
RelativePath: "abcd1234",
Primary: datastore.storageNodes.m[1],
- Secondaries: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]},
+ Replicas: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]},
}
- datastore.shards.m["default"+"edfg5678"] = models.Shard{
- Storage: "default",
+ datastore.repositories.m["edfg5678"] = models.Repository{
RelativePath: "edfg5678",
Primary: datastore.storageNodes.m[1],
- Secondaries: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]},
+ Replicas: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]},
}
for _, repo := range []string{"abcd1234", "edfg5678"} {
- jobIDs, err := datastore.CreateSecondaryReplJobs("default", repo)
+ jobIDs, err := datastore.CreateReplicaReplJobs(repo)
require.NoError(t, err)
require.Len(t, jobIDs, 2)
}
@@ -75,7 +73,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
)
for _, node := range datastore.storageNodes.m {
- err := coordinator.RegisterNode(node.Address)
+ err := coordinator.RegisterNode(node.Storage, node.Address)
require.NoError(t, err)
}
@@ -91,9 +89,9 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
var expectedResults []result
// we expect one job per whitelisted repo with each backend server
- for _, shard := range datastore.shards.m {
- for _, secondary := range shard.Secondaries {
- cc, err := coordinator.GetConnection(secondary.Address)
+ for _, shard := range datastore.repositories.m {
+ for _, secondary := range shard.Replicas {
+ cc, err := coordinator.GetConnection(secondary.Storage)
require.NoError(t, err)
expectedResults = append(expectedResults,
result{source: models.Repository{RelativePath: shard.RelativePath},
@@ -105,8 +103,8 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
go func() {
// we expect one job per whitelisted repo with each backend server
- for _, shard := range datastore.shards.m {
- for range shard.Secondaries {
+ for _, shard := range datastore.repositories.m {
+ for range shard.Replicas {
result := <-resultsCh
assert.Contains(t, expectedResults, result)
}
@@ -139,7 +137,7 @@ type mockReplicator struct {
resultsCh chan<- result
}
-func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error {
+func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error {
select {
case mr.resultsCh <- result{source, targetStorage, target}:
@@ -203,7 +201,8 @@ func TestReplicate(t *testing.T) {
var replicator defaultReplicator
require.NoError(t, replicator.Replicate(
ctx,
- models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()},
+ models.Repository{RelativePath: testRepo.GetRelativePath()},
+ "default",
backupStorageName,
conn,
))
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 282c61051..656196e65 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -72,7 +72,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
backend, cleanup := newMockDownstream(t, tt.callback)
defer cleanup() // clean up mock downstream server resources
- coordinator.RegisterNode(backend)
+ coordinator.RegisterNode(nodeStorage.Storage, backend)
nodeStorage.Address = backend
datastore.storageNodes.m[id] = nodeStorage
}