diff options
author | John Cai <jcai@gitlab.com> | 2019-07-29 23:37:12 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-31 03:16:43 +0300 |
commit | 96b3d426161a04297fa92045467d51291aaed751 (patch) | |
tree | 4004251692d16733bfa4a32225e44141192e941f | |
parent | 4cb8fc3e6175f4dce685f9183b2a6403162ee5c1 (diff) |
Refactor per mr commentsjc-sql-data-store
-rw-r--r-- | NOTICE | 53 | ||||
-rw-r--r-- | _support/praefect-cluster/config.praefect.toml | 16 | ||||
-rw-r--r-- | cmd/praefect/main.go | 28 | ||||
-rw-r--r-- | go.mod | 13 | ||||
-rw-r--r-- | go.sum | 26 | ||||
-rw-r--r-- | internal/helper/storage.go | 7 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 15 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 21 | ||||
-rw-r--r-- | internal/praefect/config/testdata/config.toml | 5 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 20 | ||||
-rw-r--r-- | internal/praefect/database/migrations/1_initial_up.sql | 13 | ||||
-rw-r--r-- | internal/praefect/database/sql_datastore.go | 123 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 136 | ||||
-rw-r--r-- | internal/praefect/datastore_memory_test.go | 58 | ||||
-rw-r--r-- | internal/praefect/datastore_test.go | 20 | ||||
-rw-r--r-- | internal/praefect/models/node.go | 7 | ||||
-rw-r--r-- | internal/praefect/models/repository.go | 8 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 12 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 29 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 2 |
20 files changed, 352 insertions, 260 deletions
@@ -2887,6 +2887,59 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/crypto +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +PATENTS - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/crypto +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ LICENSE - gitlab.com/gitlab-org/gitaly/vendor/golang.org/x/net Copyright (c) 2009 The Go Authors. All rights reserved. diff --git a/_support/praefect-cluster/config.praefect.toml b/_support/praefect-cluster/config.praefect.toml index 750faac60..2a4297248 100644 --- a/_support/praefect-cluster/config.praefect.toml +++ b/_support/praefect-cluster/config.praefect.toml @@ -24,14 +24,22 @@ whitelist = ["@hashed/2c/62/2c624232cdd221771294dfbb310aca000a0df6ac8b66b696d90e # as shard. listen_addr should be unique for all nodes. # Requires the protocol to be defined, e.g. tcp://host.tld:1234 -[[node]] +[[storage_node]] # listen_addr = "tcp://gitaly-primary:9999" + storage = "praefect-internal-1" address = "tcp://127.0.0.1:9999" -[[node]] +[[storage_node]] # listen_addr = "tcp://gitaly-backup-1:9999" + storage = "praefect-internal-2" address = "tcp://127.0.0.1:9998" -[[node]] +[[storage_node]] # listen_addr = "tcp://gitaly-backup-2:9999" - address = "tcp://127.0.0.1:9997"
\ No newline at end of file + storage = "praefect-internal-3" + address = "tcp://127.0.0.1:9997" + +[postgres] + user = "johncai" + address = "127.0.0.1:5432" + database = "praefect_test"
\ No newline at end of file diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index aaf5e1f20..d4ac47f18 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -95,10 +95,11 @@ func configure() (config.Config, error) { func run(listeners []net.Listener, conf config.Config) error { sqlDatastore, err := database.NewSQLDatastore( - os.Getenv("PRAEFECT_PG_USER"), - os.Getenv("PRAEFECT_PG_PASSWORD"), - os.Getenv("PRAEFECT_PG_ADDRESS"), - os.Getenv("PRAEFECT_PG_DATABASE")) + conf.Postgres.User, + conf.Postgres.Password, + conf.Postgres.Address, + conf.Postgres.Database, + ) if err != nil { return fmt.Errorf("failed to create sql datastore: %v", err) @@ -116,6 +117,10 @@ func run(listeners []net.Listener, conf config.Config) error { serverErrors = make(chan error, 1) ) + if err := sqlDatastore.LoadFromConfig(conf); err != nil { + return fmt.Errorf("loading config for database: %v", err) + } + signal.Notify(termCh, signals...) for _, l := range listeners { @@ -125,22 +130,11 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nodes, err := sqlDatastore.GetStorageNodes() - if err != nil { - return fmt.Errorf("failed to get storage nodes from database: %v", err) - } - - addresses := make(map[string]struct{}) - for _, node := range nodes { - if _, ok := addresses[node.Address]; ok { - continue - } - if err := coordinator.RegisterNode(node.Address); err != nil { + for _, node := range conf.StorageNodes { + if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil { return fmt.Errorf("failed to register %s: %s", node.Address, err) } - addresses[node.Address] = struct{}{} - logger.WithField("node_address", node.Address).Info("registered gitaly node") } @@ -4,7 +4,7 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/cloudflare/tableflip v0.0.0-20190329062924-8392f1641731 github.com/getsentry/raven-go v0.1.2 - github.com/golang/protobuf v1.3.2 + github.com/golang/protobuf v1.3.1 github.com/google/uuid v1.1.1 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 @@ -12,17 +12,18 @@ require ( github.com/kr/pretty v0.1.0 // indirect github.com/lib/pq v1.2.0 github.com/libgit2/git2go v0.0.0-20190104134018-ecaeb7a21d47 - github.com/onsi/ginkgo v1.8.0 // indirect - github.com/onsi/gomega v1.5.0 // indirect github.com/prometheus/client_golang v1.0.0 - github.com/sirupsen/logrus v1.4.1 + github.com/sirupsen/logrus v1.2.0 github.com/stretchr/testify v1.3.0 github.com/tinylib/msgp v1.1.0 // indirect - gitlab.com/gitlab-org/gitaly-proto v1.38.0 + gitlab.com/gitlab-org/gitaly-proto v1.37.0 gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c + golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 // indirect golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 - golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 + golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 golang.org/x/sys v0.0.0-20190422165155-953cdadca894 + google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect google.golang.org/grpc v1.16.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect ) @@ -35,8 +35,6 @@ github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfb github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= -github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= @@ -72,12 +70,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= -github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= -github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= @@ -101,15 +95,12 @@ github.com/prometheus/procfs v0.0.2 h1:6LJUbpNm42llc4HRCuvApCSWB/WfhuNo9K98Q9sNG github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= @@ -119,8 +110,8 @@ github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1 github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v1.5.0 h1:OHbgr8l656Ub3Fw5k9SWnBfIEwvoHQ+W2y+Aa9D1Uyo= github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= -gitlab.com/gitlab-org/gitaly-proto v1.38.0 h1:46jlky1yhAC+WfYA9F/0N328QTYMBxOc1/AAwD7rQs8= -gitlab.com/gitlab-org/gitaly-proto v1.38.0/go.mod h1:zNjk/86bjwLVJ4NcvInBcXcLdptdRFQ28sYrdFbrFgY= +gitlab.com/gitlab-org/gitaly-proto v1.37.0 h1:cRQXF3kW+AR1eLIYTWfb+Eqa+Wd0PIcTq4FkaSCWvK4= +gitlab.com/gitlab-org/gitaly-proto v1.37.0/go.mod h1:zNjk/86bjwLVJ4NcvInBcXcLdptdRFQ28sYrdFbrFgY= gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c h1:xo48LcGsTCasKcJpQDBCCuZU+aP8uGaboUVvD7Lgm6g= gitlab.com/gitlab-org/labkit v0.0.0-20190221122536-0c3fc7cdd57c/go.mod h1:rYhLgfrbEcyfinG+R3EvKu6bZSsmwQqcXzLfHWSfUKM= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= @@ -128,10 +119,14 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 h1:8dUaAV7K4uHsF56JQWkprecIQKdPHtR9jCHF5nB8uzc= +golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181106065722-10aee1819953/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= @@ -141,14 +136,13 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6 h1:bjcUS9ztw9kFmmIxJInhon/0Is3p+EHBKNgquIzo1OI= -golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= @@ -156,8 +150,9 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.1.0 h1:igQkv0AAhEIvTEpD5LIpAfav2eeVO9HBTjvKHVJPRSs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 h1:yvw+zsSmSM02Z5H3ZdEV7B7Ql7eFrjQTnmByJvK+3J8= +google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898/go.mod h1:7Ep/1NZk928CDR8SjdVbjWNpdIf6nzjE3BTgJDr2Atg= google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= gopkg.in/DataDog/dd-trace-go.v1 v1.7.0 h1:7wbMayb6JXcbAS95RN7MI42W3o1BCxCcdIzZfVWBAiE= @@ -171,6 +166,7 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/helper/storage.go b/internal/helper/storage.go index a76b50f52..f3f0d0ba0 100644 --- a/internal/helper/storage.go +++ b/internal/helper/storage.go @@ -41,12 +41,7 @@ func IncomingToOutgoing(ctx context.Context) context.Context { return ctx } - gitalyServersJSONEncoded := md["gitaly-servers"] - if len(gitalyServersJSONEncoded) == 0 { - return ctx - } - - return metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", gitalyServersJSONEncoded[0])) + return metadata.NewOutgoingContext(ctx, md) } diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index eb0fad56b..eb044d308 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -23,6 +23,16 @@ type Config struct { Logging config.Logging `toml:"logging"` PrometheusListenAddr string `toml:"prometheus_listen_addr"` + + Postgres *Postgres `toml:"postgres"` +} + +// Postgres contains details for connecting to a postgres database +type Postgres struct { + User string `toml:"user"` + Password string `toml:"password"` + Address string `toml:"address"` + Database string `toml:"database"` } // FromFile loads the config for the passed file path @@ -43,6 +53,7 @@ var ( errNoGitalyServers = errors.New("no primary gitaly backends configured") errDuplicateGitalyAddr = errors.New("gitaly listen addresses are not unique") errGitalyWithoutAddr = errors.New("all gitaly nodes must have an address") + errNoPostgres = errors.New("postgres configuration missing") ) // Validate establishes if the config is valid @@ -68,5 +79,9 @@ func (c Config) Validate() error { listenAddrs[node.Address] = true } + if c.Postgres == nil { + return errNoPostgres + } + return nil } diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index b89bdd648..c6de879fc 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -22,27 +22,32 @@ func TestConfigValidation(t *testing.T) { }{ { desc: "No ListenAddr or SocketPath", - config: Config{ListenAddr: "", StorageNodes: nodes}, + config: Config{ListenAddr: "", StorageNodes: nodes, Postgres: &Postgres{}}, err: errNoListener, }, { desc: "Only a SocketPath", - config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes}, + config: Config{SocketPath: "/tmp/praefect.socket", StorageNodes: nodes, Postgres: &Postgres{}}, err: nil, }, { desc: "No servers", - config: Config{ListenAddr: "localhost:1234"}, + config: Config{ListenAddr: "localhost:1234", Postgres: &Postgres{}}, err: errNoGitalyServers, }, { desc: "duplicate address", - config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address})}, + config: Config{ListenAddr: "localhost:1234", StorageNodes: append(nodes, &models.StorageNode{Address: nodes[0].Address}), Postgres: &Postgres{}}, err: errDuplicateGitalyAddr, }, { - desc: "Valid config", + desc: "Missing Postgres", config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes}, + err: errNoPostgres, + }, + { + desc: "Valid config", + config: Config{ListenAddr: "localhost:1234", StorageNodes: nodes, Postgres: &Postgres{}}, err: nil, }, } @@ -78,6 +83,12 @@ func TestConfigParsing(t *testing.T) { }, }, Whitelist: []string{"abcd1234", "edfg5678"}, + Postgres: &Postgres{ + User: "pg_user", + Password: "password", + Address: "/tmp/postgres.socket", + Database: "praefect_test", + }, }, }, } diff --git a/internal/praefect/config/testdata/config.toml b/internal/praefect/config/testdata/config.toml index 247db51a9..4464c5663 100644 --- a/internal/praefect/config/testdata/config.toml +++ b/internal/praefect/config/testdata/config.toml @@ -21,3 +21,8 @@ prometheus_listen_addr = "" address = "tcp://gitaly-internal-3.example.com" storage = "praefect-internal-3" +[postgres] + user = "pg_user" + password = "password" + address = "/tmp/postgres.socket" + database = "praefect_test" diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 86a254b5a..053e178cb 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -97,7 +97,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, return nil, nil, err } - primary, err = c.datastore.GetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) + primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) if err != nil { if err != sql.ErrNoRows { @@ -117,7 +117,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, newPrimary := nodes[0] // set the primary - if err = c.datastore.SetPrimary(targetRepo.GetStorageName(), targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { return nil, nil, err } @@ -138,9 +138,9 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, err := c.GetConnection(primary.Address) + cc, err := c.GetConnection(primary.Storage) if err != nil { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Address) + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", primary.Storage) } return helper.IncomingToOutgoing(ctx), cc, nil @@ -148,7 +148,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. -func (c *Coordinator) RegisterNode(address string) error { +func (c *Coordinator) RegisterNode(storage, address string) error { conn, err := client.Dial(address, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), @@ -159,21 +159,21 @@ func (c *Coordinator) RegisterNode(address string) error { return err } - c.setConn(address, conn) + c.setConn(storage, conn) return nil } -func (c *Coordinator) setConn(address string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(storage string, conn *grpc.ClientConn) { c.connMutex.Lock() - c.nodes[address] = conn + c.nodes[storage] = conn c.connMutex.Unlock() } // GetConnection gets the grpc client connection based on an address -func (c *Coordinator) GetConnection(address string) (*grpc.ClientConn, error) { +func (c *Coordinator) GetConnection(storage string) (*grpc.ClientConn, error) { c.connMutex.RLock() - cc, ok := c.nodes[address] + cc, ok := c.nodes[storage] c.connMutex.RUnlock() if !ok { return nil, errors.New("client connection not found") diff --git a/internal/praefect/database/migrations/1_initial_up.sql b/internal/praefect/database/migrations/1_initial_up.sql index 774d4ff42..9a9437a62 100644 --- a/internal/praefect/database/migrations/1_initial_up.sql +++ b/internal/praefect/database/migrations/1_initial_up.sql @@ -1,19 +1,16 @@ CREATE TABLE IF NOT EXISTS storage_nodes ( id SERIAL PRIMARY KEY, - storage TEXT NOT NULL, - address TEXT NOT NULL + storage TEXT NOT NULL ); -CREATE TABLE IF NOT EXISTS shards ( +CREATE TABLE IF NOT EXISTS repositories ( id SERIAL PRIMARY KEY, - storage TEXT NOT NULL, relative_path TEXT NOT NULL, "primary" INTEGER REFERENCES storage_nodes (id), - UNIQUE (storage, relative_path) ); -CREATE TABLE IF NOT EXISTS shard_secondaries ( - shard_id INTEGER REFERENCES shards(id), +CREATE TABLE IF NOT EXISTS repository_replicas ( + repository_id INTEGER REFERENCES repositories(id), storage_node_id INTEGER REFERENCES storage_nodes(id), - PRIMARY KEY(shard_id, storage_node_id) + PRIMARY KEY(repository_id, storage_node_id) );
\ No newline at end of file diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go index bd64a2ef9..a6cc902a6 100644 --- a/internal/praefect/database/sql_datastore.go +++ b/internal/praefect/database/sql_datastore.go @@ -3,12 +3,14 @@ package database import ( "errors" "fmt" + "strings" "database/sql" // the lib/pg package provides postgres bindings for the sql package _ "github.com/lib/pq" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) @@ -28,14 +30,35 @@ func NewSQLDatastore(user, password, address, database string) (*SQLDatastore, e return &SQLDatastore{db: db}, nil } -// GetSecondaries gets the secondaries for a shard based on the relative path -func (sd *SQLDatastore) GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) { - var secondaries []models.StorageNode +// LoadFromConfig loads the config into the database +func (sd *SQLDatastore) LoadFromConfig(cfg config.Config) error { + _, err := sd.db.Exec(insertStorageNodesQuery(cfg.StorageNodes)) + if err != nil { + return fmt.Errorf("loading StorageNodes: %v", err) + } + + return nil +} + +func insertStorageNodesQuery(storageNodes []*models.StorageNode) string { + q := `INSERT INTO storage_nodes (storage) VALUES %s ON CONFLICT (storage) DO NOTHING` + + var values []string + for _, storageNode := range storageNodes { + values = append(values, fmt.Sprintf(`('%s')`, storageNode.Storage)) + } + + return fmt.Sprintf(q, strings.Join(values, ",")) +} + +// GetReplicas gets the replicas for a repository based on the relative path +func (sd *SQLDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + var replicas []models.StorageNode rows, err := sd.db.Query(` - SELECT storage_nodes.id, storage_nodes.address FROM shards - INNER JOIN shard_secondaries ON shards.id = shard_secondaries.shard_id - INNER JOIN storage_nodes ON storage_nodes.id = shard_secondaries.storage_node_id WHERE shards.storage = $1 AND shards.relative_path = $2`, storage, relativePath) + SELECT storage_nodes.id FROM repositories + INNER JOIN repository_replicas ON repositories.id = repository_replicas.repository_id + INNER JOIN storage_nodes ON storage_nodes.id = repository_replicas.storage_node_id WHERE repositories.relative_path = $1`, relativePath) if err != nil { return nil, err @@ -43,23 +66,23 @@ func (sd *SQLDatastore) GetSecondaries(storage, relativePath string) ([]models.S for rows.Next() { var s models.StorageNode - err = rows.Scan(&s.ID, &s.Address) + err = rows.Scan(&s.ID) if err != nil { return nil, err } - secondaries = append(secondaries, s) + replicas = append(replicas, s) } - return secondaries, nil + return replicas, nil } // GetStorageNode gets all storage storage_nodes func (sd *SQLDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { var node models.StorageNode - row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM storage_nodes WHERE storage_nodes.id = $1", nodeID) + row := sd.db.QueryRow("SELECT storage_nodes.id, storage_nodes.storage FROM storage_nodes WHERE storage_nodes.id = $1", nodeID) - err := row.Scan(&node.ID, &node.Address, &node.Storage) + err := row.Scan(&node.ID, &node.Storage) if err != nil { return node, err } @@ -72,7 +95,7 @@ func (sd *SQLDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) { var nodeStorages []models.StorageNode - rows, err := sd.db.Query("SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM storage_nodes") + rows, err := sd.db.Query("SELECT storage_nodes.id, storage_nodes.storage FROM storage_nodes") if err != nil { return nil, err @@ -80,7 +103,7 @@ func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) { for rows.Next() { var nodeStorage models.StorageNode - err = rows.Scan(&nodeStorage.ID, &nodeStorage.Address, &nodeStorage.Storage) + err = rows.Scan(&nodeStorage.ID, &nodeStorage.Storage) if err != nil { return nil, err } @@ -91,26 +114,26 @@ func (sd *SQLDatastore) GetStorageNodes() ([]models.StorageNode, error) { } -// GetPrimary gets the primary storage node for a shard of a repository relative path -func (sd *SQLDatastore) GetPrimary(storage, relativePath string) (*models.StorageNode, error) { +// GetPrimary gets the primary storage node for a repository of a repository relative path +func (sd *SQLDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { row := sd.db.QueryRow(` - SELECT storage_nodes.id, storage_nodes.address, storage_nodes.storage FROM shards - INNER JOIN storage_nodes ON shards.primary = storage_nodes.id - WHERE shards.storage = $1 AND shards.relative_path = $2 - `, storage, relativePath) + SELECT storage_nodes.id, storage_nodes.storage FROM repositories + INNER JOIN storage_nodes ON repositories.primary = storage_nodes.id + WHERE repositories.relative_path = $1 + `, relativePath) var s models.StorageNode - if err := row.Scan(&s.ID, &s.Address, &s.Storage); err != nil { + if err := row.Scan(&s.ID, &s.Storage); err != nil { return nil, err } return &s, nil } -// SetPrimary sets the primary storagee node for a shard of a repository relative path -func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID int) error { - res, err := sd.db.Exec(`UPDATE shards SET "primary" = $1 WHERE storage = $2 AND relative_path = $3`, storageNodeID, storage, relativePath) +// SetPrimary sets the primary storagee node for a repository of a repository relative path +func (sd *SQLDatastore) SetPrimary(relativePath string, storageNodeID int) error { + res, err := sd.db.Exec(`UPDATE repositories SET "primary" = $1 WHERE relative_path = $2`, storageNodeID, relativePath) if err != nil { return err } @@ -118,7 +141,7 @@ func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID i if n, err := res.RowsAffected(); err != nil { return err } else if n == 0 { - res, err = sd.db.Exec(`INSERT INTO shards (storage, relative_path, "primary") VALUES ($1, $2, $3)`, storage, relativePath, storageNodeID) + res, err = sd.db.Exec(`INSERT INTO repositories (storage, relative_path, "primary") VALUES ($1, $2)`, relativePath, storageNodeID) if err != nil { return err } @@ -132,11 +155,11 @@ func (sd *SQLDatastore) SetPrimary(storage, relativePath string, storageNodeID i return nil } -// AddSecondary adds a secondary to a shard of a repository relative path -func (sd *SQLDatastore) AddSecondary(storage, relativePath string, storageNodeID int) error { +// AddReplica adds a replica to a repository of a repository relative path +func (sd *SQLDatastore) AddReplica(relativePath string, storageNodeID int) error { res, err := sd.db.Exec(` - INSERT INTO shard_secondaries (shard_id, storage_node_id) - VALUES (SELECT id, $1 FROM shards WHERE storage = $2 AND relative_path = $3)`, storageNodeID, storage, relativePath) + INSERT INTO repository_replicas (repository_id, storage_node_id) + VALUES (SELECT id, $1 FROM repositories WHERE relative_path = $2)`, storageNodeID, relativePath) if err != nil { return err } @@ -144,17 +167,17 @@ func (sd *SQLDatastore) AddSecondary(storage, relativePath string, storageNodeID if n, err := res.RowsAffected(); err != nil { return err } else if n == 0 { - return errors.New("secondary already exists") + return errors.New("replica already exists") } return nil } -// RemoveSecondary removes a secondary from a shard of a repository relative path -func (sd *SQLDatastore) RemoveSecondary(storage, relativePath string, storageNodeID int) error { +// RemoveReplica removes a replica from a repository of a repository relative path +func (sd *SQLDatastore) RemoveReplica(relativePath string, storageNodeID int) error { res, err := sd.db.Exec(` - DELETE FROM shard_secondaries (shard_relative_path, node_storage_id) - WHERE shard_id = (SELECT id FROM shard where storage = $1 AND relative_path = $2) AND storage_node_id = $3`, storage, relativePath, storageNodeID) + DELETE FROM repository_replicas (repository_relative_path, node_storage_id) + WHERE repository_id = (SELECT id FROM repository WHERE relative_path = $1) AND storage_node_id = $2`, relativePath, storageNodeID) if err != nil { return err } @@ -162,33 +185,33 @@ func (sd *SQLDatastore) RemoveSecondary(storage, relativePath string, storageNod if n, err := res.RowsAffected(); err != nil { return err } else if n == 0 { - return errors.New("secondary did not exist") + return errors.New("replica did not exist") } return nil } -// GetShard gets the shard for a repository relative path -func (sd *SQLDatastore) GetShard(storage, relativePath string) (*models.Shard, error) { - primary, err := sd.GetPrimary(storage, relativePath) +// GetRepository gets the repository for a repository relative path +func (sd *SQLDatastore) GetRepository(relativePath string) (*models.Repository, error) { + primary, err := sd.GetPrimary(relativePath) if err != nil { return nil, fmt.Errorf("getting primary: %v", err) } - secondaries, err := sd.GetSecondaries(storage, relativePath) + replicas, err := sd.GetReplicas(relativePath) if err != nil { - return nil, fmt.Errorf("getting secondaries: %v", err) + return nil, fmt.Errorf("getting replicas: %v", err) } - return &models.Shard{RelativePath: relativePath, Primary: *primary, Secondaries: secondaries}, nil + return &models.Repository{RelativePath: relativePath, Primary: *primary, Replicas: replicas}, nil } -// RotatePrimary rotates a primary out of being primary, and picks a secondary of each shard at random to promote to the new primary +// RotatePrimary rotates a primary out of being primary, and picks a replica of each repository at random to promote to the new primary func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error { - // Add the primary as a secondary + // Add the primary as a replica res, err := sd.db.Exec(` - INSERT INTO shard_secondaries (shard_id, node_storage_id) VALUES (SELECT shards.id, shards.primary FROM shards WHERE shards.primary = $1) + INSERT INTO repository_replicas (repository_id, node_storage_id) VALUES (SELECT repositories.id, repositories.primary FROM repositories WHERE repositories.primary = $1) `, primaryNodeStorageID) if err != nil { return err @@ -200,14 +223,14 @@ func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error { } if affected == 0 { - return fmt.Errorf("no shards with primary %d found", primaryNodeStorageID) + return fmt.Errorf("no repositories with primary %d found", primaryNodeStorageID) } - // Choose a new secondary - res, err = sd.db.Exec(`UPDATE shards SET "primary" = - (SELECT shard_secondaries.storage_node_id FROM shard_secondaries - INNER JOIN shards ON shard_secondaries.shard_id = shards.id - WHERE shards.primary = $1 AND shards.primary != shard_secondaries.storage_node_id LIMIT 1) + // Choose a new replica + res, err = sd.db.Exec(`UPDATE repositories SET "primary" = + (SELECT repository_replicas.storage_node_id FROM repository_replicas + INNER JOIN repositories ON repository_replicas.repository_id = repositories.id + WHERE repositories.primary = $1 AND repositories.primary != repository_replicas.storage_node_id LIMIT 1) `, primaryNodeStorageID) if err != nil { return err @@ -219,7 +242,7 @@ func (sd *SQLDatastore) RotatePrimary(primaryNodeStorageID int) error { } if affected == 0 { - return errors.New("no secondaries available to rotate") + return errors.New("no replicas available to rotate") } return nil } diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 497f6ba03..6730ca1e4 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -41,10 +41,11 @@ const ( // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. type ReplJob struct { - ID uint64 // autoincrement ID - TargetNodeID int // which node to replicate to? - Source models.Repository // source for replication - State JobState + ID uint64 // autoincrement ID + TargetNodeID int // which node to replicate to? + SourceStorage string + Source models.Repository // source for replication + State JobState } // replJobs provides sort manipulation behavior @@ -68,21 +69,21 @@ type Datastore interface { // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) + GetReplicas(relativePath string) ([]models.StorageNode, error) GetStorageNode(nodeID int) (models.StorageNode, error) GetStorageNodes() ([]models.StorageNode, error) - GetPrimary(storage, relativePath string) (*models.StorageNode, error) + GetPrimary(relativePath string) (*models.StorageNode, error) - SetPrimary(storage, relativePath string, storageNodeID int) error + SetPrimary(relativePath string, storageNodeID int) error - AddSecondary(storage, relativePath string, storageNodeID int) error + AddReplica(relativePath string, storageNodeID int) error - RemoveSecondary(storage, relativePath string, storageNodeID int) error + RemoveReplica(relativePath string, storageNodeID int) error - GetShard(storage, relativePath string) (*models.Shard, error) + GetRepository(relativePath string) (*models.Repository, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -93,19 +94,20 @@ type ReplJobsDatastore interface { // count-length. GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error) - // CreateSecondaryJobs will create replication jobs for each secondary + // CreateReplicaJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(storge, relativePath string) ([]uint64, error) + CreateReplicaReplJobs(relativePath string) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error } type jobRecord struct { - relativePath string // project's relative path - targetNodeID int - state JobState + sourceStorage string + relativePath string // project's relative path + targetNodeID int + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is @@ -123,9 +125,9 @@ type MemoryDatastore struct { m map[int]models.StorageNode } - shards *struct { + repositories *struct { sync.RWMutex - m map[string]models.Shard + m map[string]models.Repository } } @@ -146,28 +148,28 @@ func NewMemoryDatastore() *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - shards: &struct { + repositories: &struct { sync.RWMutex - m map[string]models.Shard + m map[string]models.Repository }{ - m: map[string]models.Shard{}, + m: map[string]models.Repository{}, }, } } -// GetSecondaries gets the secondaries for a shard based on the relative path -func (md *MemoryDatastore) GetSecondaries(storage, relativePath string) ([]models.StorageNode, error) { - md.shards.RLock() +// GetReplicas gets the secondaries for a shard based on the relative path +func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + md.repositories.RLock() md.storageNodes.RLock() defer md.storageNodes.RUnlock() - defer md.shards.RUnlock() + defer md.repositories.RUnlock() - shard, ok := md.shards.m[storage+relativePath] + shard, ok := md.repositories.m[relativePath] if !ok { return nil, errors.New("shard not found") } - return shard.Secondaries, nil + return shard.Replicas, nil } // GetStorageNode gets all storage nodes @@ -197,11 +199,11 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) { } // GetPrimary gets the primary storage node for a shard of a repository relative path -func (md *MemoryDatastore) GetPrimary(storage, relativePath string) (*models.StorageNode, error) { - md.shards.RLock() - defer md.shards.RUnlock() +func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + md.repositories.RLock() + defer md.repositories.RUnlock() - shard, ok := md.shards.m[storage+relativePath] + shard, ok := md.repositories.m[relativePath] if !ok { return nil, errors.New("shard not found") } @@ -215,11 +217,11 @@ func (md *MemoryDatastore) GetPrimary(storage, relativePath string) (*models.Sto } // SetPrimary sets the primary storagee node for a shard of a repository relative path -func (md *MemoryDatastore) SetPrimary(storage, relativePath string, storageNodeID int) error { - md.shards.Lock() - defer md.shards.Unlock() +func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - shard, ok := md.shards.m[storage+relativePath] + shard, ok := md.repositories.m[relativePath] if !ok { return errors.New("shard not found") } @@ -231,16 +233,16 @@ func (md *MemoryDatastore) SetPrimary(storage, relativePath string, storageNodeI shard.Primary = storageNode - md.shards.m[storage+relativePath] = shard + md.repositories.m[relativePath] = shard return nil } -// AddSecondary adds a secondary to a shard of a repository relative path -func (md *MemoryDatastore) AddSecondary(storage, relativePath string, storageNodeID int) error { - md.shards.Lock() - defer md.shards.Unlock() +// AddReplica adds a secondary to a shard of a repository relative path +func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - shard, ok := md.shards.m[storage+relativePath] + shard, ok := md.repositories.m[relativePath] if !ok { return errors.New("shard not found") } @@ -250,40 +252,40 @@ func (md *MemoryDatastore) AddSecondary(storage, relativePath string, storageNod return errors.New("node storage not found") } - shard.Secondaries = append(shard.Secondaries, storageNode) + shard.Replicas = append(shard.Replicas, storageNode) - md.shards.m[storage+relativePath] = shard + md.repositories.m[relativePath] = shard return nil } -// RemoveSecondary removes a secondary from a shard of a repository relative path -func (md *MemoryDatastore) RemoveSecondary(storage, relativePath string, storageNodeID int) error { - md.shards.Lock() - defer md.shards.Unlock() +// RemoveReplica removes a secondary from a shard of a repository relative path +func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - shard, ok := md.shards.m[storage+relativePath] + shard, ok := md.repositories.m[relativePath] if !ok { return errors.New("shard not found") } var secondaries []models.StorageNode - for _, secondary := range shard.Secondaries { + for _, secondary := range shard.Replicas { if secondary.ID != storageNodeID { secondaries = append(secondaries, secondary) } } - shard.Secondaries = secondaries - md.shards.m[storage+relativePath] = shard + shard.Replicas = secondaries + md.repositories.m[relativePath] = shard return nil } -// GetShard gets the shard for a repository relative path -func (md *MemoryDatastore) GetShard(storage, relativePath string) (*models.Shard, error) { - md.shards.Lock() - defer md.shards.Unlock() +// GetRepository gets the shard for a repository relative path +func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) { + md.repositories.Lock() + defer md.repositories.Unlock() - shard, ok := md.shards.m[storage+relativePath] + shard, ok := md.repositories.m[relativePath] if !ok { return nil, errors.New("shard not found") } @@ -291,9 +293,9 @@ func (md *MemoryDatastore) GetShard(storage, relativePath string) (*models.Shard return &shard, nil } -// ErrSecondariesMissing indicates the repository does not have any backup +// ErrReplicasMissing indicates the repository does not have any backup // replicas -var ErrSecondariesMissing = errors.New("repository missing secondary replicas") +var ErrReplicasMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) { @@ -330,8 +332,9 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re Source: models.Repository{ RelativePath: record.relativePath, }, - State: record.state, - TargetNodeID: record.targetNodeID, + SourceStorage: record.sourceStorage, + State: record.state, + TargetNodeID: record.targetNodeID, }, nil } @@ -339,9 +342,9 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re // it fails preconditions for being replicatable var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") -// CreateSecondaryReplJobs creates a replication job for each secondary that +// CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() @@ -349,7 +352,7 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string) return nil, errors.New("invalid source repository") } - shard, err := md.GetShard(storage, relativePath) + shard, err := md.GetRepository(relativePath) if err != nil { return nil, fmt.Errorf( "unable to find shard for project at relative path %q", @@ -359,14 +362,15 @@ func (md *MemoryDatastore) CreateSecondaryReplJobs(storage, relativePath string) var jobIDs []uint64 - for _, secondary := range shard.Secondaries { + for _, secondary := range shard.Replicas { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - targetNodeID: secondary.ID, - state: JobStatePending, - relativePath: relativePath, + targetNodeID: secondary.ID, + state: JobStatePending, + relativePath: relativePath, + sourceStorage: shard.Primary.Storage, } jobIDs = append(jobIDs, nextID) diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index d3da4aeff..a337b9995 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -8,17 +8,15 @@ import ( ) // TestMemoryDatastoreWhitelist verifies that the in-memory datastore will -// populate itself with the correct replication jobs and shards when initialized +// populate itself with the correct replication jobs and repositories when initialized // with a configuration file specifying the shard and whitelisted repositories. func TestMemoryDatastoreWhitelist(t *testing.T) { repo1 := models.Repository{ RelativePath: "abcd1234", - Storage: "default", } repo2 := models.Repository{ RelativePath: "5678efgh", - Storage: "default", } mds := NewMemoryDatastore() @@ -37,34 +35,32 @@ func TestMemoryDatastoreWhitelist(t *testing.T) { Address: "tcp://backup-2", Storage: "praefect-internal-3", } - mds.shards.m[repo1.Storage+repo1.RelativePath] = models.Shard{ - Storage: repo1.Storage, + mds.repositories.m[repo1.RelativePath] = models.Repository{ RelativePath: repo1.RelativePath, Primary: mds.storageNodes.m[1], - Secondaries: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]}, + Replicas: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]}, } - mds.shards.m[repo2.Storage+repo2.RelativePath] = models.Shard{ - Storage: repo2.Storage, + mds.repositories.m[repo2.RelativePath] = models.Repository{ RelativePath: repo2.RelativePath, Primary: mds.storageNodes.m[1], - Secondaries: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]}, + Replicas: []models.StorageNode{mds.storageNodes.m[2], mds.storageNodes.m[3]}, } for _, repo := range []models.Repository{repo1, repo2} { - jobIDs, err := mds.CreateSecondaryReplJobs(repo.Storage, repo.RelativePath) + jobIDs, err := mds.CreateReplicaReplJobs(repo.RelativePath) require.NoError(t, err) require.Len(t, jobIDs, 2) } - expectSecondaries := []models.StorageNode{ + expectReplicas := []models.StorageNode{ mds.storageNodes.m[2], mds.storageNodes.m[3], } for _, repo := range []models.Repository{repo1, repo2} { - actualSecondaries, err := mds.GetSecondaries(repo.Storage, repo.RelativePath) + actualReplicas, err := mds.GetReplicas(repo.RelativePath) require.NoError(t, err) - require.ElementsMatch(t, expectSecondaries, actualSecondaries) + require.ElementsMatch(t, expectReplicas, actualReplicas) } backup1 := mds.storageNodes.m[2] @@ -72,30 +68,34 @@ func TestMemoryDatastoreWhitelist(t *testing.T) { backup1ExpectedJobs := []ReplJob{ ReplJob{ - ID: 1, - TargetNodeID: backup1.ID, - Source: models.Repository{RelativePath: repo1.RelativePath}, - State: JobStatePending, + ID: 1, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, ReplJob{ - ID: 3, - TargetNodeID: backup1.ID, - Source: models.Repository{RelativePath: repo2.RelativePath}, - State: JobStatePending, + ID: 3, + TargetNodeID: backup1.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, } backup2ExpectedJobs := []ReplJob{ ReplJob{ - ID: 2, - TargetNodeID: backup2.ID, - Source: models.Repository{RelativePath: repo1.RelativePath}, - State: JobStatePending, + ID: 2, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo1.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, ReplJob{ - ID: 4, - TargetNodeID: backup2.ID, - Source: models.Repository{RelativePath: repo2.RelativePath}, - State: JobStatePending, + ID: 4, + TargetNodeID: backup2.ID, + Source: models.Repository{RelativePath: repo2.RelativePath}, + SourceStorage: "praefect-internal-1", + State: JobStatePending, }, } diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 743ce192d..320a40103 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -22,8 +22,7 @@ var ( ) var ( - repo1Shard = models.Shard{ - Storage: "storage1", + repo1Repository = models.Repository{ RelativePath: proj1, } ) @@ -43,7 +42,7 @@ var operations = []struct { { desc: "creating replication jobs before secondaries are added results in no jobs added", opFn: func(t *testing.T, ds Datastore) { - jobIDs, err := ds.CreateSecondaryReplJobs(repo1Shard.Storage, repo1Shard.RelativePath) + jobIDs, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) require.NoError(t, err) require.Empty(t, jobIDs) }, @@ -51,21 +50,21 @@ var operations = []struct { { desc: "set the primary for the shard", opFn: func(t *testing.T, ds Datastore) { - err := ds.SetPrimary(repo1Shard.Storage, repo1Shard.RelativePath, stor1.ID) + err := ds.SetPrimary(repo1Repository.RelativePath, stor1.ID) require.NoError(t, err) }, }, { desc: "add a secondary replica for the shard", opFn: func(t *testing.T, ds Datastore) { - err := ds.AddSecondary(repo1Shard.Storage, repo1Shard.RelativePath, stor2.ID) + err := ds.AddReplica(repo1Repository.RelativePath, stor2.ID) require.NoError(t, err) }, }, { desc: "insert first replication job after secondary mapped to primary", opFn: func(t *testing.T, ds Datastore) { - ids, err := ds.CreateSecondaryReplJobs(repo1Shard.Storage, repo1Shard.RelativePath) + ids, err := ds.CreateReplicaReplJobs(repo1Repository.RelativePath) require.NoError(t, err) require.Equal(t, []uint64{1}, ids) }, @@ -80,10 +79,11 @@ var operations = []struct { expectedJob := ReplJob{ ID: 1, Source: models.Repository{ - RelativePath: repo1Shard.RelativePath, + RelativePath: repo1Repository.RelativePath, }, - TargetNodeID: stor2.ID, - State: JobStatePending, + SourceStorage: "praefect-storage-1", + TargetNodeID: stor2.ID, + State: JobStatePending, } require.Equal(t, expectedJob, jobs[0]) }, @@ -110,7 +110,7 @@ var flavors = map[string]func() Datastore{ "in-memory-datastore": func() Datastore { ds := NewMemoryDatastore() - ds.shards.m[repo1Shard.Storage+repo1Shard.RelativePath] = repo1Shard + ds.repositories.m[repo1Repository.RelativePath] = repo1Repository ds.storageNodes.m[stor1.ID] = stor1 ds.storageNodes.m[stor2.ID] = stor2 diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go index 0e1079441..db3224c4c 100644 --- a/internal/praefect/models/node.go +++ b/internal/praefect/models/node.go @@ -8,11 +8,10 @@ type StorageNode struct { Token string `toml:"token"` } -// Shard describes a repository's relative path and its primary and list of secondaries -type Shard struct { +// Repository describes a repository's relative path and its primary and list of secondaries +type Repository struct { ID int - Storage string RelativePath string Primary StorageNode - Secondaries []StorageNode + Replicas []StorageNode } diff --git a/internal/praefect/models/repository.go b/internal/praefect/models/repository.go deleted file mode 100644 index e11cdbf0a..000000000 --- a/internal/praefect/models/repository.go +++ /dev/null @@ -1,8 +0,0 @@ -package models - -// Repository provides all necessary information to address a repository hosted -// in a specific Gitaly replica -type Repository struct { - RelativePath string `toml:"relative_path"` // relative path of repository - Storage string `toml:"storage"` // storage location, e.g. default -} diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index 66b28b7bf..c854d8ac2 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -15,21 +15,21 @@ import ( // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error + Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error } type defaultReplicator struct { log *logrus.Logger } -func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error { +func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { repository := &gitalypb.Repository{ StorageName: targetStorage, RelativePath: source.RelativePath, } remoteRepository := &gitalypb.Repository{ - StorageName: source.Storage, + StorageName: sourceStorage, RelativePath: source.RelativePath, } @@ -121,7 +121,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository return nil } - id, err := r.replJobsDS.CreateSecondaryReplJobs(repo.Storage, repo.RelativePath) + id, err := r.replJobsDS.CreateReplicaReplJobs(repo.RelativePath) if err != nil { return err } @@ -187,12 +187,12 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { return err } - cc, err := r.coordinator.GetConnection(node.Address) + cc, err := r.coordinator.GetConnection(node.Storage) if err != nil { return err } - if err := r.replicator.Replicate(ctx, job.Source, node.Storage, cc); err != nil { + if err := r.replicator.Replicate(ctx, job.Source, job.SourceStorage, node.Storage, cc); err != nil { r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") return err } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 684b2f61a..0e8a75a47 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -44,21 +44,19 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { Storage: "praefect-internal-3", } - datastore.shards.m["default"+"abcd1234"] = models.Shard{ - Storage: "default", + datastore.repositories.m["abcd1234"] = models.Repository{ RelativePath: "abcd1234", Primary: datastore.storageNodes.m[1], - Secondaries: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]}, + Replicas: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]}, } - datastore.shards.m["default"+"edfg5678"] = models.Shard{ - Storage: "default", + datastore.repositories.m["edfg5678"] = models.Repository{ RelativePath: "edfg5678", Primary: datastore.storageNodes.m[1], - Secondaries: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]}, + Replicas: []models.StorageNode{datastore.storageNodes.m[2], datastore.storageNodes.m[3]}, } for _, repo := range []string{"abcd1234", "edfg5678"} { - jobIDs, err := datastore.CreateSecondaryReplJobs("default", repo) + jobIDs, err := datastore.CreateReplicaReplJobs(repo) require.NoError(t, err) require.Len(t, jobIDs, 2) } @@ -75,7 +73,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { ) for _, node := range datastore.storageNodes.m { - err := coordinator.RegisterNode(node.Address) + err := coordinator.RegisterNode(node.Storage, node.Address) require.NoError(t, err) } @@ -91,9 +89,9 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { var expectedResults []result // we expect one job per whitelisted repo with each backend server - for _, shard := range datastore.shards.m { - for _, secondary := range shard.Secondaries { - cc, err := coordinator.GetConnection(secondary.Address) + for _, shard := range datastore.repositories.m { + for _, secondary := range shard.Replicas { + cc, err := coordinator.GetConnection(secondary.Storage) require.NoError(t, err) expectedResults = append(expectedResults, result{source: models.Repository{RelativePath: shard.RelativePath}, @@ -105,8 +103,8 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { go func() { // we expect one job per whitelisted repo with each backend server - for _, shard := range datastore.shards.m { - for range shard.Secondaries { + for _, shard := range datastore.repositories.m { + for range shard.Replicas { result := <-resultsCh assert.Contains(t, expectedResults, result) } @@ -139,7 +137,7 @@ type mockReplicator struct { resultsCh chan<- result } -func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target *grpc.ClientConn) error { +func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, sourceStorage, targetStorage string, target *grpc.ClientConn) error { select { case mr.resultsCh <- result{source, targetStorage, target}: @@ -203,7 +201,8 @@ func TestReplicate(t *testing.T) { var replicator defaultReplicator require.NoError(t, replicator.Replicate( ctx, - models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()}, + models.Repository{RelativePath: testRepo.GetRelativePath()}, + "default", backupStorageName, conn, )) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 282c61051..656196e65 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -72,7 +72,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) { backend, cleanup := newMockDownstream(t, tt.callback) defer cleanup() // clean up mock downstream server resources - coordinator.RegisterNode(backend) + coordinator.RegisterNode(nodeStorage.Storage, backend) nodeStorage.Address = backend datastore.storageNodes.m[id] = nodeStorage } |