diff options
author | John Cai <jcai@gitlab.com> | 2019-06-26 23:30:02 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-06-27 19:27:54 +0300 |
commit | e922041b553ad3c359d69f4aa7683413c8d209ff (patch) | |
tree | a77bf844014f8137f31da5e0a02c334296a03346 | |
parent | 1c4627d348fcd67ccfc4cb4c0f5ac2ff726b4af8 (diff) |
Use datastore to store the primary node
-rw-r--r-- | changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main.go | 3 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 31 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 48 | ||||
-rw-r--r-- | internal/praefect/datastore_test.go | 9 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 8 |
7 files changed, 87 insertions, 21 deletions
diff --git a/changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml b/changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml new file mode 100644 index 000000000..aebb92ef5 --- /dev/null +++ b/changelogs/unreleased/jc-update-datastore-to-contain-primaries.yml @@ -0,0 +1,5 @@ +--- +title: Use datastore to store the primary node +merge_request: 1335 +author: +type: added diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 00769307e..b4efeeae9 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -78,10 +78,11 @@ func configure() (config.Config, error) { } func run(listeners []net.Listener, conf config.Config) error { + var ( // top level server dependencies - coordinator = praefect.NewCoordinator(logger, conf.PrimaryServer.Name) datastore = praefect.NewMemoryDatastore(conf) + coordinator = praefect.NewCoordinator(logger, datastore) repl = praefect.NewReplMgr("default", logger, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 0b2d71252..b2e6704d5 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -25,22 +25,22 @@ type Coordinator struct { log *logrus.Logger lock sync.RWMutex - storageLoc string + datastore PrimaryDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, storageLoc string, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) return &Coordinator{ - log: l, - storageLoc: storageLoc, - nodes: make(map[string]*grpc.ClientConn), - registry: registry, + log: l, + datastore: datastore, + nodes: make(map[string]*grpc.ClientConn), + registry: registry, } } @@ -68,7 +68,8 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) - if c.storageLoc == "" { + storageName, err := c.datastore.GetPrimary() + if err != nil { err := status.Error( codes.FailedPrecondition, "no downstream node registered", @@ -78,9 +79,9 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, ok := c.getConn(c.storageLoc) + cc, ok := c.getConn(storageName) if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", c.storageLoc) + return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName) } return ctx, cc, nil @@ -88,7 +89,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // RegisterNode will direct traffic to the supplied downstream connection when the storage location // is encountered. -func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error { +func (c *Coordinator) RegisterNode(storageName, listenAddr string) error { conn, err := client.Dial(listenAddr, []grpc.DialOption{ grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec())), @@ -99,20 +100,20 @@ func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error { return err } - c.setConn(storageLoc, conn) + c.setConn(storageName, conn) return nil } -func (c *Coordinator) setConn(storageLoc string, conn *grpc.ClientConn) { +func (c *Coordinator) setConn(storageName string, conn *grpc.ClientConn) { c.lock.Lock() - c.nodes[storageLoc] = conn + c.nodes[storageName] = conn c.lock.Unlock() } -func (c *Coordinator) getConn(storageLoc string) (*grpc.ClientConn, bool) { +func (c *Coordinator) getConn(storageName string) (*grpc.ClientConn, bool) { c.lock.RLock() - cc, ok := c.nodes[storageLoc] + cc, ok := c.nodes[storageName] c.lock.RUnlock() return cc, ok diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 3dbf45976..eeb9f9728 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -14,6 +14,11 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/config" ) +var ( + // ErrPrimaryNotSet indicates the primary has not been set in the datastore + ErrPrimaryNotSet = errors.New("primary is not set") +) + // JobState is an enum that indicates the state of a job type JobState uint8 @@ -58,6 +63,15 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I type Datastore interface { ReplJobsDatastore ReplicasDatastore + PrimaryDatastore +} + +// PrimaryDatastore manages accessing and setting the primary storage location +type PrimaryDatastore interface { + // GetPrimary gets the primary storage location + GetPrimary() (string, error) + // SetPrimary sets the primary storage location + SetPrimary(primary string) error } // ReplicasDatastore manages accessing and setting which secondary replicas @@ -115,6 +129,11 @@ type MemoryDatastore struct { next uint64 records map[uint64]jobRecord // all jobs indexed by ID } + + primary *struct { + sync.RWMutex + storageName string + } } // NewMemoryDatastore returns an initialized in-memory datastore @@ -134,6 +153,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, + primary: &struct { + sync.RWMutex + storageName string + }{ + storageName: cfg.PrimaryServer.Name, + }, } secondaries := make([]string, len(cfg.SecondaryServers)) @@ -306,3 +331,26 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } + +// SetPrimary sets the primary datastore location +func (md *MemoryDatastore) SetPrimary(primary string) error { + md.primary.Lock() + defer md.primary.Unlock() + + md.primary.storageName = primary + + return nil +} + +// GetPrimary gets the primary datastore location +func (md *MemoryDatastore) GetPrimary() (string, error) { + md.primary.RLock() + defer md.primary.RUnlock() + + storageName := md.primary.storageName + if storageName == "" { + return "", ErrPrimaryNotSet + } + + return storageName, nil +} diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 5f48d2bb6..6534b9d88 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -90,7 +90,14 @@ var operations = []struct { // TODO: add SQL datastore flavor var flavors = map[string]func() praefect.Datastore{ - "in-memory-datastore": func() praefect.Datastore { return praefect.NewMemoryDatastore(config.Config{}) }, + "in-memory-datastore": func() praefect.Datastore { + return praefect.NewMemoryDatastore( + config.Config{ + PrimaryServer: &config.GitalyServer{ + Name: "default", + }, + }) + }, } // TestDatastoreInterface will verify that every implementation or "flavor" of diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 503cd0d47..ac2dcf6f6 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -48,7 +48,7 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { }, } datastore = praefect.NewMemoryDatastore(cfg) - coordinator = praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name) + coordinator = praefect.NewCoordinator(logrus.New(), datastore) resultsCh = make(chan result) replman = praefect.NewReplMgr( cfg.SecondaryServers[1].Name, @@ -178,7 +178,7 @@ func TestReplicate(t *testing.T) { defer srv.Stop() datastore := praefect.NewMemoryDatastore(cfg) - coordinator := praefect.NewCoordinator(logrus.New(), cfg.PrimaryServer.Name) + coordinator := praefect.NewCoordinator(logrus.New(), datastore) coordinator.RegisterNode("backup", socketPath) coordinator.RegisterNode("default", socketPath) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index e82ba87c6..b6f2a8fd9 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -50,8 +50,12 @@ func TestServerSimpleUnaryUnary(t *testing.T) { storageBackup = "backup" ) - coordinator := praefect.NewCoordinator(logrus.New(), storagePrimary) - datastore := praefect.NewMemoryDatastore(config.Config{}) + datastore := praefect.NewMemoryDatastore(config.Config{ + PrimaryServer: &config.GitalyServer{ + Name: "default", + }, + }) + coordinator := praefect.NewCoordinator(logrus.New(), datastore) replmgr := praefect.NewReplMgr( storagePrimary, logrus.New(), |