Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-08-13 02:30:44 +0300
committerJohn Cai <jcai@gitlab.com>2019-08-24 00:28:53 +0300
commit653b7377e02b789dac3749d7ceefc162d001ec93 (patch)
tree8e8a6e7441cd02ab28ec19e0d27ac133eae66421
parentc06f35897d1e4c38535706d7965db399eafcd736 (diff)
Register nodes by id instead of by storagejc-handle-bad-connections
-rw-r--r--changelogs/unreleased/jc-handle-bad-connections.yml5
-rw-r--r--cmd/praefect/main.go11
-rw-r--r--internal/praefect/coordinator.go93
-rw-r--r--internal/praefect/coordinator_test.go2
-rw-r--r--internal/praefect/datastore.go55
-rw-r--r--internal/praefect/grpc-proxy/proxy/director.go3
-rw-r--r--internal/praefect/grpc-proxy/proxy/examples_test.go7
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler.go23
-rw-r--r--internal/praefect/grpc-proxy/proxy/handler_test.go4
-rw-r--r--internal/praefect/grpc-proxy/proxy/helper_test.go4
-rw-r--r--internal/praefect/models/node.go1
-rw-r--r--internal/praefect/replicator.go2
-rw-r--r--internal/praefect/server.go6
-rw-r--r--internal/praefect/server_test.go2
14 files changed, 141 insertions, 77 deletions
diff --git a/changelogs/unreleased/jc-handle-bad-connections.yml b/changelogs/unreleased/jc-handle-bad-connections.yml
new file mode 100644
index 000000000..e591ccb58
--- /dev/null
+++ b/changelogs/unreleased/jc-handle-bad-connections.yml
@@ -0,0 +1,5 @@
+---
+title: Handle bad connections
+merge_request: 1451
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index cf7d1b8ab..2afeaa7c4 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -114,8 +114,13 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- for _, node := range conf.Nodes {
- if err := coordinator.RegisterNode(node.Storage, node.Address); err != nil {
+ nodes, err := datastore.GetStorageNodes()
+ if err != nil {
+ return fmt.Errorf("failed to get storage nodes from datastore: %v", err)
+ }
+
+ for _, node := range nodes {
+ if err := coordinator.RegisterNode(node.ID, node.Address); err != nil {
return fmt.Errorf("failed to register %s: %s", node.Address, err)
}
@@ -124,8 +129,6 @@ func run(listeners []net.Listener, conf config.Config) error {
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
- go coordinator.FailoverRotation()
-
select {
case s := <-termCh:
logger.WithField("signal", s).Warn("received signal, shutting down gracefully")
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 0c80bd398..64fb992e3 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -4,11 +4,8 @@ import (
"context"
"errors"
"fmt"
- "os"
- "os/signal"
"sort"
"sync"
- "syscall"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
@@ -35,7 +32,7 @@ type Coordinator struct {
datastore Datastore
- nodes map[string]*grpc.ClientConn
+ nodes map[int]*grpc.ClientConn
registry *protoregistry.Registry
}
@@ -47,7 +44,7 @@ func NewCoordinator(l *logrus.Entry, datastore Datastore, fileDescriptors ...*de
return &Coordinator{
log: l,
datastore: datastore,
- nodes: make(map[string]*grpc.ClientConn),
+ nodes: make(map[int]*grpc.ClientConn),
registry: registry,
}
}
@@ -57,15 +54,29 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
return c.registry.RegisterFiles(protos...)
}
+func (c *Coordinator) connDownHandler(cc *grpc.ClientConn) error {
+ c.failoverMutex.Lock()
+ defer c.failoverMutex.Unlock()
+
+ for storageNodeID, conn := range c.nodes {
+ if conn == cc {
+ if err := c.datastore.Failover(storageNodeID); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
// streamDirector determines which downstream servers receive requests
func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, peeker proxy.StreamModifier) (context.Context, *grpc.ClientConn, func(), error) {
+ c.failoverMutex.Lock()
+ defer c.failoverMutex.Unlock()
// For phase 1, we need to route messages based on the storage location
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- c.failoverMutex.RLock()
- defer c.failoverMutex.RUnlock()
-
mi, err := c.registry.LookupMethod(fullMethodName)
if err != nil {
return nil, nil, nil, err
@@ -77,24 +88,24 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
}
var requestFinalizer func()
- var storage string
+ var node models.Node
if mi.Scope == protoregistry.ScopeRepository {
- storage, requestFinalizer, err = c.getStorageForRepositoryMessage(mi, m, peeker)
+ node, requestFinalizer, err = c.getNodeForRepositoryMessage(mi, m, peeker)
if err != nil {
return nil, nil, nil, err
}
} else {
- storage, requestFinalizer, err = c.getAnyStorageNode()
+ node, requestFinalizer, err = c.getAnyStorageNode()
if err != nil {
return nil, nil, nil, err
}
}
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, err := c.GetConnection(storage)
+ cc, err := c.GetConnection(node.ID)
if err != nil {
- return nil, nil, nil, fmt.Errorf("unable to find existing client connection for %s", storage)
+ return nil, nil, nil, fmt.Errorf("unable to find existing client connection for node_id %d", node.ID)
}
return helper.IncomingToOutgoing(ctx), cc, requestFinalizer, nil
@@ -102,51 +113,51 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
var noopRequestFinalizer = func() {}
-func (c *Coordinator) getAnyStorageNode() (string, func(), error) {
+func (c *Coordinator) getAnyStorageNode() (models.Node, func(), error) {
//TODO: For now we just pick a random storage node for a non repository scoped RPC, but we will need to figure out exactly how to
// proxy requests that are not repository scoped
node, err := c.datastore.GetStorageNodes()
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
if len(node) == 0 {
- return "", nil, errors.New("no node storages found")
+ return models.Node{}, nil, errors.New("no node storages found")
}
- return node[0].Storage, noopRequestFinalizer, nil
+ return node[0], noopRequestFinalizer, nil
}
-func (c *Coordinator) getStorageForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (string, func(), error) {
+func (c *Coordinator) getNodeForRepositoryMessage(mi protoregistry.MethodInfo, m proto.Message, peeker proxy.StreamModifier) (models.Node, func(), error) {
targetRepo, err := mi.TargetRepo(m)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
primary, err := c.selectPrimary(mi, targetRepo)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
targetRepo.StorageName = primary.Storage
b, err := proxy.Codec().Marshal(m)
if err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
if err = peeker.Modify(b); err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
requestFinalizer := noopRequestFinalizer
if mi.Operation == protoregistry.OpMutator {
if requestFinalizer, err = c.createReplicaJobs(targetRepo); err != nil {
- return "", nil, err
+ return models.Node{}, nil, err
}
}
- return primary.Storage, requestFinalizer, nil
+ return *primary, requestFinalizer, nil
}
func (c *Coordinator) selectPrimary(mi protoregistry.MethodInfo, targetRepo *gitalypb.Repository) (*models.Node, error) {
@@ -202,6 +213,7 @@ func protoMessageFromPeeker(mi protoregistry.MethodInfo, peeker proxy.StreamModi
}
m, err := mi.UnmarshalRequestProto(frame)
+
if err != nil {
return nil, err
}
@@ -225,7 +237,7 @@ func (c *Coordinator) createReplicaJobs(targetRepo *gitalypb.Repository) (func()
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
+func (c *Coordinator) RegisterNode(nodeID int, listenAddr string) error {
conn, err := client.Dial(listenAddr,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
@@ -236,21 +248,23 @@ func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
return err
}
- c.setConn(storageName, conn)
+ c.setConn(nodeID, conn)
+
+ c.datastore.UpdateStorageNode(nodeID, true)
return nil
}
-func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(nodeID int, conn *grpc.ClientConn) {
c.connMutex.Lock()
- c.nodes[storageName] = conn
+ c.nodes[nodeID] = conn
c.connMutex.Unlock()
}
// GetConnection gets the grpc client connection based on an address
-func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error) {
+func (c *Coordinator) GetConnection(nodeID int) (*grpc.ClientConn, error) {
c.connMutex.RLock()
- cc, ok := c.nodes[storageName]
+ cc, ok := c.nodes[nodeID]
c.connMutex.RUnlock()
if !ok {
return nil, errors.New("client connection not found")
@@ -259,22 +273,3 @@ func (c *Coordinator) GetConnection(storageName string) (*grpc.ClientConn, error
return cc, nil
}
-
-// FailoverRotation waits for the SIGUSR1 signal, then promotes the next secondary to be primary
-func (c *Coordinator) FailoverRotation() {
- c.handleSignalAndRotate()
-}
-
-func (c *Coordinator) handleSignalAndRotate() {
- failoverChan := make(chan os.Signal, 1)
- signal.Notify(failoverChan, syscall.SIGUSR1)
-
- for {
- <-failoverChan
-
- c.failoverMutex.Lock()
- // TODO: update failover logic
- c.log.Info("failover happens")
- c.failoverMutex.Unlock()
- }
-}
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index e029a3f48..f9cdcf2bc 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -57,7 +57,7 @@ func TestStreamDirector(t *testing.T) {
cc, err := grpc.Dial("tcp://gitaly-primary.example.com", grpc.WithInsecure())
require.NoError(t, err)
- coordinator.setConn("praefect-internal-1", cc)
+ coordinator.setConn(0, cc)
_, conn, jobUpdateFunc, err := coordinator.streamDirector(ctx, "/gitaly.RepositoryService/GarbageCollect", &mockPeeker{frame})
require.NoError(t, err)
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index f61a64064..2bb5f60fd 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -75,6 +75,8 @@ type ReplicasDatastore interface {
GetStorageNodes() ([]models.Node, error)
+ GetHealthyStorageNodes() ([]models.Node, error)
+
GetPrimary(relativePath string) (*models.Node, error)
SetPrimary(relativePath string, storageNodeID int) error
@@ -84,6 +86,10 @@ type ReplicasDatastore interface {
RemoveReplica(relativePath string, storageNodeID int) error
GetRepository(relativePath string) (*models.Repository, error)
+
+ UpdateStorageNode(storageNodeID int, healthy bool) error
+
+ Failover(storageNodeID int) error
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -201,6 +207,21 @@ func (md *MemoryDatastore) GetStorageNodes() ([]models.Node, error) {
return storageNodes, nil
}
+// GetHealthyStorageNodes gets all storage nodes
+func (md *MemoryDatastore) GetHealthyStorageNodes() ([]models.Node, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+
+ var storageNodes []models.Node
+ for _, storageNode := range md.storageNodes.m {
+ if storageNode.Healthy {
+ storageNodes = append(storageNodes, storageNode)
+ }
+ }
+
+ return storageNodes, nil
+}
+
// GetPrimary gets the primary storage node for a repository of a repository relative path
func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.Node, error) {
md.repositories.RLock()
@@ -413,3 +434,37 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
+
+// Failover replaces any repository with storage as its primary with one of its replicas
+func (md *MemoryDatastore) Failover(storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ md.UpdateStorageNode(storageNodeID, true)
+
+ for relativePath, repository := range md.repositories.m {
+ if repository.Primary.ID == storageNodeID {
+ if len(repository.Replicas) > 0 {
+ repository.Primary = repository.Replicas[0]
+ repository.Replicas = repository.Replicas[1:]
+ md.repositories.m[relativePath] = repository
+ }
+ }
+ }
+ return nil
+}
+
+// UpdateStorageNode updates the health of the given storage node by its id
+func (md *MemoryDatastore) UpdateStorageNode(nodeID int, healthy bool) error {
+ md.storageNodes.Lock()
+ defer md.storageNodes.Unlock()
+
+ storageNode, ok := md.storageNodes.m[nodeID]
+ if !ok {
+ return errors.New("node not found")
+ }
+ storageNode.Healthy = healthy
+ md.storageNodes.m[nodeID] = storageNode
+
+ return nil
+}
diff --git a/internal/praefect/grpc-proxy/proxy/director.go b/internal/praefect/grpc-proxy/proxy/director.go
index 82712765f..98e8e7313 100644
--- a/internal/praefect/grpc-proxy/proxy/director.go
+++ b/internal/praefect/grpc-proxy/proxy/director.go
@@ -22,3 +22,6 @@ import (
//
// See the rather rich example.
type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamModifier) (context.Context, *grpc.ClientConn, func(), error)
+
+// ConnectionDownNotifier takes care of updating the datastore so that other requests stop using the downed connection
+type ConnectionDownNotifier func(cc *grpc.ClientConn) error
diff --git a/internal/praefect/grpc-proxy/proxy/examples_test.go b/internal/praefect/grpc-proxy/proxy/examples_test.go
index 6d4a3238e..ccc24b23e 100644
--- a/internal/praefect/grpc-proxy/proxy/examples_test.go
+++ b/internal/praefect/grpc-proxy/proxy/examples_test.go
@@ -18,14 +18,15 @@ import (
)
var (
- director proxy.StreamDirector
+ director proxy.StreamDirector
+ connDownNotifier proxy.ConnectionDownNotifier
)
func ExampleRegisterService() {
// A gRPC server with the proxying codec enabled.
server := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))
// Register a TestService with 4 of its methods explicitly.
- proxy.RegisterService(server, director,
+ proxy.RegisterService(server, director, connDownNotifier,
"mwitkow.testproto.TestService",
"PingEmpty", "Ping", "PingError", "PingList")
}
@@ -33,7 +34,7 @@ func ExampleRegisterService() {
func ExampleTransparentHandler() {
grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownNotifier)))
}
// Provide sa simple example of a director that shields internal services and dials a staging or production backend.
diff --git a/internal/praefect/grpc-proxy/proxy/handler.go b/internal/praefect/grpc-proxy/proxy/handler.go
index 0ed1b3b5e..2fa11660e 100644
--- a/internal/praefect/grpc-proxy/proxy/handler.go
+++ b/internal/praefect/grpc-proxy/proxy/handler.go
@@ -26,8 +26,8 @@ var (
// The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) {
- streamer := &handler{director}
+func RegisterService(server *grpc.Server, director StreamDirector, connDownNotifier ConnectionDownNotifier, serviceName string, methodNames ...string) {
+ streamer := &handler{director, connDownNotifier}
fakeDesc := &grpc.ServiceDesc{
ServiceName: serviceName,
HandlerType: (*interface{})(nil),
@@ -49,13 +49,14 @@ func RegisterService(server *grpc.Server, director StreamDirector, serviceName s
// backends. It should be used as a `grpc.UnknownServiceHandler`.
//
// This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.
-func TransparentHandler(director StreamDirector) grpc.StreamHandler {
- streamer := &handler{director}
+func TransparentHandler(director StreamDirector, connectionDownNotifier ConnectionDownNotifier) grpc.StreamHandler {
+ streamer := &handler{director, connectionDownNotifier}
return streamer.handler
}
type handler struct {
- director StreamDirector
+ director StreamDirector
+ connDownNotifier ConnectionDownNotifier
}
// handler is where the real magic of proxying happens.
@@ -76,18 +77,18 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
return err
}
- defer func() {
- if requestFinalizer != nil {
- requestFinalizer()
- }
- }()
-
clientCtx, clientCancel := context.WithCancel(outgoingCtx)
// TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For.
clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)
if err != nil {
return err
}
+
+ defer func() {
+ if requestFinalizer != nil {
+ requestFinalizer()
+ }
+ }()
// Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate.
// Channels do not have to be closed, it is just a control flow mechanism, see
// https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ
diff --git a/internal/praefect/grpc-proxy/proxy/handler_test.go b/internal/praefect/grpc-proxy/proxy/handler_test.go
index c57837d2a..e91ab4a1f 100644
--- a/internal/praefect/grpc-proxy/proxy/handler_test.go
+++ b/internal/praefect/grpc-proxy/proxy/handler_test.go
@@ -221,10 +221,10 @@ func (s *ProxyHappySuite) SetupSuite() {
}
s.proxy = grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownNotifier)),
)
// Ping handler is handled as an explicit registration and not as a TransparentHandler.
- proxy.RegisterService(s.proxy, director,
+ proxy.RegisterService(s.proxy, director, connDownNotifier,
"mwitkow.testproto.TestService",
"Ping")
diff --git a/internal/praefect/grpc-proxy/proxy/helper_test.go b/internal/praefect/grpc-proxy/proxy/helper_test.go
index 615d5a3fd..befb49000 100644
--- a/internal/praefect/grpc-proxy/proxy/helper_test.go
+++ b/internal/praefect/grpc-proxy/proxy/helper_test.go
@@ -56,10 +56,10 @@ func newBackendPinger(tb testing.TB, ctx context.Context) (*grpc.ClientConn, *in
func newProxy(tb testing.TB, ctx context.Context, director proxy.StreamDirector, svc, method string) (*grpc.ClientConn, func()) {
proxySrvr := grpc.NewServer(
grpc.CustomCodec(proxy.Codec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownNotifier)),
)
- proxy.RegisterService(proxySrvr, director, svc, method)
+ proxy.RegisterService(proxySrvr, director, connDownNotifier, svc, method)
done := make(chan struct{})
listener := newListener(tb)
diff --git a/internal/praefect/models/node.go b/internal/praefect/models/node.go
index 941a72e8f..1f7c730c3 100644
--- a/internal/praefect/models/node.go
+++ b/internal/praefect/models/node.go
@@ -6,6 +6,7 @@ type Node struct {
Storage string `toml:"storage"`
Address string `toml:"address"`
Token string `toml:"token"`
+ Healthy bool
}
// Repository describes a repository's relative path and its primary and list of secondaries
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index f54a6f457..ff8139211 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -178,7 +178,7 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
return err
}
- cc, err := r.coordinator.GetConnection(job.TargetNode.Storage)
+ cc, err := r.coordinator.GetConnection(job.TargetNode.ID)
if err != nil {
return err
}
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index 3a7c0fabe..64974e9a1 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -28,7 +28,7 @@ type Server struct {
// NewServer returns an initialized praefect gPRC proxy server configured
// with the provided gRPC server options
func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *logrus.Entry) *Server {
- grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector)...)
+ grpcOpts = append(grpcOpts, proxyRequiredOpts(c.streamDirector, c.connDownHandler)...)
grpcOpts = append(grpcOpts, []grpc.ServerOption{
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpccorrelation.StreamServerCorrelationInterceptor(), // Must be above the metadata handler
@@ -58,10 +58,10 @@ func NewServer(c *Coordinator, repl ReplMgr, grpcOpts []grpc.ServerOption, l *lo
}
}
-func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
+func proxyRequiredOpts(director proxy.StreamDirector, connDownHandler proxy.ConnectionDownNotifier) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.CustomCodec(proxy.Codec()),
- grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
+ grpc.UnknownServiceHandler(proxy.TransparentHandler(director, connDownHandler)),
}
}
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index d04f61121..3227c0b22 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -76,7 +76,7 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
backend, cleanup := newMockDownstream(t, tt.callback)
defer cleanup() // clean up mock downstream server resources
- coordinator.RegisterNode(nodeStorage.Storage, backend)
+ coordinator.RegisterNode(nodeStorage.ID, backend)
nodeStorage.Address = backend
datastore.storageNodes.m[id] = nodeStorage
}