Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-06-26 23:30:02 +0300
committerJohn Cai <jcai@gitlab.com>2019-06-27 19:27:54 +0300
commite922041b553ad3c359d69f4aa7683413c8d209ff (patch)
treea77bf844014f8137f31da5e0a02c334296a03346
parent1c4627d348fcd67ccfc4cb4c0f5ac2ff726b4af8 (diff)
Use datastore to store the primary node
-rw-r--r--changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml5
-rw-r--r--cmd/praefect/main.go3
-rw-r--r--internal/praefect/coordinator.go31
-rw-r--r--internal/praefect/datastore.go48
-rw-r--r--internal/praefect/datastore_test.go9
-rw-r--r--internal/praefect/replicator_test.go4
-rw-r--r--internal/praefect/server_test.go8
7 files changed, 87 insertions, 21 deletions
diff --git a/changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml b/changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml
new file mode 100644
index 000000000..aebb92ef5
--- /dev/null
+++ b/changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml
@@ -0,0 +1,5 @@
+---
+title: Use datastore to store the primary node
+merge_request: 1335
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 00769307e..b4efeeae9 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -78,10 +78,11 @@ func configure() (config.Config, error) {
}
func run(listeners []net.Listener, conf config.Config) error {
+
var (
// top level server dependencies
- coordinator = praefect.NewCoordinator(logger, conf.PrimaryServer.Name)
datastore = praefect.NewMemoryDatastore(conf)
+ coordinator = praefect.NewCoordinator(logger, datastore)
repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
srv = praefect.NewServer(coordinator, repl, nil, logger)
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 0b2d71252..b2e6704d5 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -25,22 +25,22 @@ type Coordinator struct {
log *logrus.Logger
lock sync.RWMutex
- storageLoc string
+ datastore PrimaryDatastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, storageLoc string, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
return &Coordinator{
- log: l,
- storageLoc: storageLoc,
- nodes: make(map[string]*grpc.ClientConn),
- registry: registry,
+ log: l,
+ datastore: datastore,
+ nodes: make(map[string]*grpc.ClientConn),
+ registry: registry,
}
}
@@ -68,7 +68,8 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- if c.storageLoc == "" {
+ storageName, err := c.datastore.GetPrimary()
+ if err != nil {
err := status.Error(
codes.FailedPrecondition,
"no downstream node registered",
@@ -78,9 +79,9 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, ok := c.getConn(c.storageLoc)
+ cc, ok := c.getConn(storageName)
if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", c.storageLoc)
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName)
}
return ctx, cc, nil
@@ -88,7 +89,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error {
+func (c *Coordinator) RegisterNode(storageName, listenAddr string) error {
conn, err := client.Dial(listenAddr,
[]grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())),
@@ -99,20 +100,20 @@ func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error {
return err
}
- c.setConn(storageLoc, conn)
+ c.setConn(storageName, conn)
return nil
}
-func (c *Coordinator) setConn(storageLoc string, conn *grpc.ClientConn) {
+func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) {
c.lock.Lock()
- c.nodes[storageLoc] = conn
+ c.nodes[storageName] = conn
c.lock.Unlock()
}
-func (c *Coordinator) getConn(storageLoc string) (*grpc.ClientConn, bool) {
+func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) {
c.lock.RLock()
- cc, ok := c.nodes[storageLoc]
+ cc, ok := c.nodes[storageName]
c.lock.RUnlock()
return cc, ok
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 3dbf45976..eeb9f9728 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -14,6 +14,11 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
)
+var (
+ // ErrPrimaryNotSet indicates the primary has not been set in the datastore
+ ErrPrimaryNotSet = errors.New("primary is not set")
+)
+
// JobState is an enum that indicates the state of a job
type JobState uint8
@@ -58,6 +63,15 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
type Datastore interface {
ReplJobsDatastore
ReplicasDatastore
+ PrimaryDatastore
+}
+
+// PrimaryDatastore manages accessing and setting the primary storage location
+type PrimaryDatastore interface {
+ // GetPrimary gets the primary storage location
+ GetPrimary() (string, error)
+ // SetPrimary sets the primary storage location
+ SetPrimary(primary string) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
@@ -115,6 +129,11 @@ type MemoryDatastore struct {
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
+
+ primary *struct {
+ sync.RWMutex
+ storageName string
+ }
}
// NewMemoryDatastore returns an initialized in-memory datastore
@@ -134,6 +153,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
+ primary: &struct {
+ sync.RWMutex
+ storageName string
+ }{
+ storageName: cfg.PrimaryServer.Name,
+ },
}
secondaries := make([]string, len(cfg.SecondaryServers))
@@ -306,3 +331,26 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
+
+// SetPrimary sets the primary datastore location
+func (md *MemoryDatastore) SetPrimary(primary string) error {
+ md.primary.Lock()
+ defer md.primary.Unlock()
+
+ md.primary.storageName = primary
+
+ return nil
+}
+
+// GetPrimary gets the primary datastore location
+func (md *MemoryDatastore) GetPrimary() (string, error) {
+ md.primary.RLock()
+ defer md.primary.RUnlock()
+
+ storageName := md.primary.storageName
+ if storageName == "" {
+ return "", ErrPrimaryNotSet
+ }
+
+ return storageName, nil
+}
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 5f48d2bb6..6534b9d88 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -90,7 +90,14 @@ var operations = []struct {
// TODO: add SQL datastore flavor
var flavors = map[string]func() praefect.Datastore{
- "in-memory-datastore": func() praefect.Datastore { return praefect.NewMemoryDatastore(config.Config{}) },
+ "in-memory-datastore": func() praefect.Datastore {
+ return praefect.NewMemoryDatastore(
+ config.Config{
+ PrimaryServer: &config.GitalyServer{
+ Name: "default",
+ },
+ })
+ },
}
// TestDatastoreInterface will verify that every implementation or "flavor" of
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 503cd0d47..ac2dcf6f6 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -48,7 +48,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
},
}
datastore = praefect.NewMemoryDatastore(cfg)
- coordinator = praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name)
+ coordinator = praefect.NewCoordinator(logrus.New(), datastore)
resultsCh = make(chan result)
replman = praefect.NewReplMgr(
cfg.SecondaryServers[1].Name,
@@ -178,7 +178,7 @@ func TestReplicate(t *testing.T) {
defer srv.Stop()
datastore := praefect.NewMemoryDatastore(cfg)
- coordinator := praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name)
+ coordinator := praefect.NewCoordinator(logrus.New(), datastore)
coordinator.RegisterNode("backup", socketPath)
coordinator.RegisterNode("default", socketPath)
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index e82ba87c6..b6f2a8fd9 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -50,8 +50,12 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
storageBackup = "backup"
)
- coordinator := praefect.NewCoordinator(logrus.New(), storagePrimary)
- datastore := praefect.NewMemoryDatastore(config.Config{})
+ datastore := praefect.NewMemoryDatastore(config.Config{
+ PrimaryServer: &config.GitalyServer{
+ Name: "default",
+ },
+ })
+ coordinator := praefect.NewCoordinator(logrus.New(), datastore)
replmgr := praefect.NewReplMgr(
storagePrimary,
logrus.New(),