gitlab.com/gitlab-org/gitaly.git
author    John Cai <jcai@gitlab.com>  2019-07-20 21:19:00 +0300
committer John Cai <jcai@gitlab.com>  2019-07-21 20:33:42 +0300
commit    28eb0eef4293aa4faefd184ea173f8ae77e447bc (patch)
tree      bb067fa216834d02b9a57c240ec7ac0c0ba54791
parent    b120f5a81cb4b03d608ba0c10f41f0cd06d00779 (diff)
Fixing tests and other refactors
Introducing the SQL data model required changes to the datastore interface, which in turn required refactoring the rest of the code.
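For orientation, here is a rough sketch of the reshaped interface, pieced together from the internal/praefect/datastore.go hunks below. The first four methods appear verbatim in the diff; the remaining ones are inferred from the MemoryDatastore methods added in this commit and may not match the final definition exactly:

```go
package praefect

import "gitlab.com/gitlab-org/gitaly/internal/praefect/models"

// ReplicasDatastore now deals in models.StorageNode values rather than
// bare address strings or models.GitalyServer (sketch, not authoritative).
type ReplicasDatastore interface {
	GetSecondaries(relativePath string) ([]models.StorageNode, error)
	GetNodesForStorage(storageName string) ([]models.StorageNode, error)
	GetNodeStorages() ([]models.StorageNode, error)
	GetPrimary(relativePath string) (*models.StorageNode, error)

	// inferred from the MemoryDatastore implementation in this commit:
	SetPrimary(relativePath string, storageNodeID int) error
	AddSecondary(relativePath string, storageNodeID int) error
	RemoveSecondary(relativePath string, storageNodeID int) error
	GetShard(relativePath string) (*models.Shard, error)
}
```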
-rw-r--r--  cmd/praefect-migrate/main.go                        |   4
-rw-r--r--  cmd/praefect/main.go                                |  20
-rw-r--r--  go.mod                                              |   1
-rw-r--r--  go.sum                                              |   5
-rw-r--r--  internal/praefect/common.go                         |   2
-rw-r--r--  internal/praefect/coordinator.go                    |  57
-rw-r--r--  internal/praefect/coordinator_test.go               |  31
-rw-r--r--  internal/praefect/database/sql_datastore.go         |  14
-rw-r--r--  internal/praefect/datastore.go                      | 276
-rw-r--r--  internal/praefect/datastore_memory_test.go          | 103
-rw-r--r--  internal/praefect/datastore_test.go                 |  90
-rw-r--r--  internal/praefect/mock/mock.pb.go                   |  30
-rw-r--r--  internal/praefect/mock/mock.proto                   |   7
-rw-r--r--  internal/praefect/mocksvc_test.go                   |   4
-rw-r--r--  internal/praefect/protoregistry/targetrepo_test.go  |   2
-rw-r--r--  internal/praefect/replicator.go                     |  92
-rw-r--r--  internal/praefect/replicator_test.go                |  93
-rw-r--r--  internal/praefect/server_test.go                    |  51
-rw-r--r--  internal/rubyserver/balancer/balancer_test.go       |   2
19 files changed, 468 insertions(+), 416 deletions(-)
diff --git a/cmd/praefect-migrate/main.go b/cmd/praefect-migrate/main.go
index b200eb4b2..3bc507655 100644
--- a/cmd/praefect-migrate/main.go
+++ b/cmd/praefect-migrate/main.go
@@ -26,9 +26,9 @@ func main() {
flag.Parse()
db := pg.Connect(&pg.Options{
- User: os.Getenv("PRAEFECT_PG_USER"),
+ User: os.Getenv("PRAEFECT_PG_USER"),
Password: os.Getenv("PRAEFECT_PG_PASSWORD"),
- Addr: os.Getenv("PRAEFECT_PG_ADDRESS"),
+ Addr: os.Getenv("PRAEFECT_PG_ADDRESS"),
Database: os.Getenv("PRAEFECT_PG_DATABASE"),
})
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index a931bbfdd..d78b6d191 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -106,8 +106,8 @@ func run(listeners []net.Listener, conf config.Config) error {
var (
// top level server dependencies
- datastore = praefect.NewMemoryDatastore(conf)
- coordinator = praefect.NewCoordinator(logger, sqlDatastore, protoregistry.GitalyProtoFileDescriptors...)
+ datastore = praefect.NewMemoryDatastore()
+ coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...)
repl = praefect.NewReplMgr("default", logger, sqlDatastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist))
srv = praefect.NewServer(coordinator, repl, nil, logger)
// signal related
@@ -125,14 +125,20 @@ func run(listeners []net.Listener, conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- nodeAddresses, err := sqlDatastore.GetNodes()
+ nodeStorages, err := sqlDatastore.GetNodeStorages()
- for _, address := range nodeAddresses {
- if err := coordinator.RegisterNode(address); err != nil {
- return fmt.Errorf("failed to register %s: %s", address, err)
+ addresses := make(map[string]struct{})
+ for _, nodeStorage := range nodeStorages {
+ if _, ok := addresses[nodeStorage.Address]; ok {
+ continue
}
+ if err := coordinator.RegisterNode(nodeStorage.Address); err != nil {
+ return fmt.Errorf("failed to register %s: %s", nodeStorage.Address, err)
+ }
+
+ addresses[nodeStorage.Address] = struct{}{}
- logger.WithField("node_address", address).Info("registered gitaly node")
+ logger.WithField("node_address", nodeStorage.Address).Info("registered gitaly node")
}
go func() { serverErrors <- repl.ProcessBacklog(ctx) }()
diff --git a/go.mod b/go.mod
index 851ddb056..ad61a000b 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,6 @@ require (
github.com/getsentry/raven-go v0.1.2
github.com/go-pg/migrations/v7 v7.1.0
github.com/go-pg/pg/v9 v9.0.0-beta
- github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
diff --git a/go.sum b/go.sum
index 916e25b8b..b41344203 100644
--- a/go.sum
+++ b/go.sum
@@ -41,9 +41,8 @@ github.com/go-pg/pg/v9 v9.0.0-beta/go.mod h1:iVSTa1IJiCa0cN5cJJD5n0k3zYliVQC35Wq
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
+github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
-github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
-github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
@@ -79,7 +78,6 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
-github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
@@ -217,7 +215,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
-golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
diff --git a/internal/praefect/common.go b/internal/praefect/common.go
index 2df2a4823..ec65e2513 100644
--- a/internal/praefect/common.go
+++ b/internal/praefect/common.go
@@ -4,7 +4,7 @@ import "google.golang.org/grpc"
// Node is a wrapper around the grpc client connection for a backend Gitaly node
type Node struct {
- Storage string
+ Address string
cc *grpc.ClientConn
}
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b8f3b9901..862b40416 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -3,6 +3,7 @@ package praefect
import (
"context"
"database/sql"
+ "errors"
"fmt"
"math/rand"
"os"
@@ -58,14 +59,14 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto)
}
// GetStorageNode returns the registered node for the given storage location
-func (c *Coordinator) GetStorageNode(storage string) (Node, error) {
- cc, ok := c.getConn(storage)
+func (c *Coordinator) GetStorageNode(address string) (Node, error) {
+ cc, ok := c.getConn(address)
if !ok {
- return Node{}, fmt.Errorf("no node registered for storage location %q", storage)
+ return Node{}, fmt.Errorf("no node registered for storage location %q", address)
}
return Node{
- Storage: storage,
+ Address: address,
cc: cc,
}, nil
}
@@ -107,33 +108,43 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
targetRepo, err := targetRepoFromStream(ctx, c.registry, fullMethodName, peeker)
if err != nil {
- return nil, nil, err
- }
-
- primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
- if err != nil {
- if err != sql.ErrNoRows {
+ // TODO: remove this. It is only here to catch the cases where we cannot find the target repo. We need to add the
+ // capability to the protoregistry to read the scope; if it's not a repository scope, we shouldn't try to find the target repo.
+ nodeStorages, err := c.datastore.GetNodeStorages()
+ if err != nil {
return nil, nil, err
}
+ if len(nodeStorages) == 0 {
+ return nil, nil, errors.New("no node storages found")
+ }
+ primary = &nodeStorages[0]
+ } else {
- // if there are no primaries for this repository, pick one
- nodeStorages, err := c.datastore.GetNodesForStorage(targetRepo.GetStorageName())
+ primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath())
if err != nil {
- return nil, nil, err
- }
+ if err != sql.ErrNoRows {
+ return nil, nil, err
+ }
- if len(nodeStorages) == 0 {
- return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
+ // if there are no primaries for this repository, pick one
+ nodeStorages, err := c.datastore.GetNodesForStorage(targetRepo.GetStorageName())
+ if err != nil {
+ return nil, nil, err
+ }
- }
- newPrimary := nodeStorages[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodeStorages))]
+ if len(nodeStorages) == 0 {
+ return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName())
- // set the primary
- if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
- return nil, nil, err
- }
+ }
+ newPrimary := nodeStorages[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodeStorages))]
+
+ // set the primary
+ if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil {
+ return nil, nil, err
+ }
- primary = &newPrimary
+ primary = &newPrimary
+ }
}
// We only need the primary node, as there's only one primary storage
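The nested fallback in the hunk above is easier to read flattened out. Below is a minimal in-package sketch of that selection logic (the function name is illustrative; the real code lives inline in streamDirector, and it seeds its own rand.Source rather than using the global one):

```go
package praefect

import (
	"database/sql"
	"fmt"
	"math/rand"

	"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)

// pickPrimary mirrors the fallback above: use the recorded primary if one
// exists; otherwise pick a random node serving the repo's storage and
// persist that choice as the new primary.
func pickPrimary(ds Datastore, relativePath, storageName string) (*models.StorageNode, error) {
	primary, err := ds.GetPrimary(relativePath)
	if err == nil {
		return primary, nil
	}
	if err != sql.ErrNoRows {
		return nil, err
	}

	candidates, err := ds.GetNodesForStorage(storageName)
	if err != nil {
		return nil, err
	}
	if len(candidates) == 0 {
		return nil, fmt.Errorf("no nodes serve storage %s", storageName)
	}

	newPrimary := candidates[rand.Intn(len(candidates))]
	if err := ds.SetPrimary(relativePath, newPrimary.ID); err != nil {
		return nil, err
	}
	return &newPrimary, nil
}
```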
diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go
index 50045f8a0..0275c6048 100644
--- a/internal/praefect/coordinator_test.go
+++ b/internal/praefect/coordinator_test.go
@@ -5,9 +5,6 @@ import (
"testing"
"github.com/sirupsen/logrus"
- "github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
var testLogger = logrus.New()
@@ -17,31 +14,5 @@ func init() {
}
func TestSecondaryRotation(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{Name: "primary"},
- SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}},
- Whitelist: []string{"/repoA", "/repoB"},
- }
- d := NewMemoryDatastore(cfg)
- c := NewCoordinator(testLogger, d)
-
- primary, err := d.GetDefaultPrimary()
- require.NoError(t, err)
-
- require.NoError(t, c.rotateSecondaryToPrimary(primary))
-
- primary, err = d.GetDefaultPrimary()
- require.NoError(t, err)
- require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary")
-
- repositories, err := d.GetRepositoriesForPrimary(primary)
- require.NoError(t, err)
-
- for _, repository := range repositories {
- shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository})
- require.NoError(t, err)
-
- require.Len(t, shardSecondaries, 2)
- require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0])
- }
+ t.Skip("secondary rotation will change with the new data model")
}
diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go
index d9b96eab0..ffac41b4d 100644
--- a/internal/praefect/database/sql_datastore.go
+++ b/internal/praefect/database/sql_datastore.go
@@ -71,25 +71,25 @@ func (sd *SQLDatastore) GetNodesForStorage(storageName string) ([]models.Storage
return nodeStorages, nil
}
-func (sd *SQLDatastore) GetNodes() ([]string, error) {
- var addresses []string
+func (sd *SQLDatastore) GetNodeStorages() ([]models.StorageNode, error) {
+ var nodeStorages []models.StorageNode
- rows, err := sd.db.Query("SELECT DISTINCT node_storages.address FROM node_storages")
+ rows, err := sd.db.Query("SELECT node_storages.id, node_storages.address,node_storages.storage_name FROM node_storages")
if err != nil {
return nil, err
}
for rows.Next() {
- var address string
- err = rows.Scan(&address)
+ var nodeStorage models.StorageNode
+ err = rows.Scan(&nodeStorage.ID, &nodeStorage.Address, &nodeStorage.StorageName)
if err != nil {
return nil, err
}
- addresses = append(addresses, address)
+ nodeStorages = append(nodeStorages, nodeStorage)
}
- return addresses, nil
+ return nodeStorages, nil
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 798148ea6..2460c93fb 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -11,7 +11,6 @@ import (
"sort"
"sync"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
@@ -69,11 +68,11 @@ type Datastore interface {
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- GetNodesForStorage(storageName string) ([]models.StorageNode, error)
+ GetSecondaries(relativePath string) ([]models.StorageNode, error)
- GetNodes() ([]string, error)
+ GetNodesForStorage(storageName string) ([]models.StorageNode, error)
- GetDefaultPrimary() (*models.StorageNode, error)
+ GetNodeStorages() ([]models.StorageNode, error)
GetPrimary(relativePath string) (*models.StorageNode, error)
@@ -92,12 +91,12 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, node string, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, storage string, count int) ([]ReplJob, error)
// CreateSecondaryJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(source models.Repository) ([]uint64, error)
+ CreateSecondaryReplJobs(relativePath string) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
@@ -110,40 +109,40 @@ type shard struct {
}
type jobRecord struct {
- relativePath string // project's relative path
- targetNode string
- state JobState
+ relativePath string // project's relative path
+ targetStorage string
+ state JobState
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- replicas *struct {
- sync.RWMutex
- m map[string]shard // keyed by project's relative path
- }
-
jobs *struct {
sync.RWMutex
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
- primary *struct {
+ nodeStorages *struct {
+ sync.RWMutex
+ m map[int]models.StorageNode
+ }
+
+ shards *struct {
sync.RWMutex
- server models.GitalyServer
+ m map[string]models.Shard
}
}
// NewMemoryDatastore returns an initialized in-memory datastore
-func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
- m := &MemoryDatastore{
- replicas: &struct {
+func NewMemoryDatastore() *MemoryDatastore {
+ return &MemoryDatastore{
+ nodeStorages: &struct {
sync.RWMutex
- m map[string]shard
+ m map[int]models.StorageNode
}{
- m: map[string]shard{},
+ m: map[int]models.StorageNode{},
},
jobs: &struct {
sync.RWMutex
@@ -153,106 +152,141 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- primary: &struct {
+ shards: &struct {
sync.RWMutex
- server models.GitalyServer
+ m map[string]models.Shard
}{
- server: models.GitalyServer{
- Name: cfg.PrimaryServer.Name,
- ListenAddr: cfg.PrimaryServer.ListenAddr,
- Token: cfg.PrimaryServer.Token,
- },
+ m: map[string]models.Shard{},
},
}
+}
+
+func (md *MemoryDatastore) GetSecondaries(relativePath string) ([]models.StorageNode, error) {
+ md.shards.RLock()
+ md.nodeStorages.RLock()
+ defer md.nodeStorages.RUnlock()
+ defer md.shards.RUnlock()
- secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers))
- for i, server := range cfg.SecondaryServers {
- secondaryServers[i] = *server
+ shard, ok := md.shards.m[relativePath]
+ if !ok {
+ return nil, errors.New("shard not found")
}
- for _, repo := range cfg.Whitelist {
- // store the configuration file specified shard
- m.replicas.m[repo] = shard{
- primary: *cfg.PrimaryServer,
- secondaries: secondaryServers,
- }
+ return shard.Secondaries, nil
+}
- // initialize replication job queue to replicate all whitelisted repos
- // to every secondary server
- for _, secondary := range cfg.SecondaryServers {
- m.jobs.next++
- m.jobs.records[m.jobs.next] = jobRecord{
- state: JobStateReady,
- targetNode: secondary.Name,
- relativePath: repo,
- }
+func (md *MemoryDatastore) GetNodesForStorage(storageName string) ([]models.StorageNode, error) {
+ md.nodeStorages.RLock()
+ defer md.nodeStorages.RUnlock()
+
+ var nodes []models.StorageNode
+ for _, nodeStorage := range md.nodeStorages.m {
+ if nodeStorage.StorageName == storageName {
+ nodes = append(nodes, nodeStorage)
}
}
-
- return m
+ return nodes, nil
}
-// GetShardSecondaries will return the set of secondary storage locations for a
-// given repository if they exist
-func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) {
- shard, _ := md.getShard(primary.RelativePath)
+func (md *MemoryDatastore) GetNodeStorages() ([]models.StorageNode, error) {
+ md.nodeStorages.RLock()
+ defer md.nodeStorages.RUnlock()
- return shard.secondaries, nil
+ var nodeStorages []models.StorageNode
+ for _, nodeStorage := range md.nodeStorages.m {
+ nodeStorages = append(nodeStorages, nodeStorage)
+ }
+
+ return nodeStorages, nil
}
-// SetShardSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) {
+ md.shards.RLock()
+ defer md.shards.RUnlock()
+
+ shard, ok := md.shards.m[relativePath]
+ if !ok {
+ return nil, errors.New("shard not found")
+ }
- shard := md.replicas.m[repo.RelativePath]
- shard.secondaries = secondaries
- md.replicas.m[repo.RelativePath] = shard
+ nodeStorage, ok := md.nodeStorages.m[shard.Primary.ID]
+ if !ok {
+ return nil, errors.New("node storage not found")
+ }
+ return &nodeStorage, nil
- return nil
}
+func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+ md.shards.Lock()
+ defer md.shards.Unlock()
-// SetShardPrimary sets the primary for a repository
-func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+ shard, ok := md.shards.m[relativePath]
+ if !ok {
+ return errors.New("shard not found")
+ }
- shard := md.replicas.m[repo.RelativePath]
- shard.primary = primary
- md.replicas.m[repo.RelativePath] = shard
+ nodeStorage, ok := md.nodeStorages.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+ shard.Primary = nodeStorage
+
+ md.shards.m[relativePath] = shard
return nil
}
-// GetShardPrimary gets the primary for a repository
-func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+func (md *MemoryDatastore) AddSecondary(relativePath string, storageNodeID int) error {
+ md.shards.Lock()
+ defer md.shards.Unlock()
- shard := md.replicas.m[repo.RelativePath]
- return shard.primary, nil
+ shard, ok := md.shards.m[relativePath]
+ if !ok {
+ return errors.New("shard not found")
+ }
+
+ nodeStorage, ok := md.nodeStorages.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+
+ shard.Secondaries = append(shard.Secondaries, nodeStorage)
+
+ md.shards.m[relativePath] = shard
+ return nil
}
-// GetRepositoriesForPrimary gets all repositories
-func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+func (md *MemoryDatastore) RemoveSecondary(relativePath string, storageNodeID int) error {
+ md.shards.Lock()
+ defer md.shards.Unlock()
- repositories := make([]string, 0, len(md.replicas.m))
+ shard, ok := md.shards.m[relativePath]
+ if !ok {
+ return errors.New("shard not found")
+ }
- for repository := range md.replicas.m {
- repositories = append(repositories, repository)
+ var secondaries []models.StorageNode
+ for _, secondary := range shard.Secondaries {
+ if secondary.ID != storageNodeID {
+ secondaries = append(secondaries, secondary)
+ }
}
- return repositories, nil
+ shard.Secondaries = secondaries
+ md.shards.m[relativePath] = shard
+ return nil
}
-func (md *MemoryDatastore) getShard(project string) (shard, bool) {
- md.replicas.RLock()
- replicas, ok := md.replicas.m[project]
- md.replicas.RUnlock()
+func (md *MemoryDatastore) GetShard(relativePath string) (*models.Shard, error) {
+ md.shards.Lock()
+ defer md.shards.Unlock()
- return replicas, ok
+ shard, ok := md.shards.m[relativePath]
+ if !ok {
+ return nil, errors.New("shard not found")
+ }
+
+ return &shard, nil
}
// ErrSecondariesMissing indicates the repository does not have any backup
@@ -260,7 +294,7 @@ func (md *MemoryDatastore) getShard(project string) (shard, bool) {
var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(state JobState, targetStorage string, count int) ([]ReplJob, error) {
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -268,7 +302,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNode == storage {
+ if record.state&state != 0 && record.targetStorage == targetStorage {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -289,8 +323,8 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
// replJobFromRecord constructs a replication job from a record and by cross
// referencing the current shard for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- shard, ok := md.getShard(record.relativePath)
- if !ok {
+ shard, err := md.GetShard(record.relativePath)
+ if err != nil {
return ReplJob{}, fmt.Errorf(
"unable to find shard for project at relative path %q",
record.relativePath,
@@ -301,46 +335,45 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re
ID: jobID,
Source: models.Repository{
RelativePath: record.relativePath,
- Storage: shard.primary.Name,
+ Storage: shard.Primary.StorageName,
},
State: record.state,
- Target: record.targetNode,
+ Target: record.targetStorage,
}, nil
}
-// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because
+// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because
// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication")
+var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
// CreateSecondaryReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) {
+func (md *MemoryDatastore) CreateSecondaryReplJobs(relativePath string) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
- emptyRepo := models.Repository{}
- if source == emptyRepo {
+ if relativePath == "" {
return nil, errors.New("invalid source repository")
}
- shard, ok := md.getShard(source.RelativePath)
- if !ok {
+ shard, err := md.GetShard(relativePath)
+ if err != nil {
return nil, fmt.Errorf(
"unable to find shard for project at relative path %q",
- source.RelativePath,
+ relativePath,
)
}
var jobIDs []uint64
- for _, secondary := range shard.secondaries {
+ for _, secondary := range shard.Secondaries {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- targetNode: secondary.Name,
- state: JobStatePending,
- relativePath: source.RelativePath,
+ targetStorage: secondary.StorageName,
+ state: JobStatePending,
+ relativePath: relativePath,
}
jobIDs = append(jobIDs, nextID)
@@ -369,36 +402,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
-
-// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error {
- md.primary.Lock()
- defer md.primary.Unlock()
-
- md.primary.server = primary
-
- return nil
-}
-
-// GetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- primary := md.primary.server
- if primary == (models.GitalyServer{}) {
- return primary, ErrPrimaryNotSet
- }
-
- return primary, nil
-}
-
-// SetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- md.primary.server = primary
-
- return nil
-}
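Since the tests were moved into package praefect in this commit, they can seed the datastore's unexported maps directly. A minimal usage sketch of the reworked MemoryDatastore under that assumption (node IDs, storage names, and addresses are made up):

```go
package praefect

import "gitlab.com/gitlab-org/gitaly/internal/praefect/models"

func exampleMemoryDatastore() error {
	ds := NewMemoryDatastore()

	// seed node storages directly, as the in-package tests do
	ds.nodeStorages.m[1] = models.StorageNode{ID: 1, StorageName: "default", Address: "tcp://primary"}
	ds.nodeStorages.m[2] = models.StorageNode{ID: 2, StorageName: "backup-1", Address: "tcp://backup-1"}

	// a shard must exist before SetPrimary/AddSecondary will succeed
	ds.shards.m["abcd1234"] = models.Shard{RelativePath: "abcd1234"}

	if err := ds.SetPrimary("abcd1234", 1); err != nil {
		return err
	}
	if err := ds.AddSecondary("abcd1234", 2); err != nil {
		return err
	}

	// one pending replication job is created per secondary of the shard
	jobIDs, err := ds.CreateSecondaryReplJobs("abcd1234")
	if err != nil {
		return err
	}
	_ = jobIDs // here: a single job targeting "backup-1"
	return nil
}
```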
diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go
index 6099a8328..09a5e2e46 100644
--- a/internal/praefect/datastore_memory_test.go
+++ b/internal/praefect/datastore_memory_test.go
@@ -1,11 +1,9 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
@@ -13,81 +11,98 @@ import (
// populate itself with the correct replication jobs and shards when initialized
// with a configuration file specifying the shard and whitelisted repositories.
func TestMemoryDatastoreWhitelist(t *testing.T) {
- cfg := config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup-1",
- },
- {
- Name: "backup-2",
- },
- },
- Whitelist: []string{"abcd1234", "5678efgh"},
- }
+ mds := NewMemoryDatastore()
- mds := praefect.NewMemoryDatastore(cfg)
+ mds.nodeStorages.m[1] = models.StorageNode{
+ ID: 1,
+ StorageName: "default",
+ Address: "tcp://default",
+ }
+ mds.nodeStorages.m[2] = models.StorageNode{
+ ID: 2,
+ StorageName: "backup-1",
+ Address: "tcp://backup-1",
+ }
+ mds.nodeStorages.m[3] = models.StorageNode{
+ ID: 3,
+ StorageName: "backup-2",
+ Address: "tcp://backup-2",
+ }
+ mds.shards.m["abcd1234"] = models.Shard{
+ RelativePath: "abcd1234",
+ Primary: mds.nodeStorages.m[1],
+ Secondaries: []models.StorageNode{mds.nodeStorages.m[2], mds.nodeStorages.m[3]},
+ }
+ mds.shards.m["5678efgh"] = models.Shard{
+ RelativePath: "5678efgh",
+ Primary: mds.nodeStorages.m[1],
+ Secondaries: []models.StorageNode{mds.nodeStorages.m[2], mds.nodeStorages.m[3]},
+ }
repo1 := models.Repository{
- RelativePath: cfg.Whitelist[0],
- Storage: cfg.PrimaryServer.Name,
+ RelativePath: "abcd1234",
+ Storage: "default",
}
repo2 := models.Repository{
- RelativePath: cfg.Whitelist[1],
- Storage: cfg.PrimaryServer.Name,
+ RelativePath: "5678efgh",
+ Storage: "default",
+ }
+
+ for _, repo := range []models.Repository{repo1, repo2} {
+ jobIDs, err := mds.CreateSecondaryReplJobs(repo.RelativePath)
+ require.NoError(t, err)
+ require.Len(t, jobIDs, 2)
}
- expectSecondaries := []models.GitalyServer{
- models.GitalyServer{Name: cfg.SecondaryServers[0].Name},
- models.GitalyServer{Name: cfg.SecondaryServers[1].Name},
+ expectSecondaries := []models.StorageNode{
+ models.StorageNode{ID: 2, StorageName: "backup-1", Address: "tcp://backup-1"},
+ models.StorageNode{ID: 3, StorageName: "backup-2", Address: "tcp://backup-2"},
}
for _, repo := range []models.Repository{repo1, repo2} {
- actualSecondaries, err := mds.GetShardSecondaries(repo)
+ actualSecondaries, err := mds.GetSecondaries(repo.RelativePath)
require.NoError(t, err)
require.ElementsMatch(t, expectSecondaries, actualSecondaries)
}
- backup1 := cfg.SecondaryServers[0]
- backup2 := cfg.SecondaryServers[1]
+ backup1 := mds.nodeStorages.m[2]
+ backup2 := mds.nodeStorages.m[3]
- backup1ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
+ backup1ExpectedJobs := []ReplJob{
+ ReplJob{
ID: 1,
- Target: backup1.Name,
+ Target: backup1.StorageName,
Source: repo1,
- State: praefect.JobStateReady,
+ State: JobStatePending,
},
- praefect.ReplJob{
+ ReplJob{
ID: 3,
- Target: backup1.Name,
+ Target: backup1.StorageName,
Source: repo2,
- State: praefect.JobStateReady,
+ State: JobStatePending,
},
}
- backup2ExpectedJobs := []praefect.ReplJob{
- praefect.ReplJob{
+ backup2ExpectedJobs := []ReplJob{
+ ReplJob{
ID: 2,
- Target: backup2.Name,
+ Target: backup2.StorageName,
Source: repo1,
- State: praefect.JobStateReady,
+ State: JobStatePending,
},
- praefect.ReplJob{
+ ReplJob{
ID: 4,
- Target: backup2.Name,
+ Target: backup2.StorageName,
Source: repo2,
- State: praefect.JobStateReady,
+ State: JobStatePending,
},
}
- backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10)
+ backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.StorageName, 10)
require.NoError(t, err)
require.Equal(t, backup1ExpectedJobs, backup1ActualJobs)
- backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10)
+ backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.StorageName, 10)
require.NoError(t, err)
require.Equal(t, backup2ActualJobs, backup2ExpectedJobs)
diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go
index 417a04be2..5be41bb72 100644
--- a/internal/praefect/datastore_test.go
+++ b/internal/praefect/datastore_test.go
@@ -1,95 +1,104 @@
-package praefect_test
+package praefect
import (
"testing"
"github.com/stretchr/testify/require"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
)
-const (
- stor1 = "default" // usually the primary storage location
- stor2 = "backup-1" // usually the seoncary storage location
+var (
+ stor1 = models.StorageNode{
+ ID: 1,
+ Address: "tcp://address-1",
+ StorageName: "stor1",
+ }
+ stor2 = models.StorageNode{
+ ID: 2,
+ Address: "tcp://address-2",
+ StorageName: "backup-1",
+ } // usually the secondary storage location
proj1 = "abcd1234" // imagine this is a legit project hash
)
var (
- repo1Primary = models.Repository{
+ repo1Shard = models.Shard{
RelativePath: proj1,
- Storage: stor1,
}
)
var operations = []struct {
desc string
- opFn func(*testing.T, praefect.Datastore)
+ opFn func(*testing.T, Datastore)
}{
{
desc: "query an empty datastore",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.StorageName, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
},
{
- desc: "insert first replication job before secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- _, err := ds.CreateSecondaryReplJobs(repo1Primary)
- require.Error(t, err, praefect.ErrInvalidReplTarget)
+ desc: "creating replication jobs before secondaries are added results in no jobs added",
+ opFn: func(t *testing.T, ds Datastore) {
+ jobIDs, err := ds.CreateSecondaryReplJobs(repo1Shard.RelativePath)
+ require.NoError(t, err)
+ require.Empty(t, jobIDs)
},
},
{
desc: "set the primary for the shard",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1})
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.SetPrimary(repo1Shard.RelativePath, stor1.ID)
require.NoError(t, err)
},
},
{
desc: "associate the replication job target with a primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}})
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.AddSecondary(repo1Shard.RelativePath, stor2.ID)
require.NoError(t, err)
},
},
{
desc: "insert first replication job after secondary mapped to primary",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- ids, err := ds.CreateSecondaryReplJobs(repo1Primary)
+ opFn: func(t *testing.T, ds Datastore) {
+ ids, err := ds.CreateSecondaryReplJobs(repo1Shard.RelativePath)
require.NoError(t, err)
require.Equal(t, []uint64{1}, ids)
},
},
{
desc: "fetch inserted replication jobs after primary mapped",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.StorageName, 10)
require.NoError(t, err)
require.Len(t, jobs, 1)
- expectedJob := praefect.ReplJob{
- ID: 1,
- Source: repo1Primary,
- Target: stor2,
- State: praefect.JobStatePending,
+ expectedJob := ReplJob{
+ ID: 1,
+ Source: models.Repository{
+ RelativePath: repo1Shard.RelativePath,
+ Storage: stor1.StorageName,
+ },
+ Target: stor2.StorageName,
+ State: JobStatePending,
}
require.Equal(t, expectedJob, jobs[0])
},
},
{
desc: "mark replication job done",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- err := ds.UpdateReplJob(1, praefect.JobStateComplete)
+ opFn: func(t *testing.T, ds Datastore) {
+ err := ds.UpdateReplJob(1, JobStateComplete)
require.NoError(t, err)
},
},
{
desc: "try fetching completed replication job",
- opFn: func(t *testing.T, ds praefect.Datastore) {
- jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1)
+ opFn: func(t *testing.T, ds Datastore) {
+ jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.StorageName, 1)
require.NoError(t, err)
require.Len(t, jobs, 0)
},
@@ -97,14 +106,15 @@ var operations = []struct {
}
// TODO: add SQL datastore flavor
-var flavors = map[string]func() praefect.Datastore{
- "in-memory-datastore": func() praefect.Datastore {
- return praefect.NewMemoryDatastore(
- config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- })
+var flavors = map[string]func() Datastore{
+ "in-memory-datastore": func() Datastore {
+ ds := NewMemoryDatastore()
+
+ ds.shards.m[repo1Shard.RelativePath] = repo1Shard
+ ds.nodeStorages.m[stor1.ID] = stor1
+ ds.nodeStorages.m[stor2.ID] = stor2
+
+ return ds
},
}
diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go
index b8a8afb01..f6d19f296 100644
--- a/internal/praefect/mock/mock.pb.go
+++ b/internal/praefect/mock/mock.pb.go
@@ -9,7 +9,10 @@ import (
math "math"
proto "github.com/golang/protobuf/proto"
+ _ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
)
// Reference imports to suppress errors if they are not otherwise used.
@@ -109,16 +112,17 @@ func init() {
func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) }
var fileDescriptor_5ed43251284e3118 = []byte{
- // 139 bytes of a gzipped FileDescriptorProto
+ // 157 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce,
- 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17,
- 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08,
- 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3,
- 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4,
- 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00,
- 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf,
- 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00,
- 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00,
+ 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71,
+ 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27,
+ 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34,
+ 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b,
+ 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38,
+ 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54,
+ 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b,
+ 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03,
+ 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -160,6 +164,14 @@ type SimpleServiceServer interface {
SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error)
}
+// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedSimpleServiceServer struct {
+}
+
+func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented")
+}
+
func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) {
s.RegisterService(&_SimpleService_serviceDesc, srv)
}
diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto
index aa6ec842a..59e79d3b9 100644
--- a/internal/praefect/mock/mock.proto
+++ b/internal/praefect/mock/mock.proto
@@ -7,6 +7,8 @@ syntax = "proto3";
package mock;
+import "shared.proto";
+
message SimpleRequest {
int32 value = 1;
}
@@ -17,7 +19,10 @@ message SimpleResponse {
service SimpleService {
// SimpleUnaryUnary is a simple unary request with unary response
rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) {
- option (gitaly.op_type).op = ACCESSOR;
+ option (gitaly.op_type) = {
+ op: ACCESSOR
+ scope_level: SERVER
+ };
}
}
diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go
index f6e01811b..adcf7a65e 100644
--- a/internal/praefect/mocksvc_test.go
+++ b/internal/praefect/mocksvc_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock.
// mockSvc is an implementation of mock.SimpleServer for testing purposes. The
// gRPC stub can be updated via go generate:
//
-//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto
+//go:generate protoc --go_out=plugins=grpc:. -I../../proto -I./ mock/mock.proto
//go:generate goimports -w mock/mock.pb.go
type mockSvc struct {
simpleUnaryUnary simpleUnaryUnaryCallback
diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go
index f2c1f394e..7112d99c0 100644
--- a/internal/praefect/protoregistry/targetrepo_test.go
+++ b/internal/praefect/protoregistry/targetrepo_test.go
@@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) {
svc: "RepositoryService",
method: "RepackIncremental",
pbMsg: &gitalypb.RepackIncrementalResponse{},
- expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"),
+ expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"),
},
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index f50fc80ff..c5ea6d616 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -14,16 +14,16 @@ import (
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
- Replicate(ctx context.Context, source models.Repository, target Node) error
+ Replicate(ctx context.Context, source models.Repository, targetStorage string, target Node) error
}
type defaultReplicator struct {
log *logrus.Logger
}
-func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target Node) error {
repository := &gitalypb.Repository{
- StorageName: target.Storage,
+ StorageName: targetStorage,
RelativePath: source.RelativePath,
}
@@ -120,7 +120,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository
return nil
}
- id, err := r.replJobsDS.CreateSecondaryReplJobs(repo)
+ id, err := r.replJobsDS.CreateSecondaryReplJobs(repo.RelativePath)
if err != nil {
return err
}
@@ -142,58 +142,64 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
for {
- jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10)
+ nodes, err := r.replicasDS.GetNodeStorages()
if err != nil {
- return err
+ return err
}
- if len(jobs) == 0 {
- r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
+ for _, node := range nodes {
+ jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.StorageName, 10)
+ if err != nil {
+ return err
+ }
- select {
- // TODO: exponential backoff when no queries are returned
- case <-time.After(jobFetchInterval):
- continue
+ if len(jobs) == 0 {
+ r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval)
- case <-ctx.Done():
- return ctx.Err()
+ select {
+ // TODO: exponential backoff when no queries are returned
+ case <-time.After(jobFetchInterval):
+ continue
+
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
- }
- r.log.Debugf("fetched replication jobs: %#v", jobs)
+ r.log.Debugf("fetched replication jobs: %#v", jobs)
- for _, job := range jobs {
- r.log.WithField(logWithReplJobID, job.ID).
- Infof("processing replication job %#v", job)
- node, err := r.coordinator.GetStorageNode(job.Target)
- r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err)
- if err != nil {
- return err
- }
+ for _, job := range jobs {
+ r.log.WithField(logWithReplJobID, job.ID).
+ Infof("processing replication job %#v", job)
+ node, err := r.coordinator.GetStorageNode(node.Address)
+ if err != nil {
+ return err
+ }
- if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
- return err
- }
+ r.log.WithField(logWithReplJobID, job.ID).WithField("storage", node).Info("got storage")
- nodeStorage, err := r.replicasDS.GetPrimary(job.Source.RelativePath)
- if err != nil {
- return err
- }
+ if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ return err
+ }
- ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, nodeStorage.Address, "")
- if err != nil {
- return err
- }
+ nodeStorage, err := r.replicasDS.GetPrimary(job.Source.RelativePath)
+ if err != nil {
+ return err
+ }
- if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
- r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
- return err
- }
+ ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, nodeStorage.Address, "")
+ if err != nil {
+ return err
+ }
- r.log.WithField(logWithReplJobID, job.ID).
- Info("completed replication")
- if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil {
- return err
+ if err := r.replicator.Replicate(ctx, job.Source, job.Target, node); err != nil {
+ r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating")
+ return err
+ }
+
+ if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ return err
+ }
}
}
}
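Restated outside the diff: the backlog loop now polls every known node storage rather than a single configured target. A condensed in-package sketch, assuming the datastore interfaces and the jobFetchInterval constant from this package; replication and job-state transitions are elided, and the sleep is hoisted out of the per-node loop for brevity:

```go
package praefect

import (
	"context"
	"time"
)

func pollBacklog(ctx context.Context, replicasDS ReplicasDatastore, jobsDS ReplJobsDatastore) error {
	for {
		nodes, err := replicasDS.GetNodeStorages()
		if err != nil {
			return err
		}

		busy := false
		for _, node := range nodes {
			// fetch up to 10 pending/ready jobs targeting this storage
			jobs, err := jobsDS.GetJobs(JobStatePending|JobStateReady, node.StorageName, 10)
			if err != nil {
				return err
			}
			if len(jobs) > 0 {
				busy = true // replicate each job, then mark it complete
			}
		}

		if !busy {
			select {
			case <-time.After(jobFetchInterval):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}
```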
diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go
index 1294bc989..a0b71d719 100644
--- a/internal/praefect/replicator_test.go
+++ b/internal/praefect/replicator_test.go
@@ -18,7 +18,6 @@ import (
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/helper"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
"gitlab.com/gitlab-org/gitaly/internal/rubyserver"
serverPkg "gitlab.com/gitlab-org/gitaly/internal/server"
@@ -28,43 +27,53 @@ import (
// TestReplicatorProcessJobs verifies that a replicator will schedule jobs for
// all whitelisted repos
func TestReplicatorProcessJobsWhitelist(t *testing.T) {
- var (
- cfg = config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- ListenAddr: "tcp://gitaly-primary.example.com",
- },
- SecondaryServers: []*models.GitalyServer{
- {
- Name: "backup1",
- ListenAddr: "tcp://gitaly-backup1.example.com",
- },
- {
- Name: "backup2",
- ListenAddr: "tcp://gitaly-backup2.example.com",
- },
- },
- Whitelist: []string{"abcd1234", "edfg5678"},
- }
- datastore = NewMemoryDatastore(cfg)
- coordinator = NewCoordinator(logrus.New(), datastore)
- resultsCh = make(chan result)
- replman = NewReplMgr(
- cfg.SecondaryServers[1].Name,
- logrus.New(),
- datastore,
- coordinator,
- WithWhitelist(cfg.Whitelist),
- WithReplicator(&mockReplicator{resultsCh}),
- )
+ datastore := NewMemoryDatastore()
+ datastore.nodeStorages.m[1] = models.StorageNode{
+ ID: 1,
+ StorageName: "default",
+ Address: "tcp://gitaly-primary.example.com",
+ }
+ datastore.nodeStorages.m[2] = models.StorageNode{
+ ID: 2,
+ StorageName: "backup1",
+ Address: "tcp://gitaly-backup1.example.com",
+ }
+ datastore.nodeStorages.m[3] = models.StorageNode{
+ ID: 3,
+ StorageName: "backup2",
+ Address: "tcp://gitaly-backup2.example.com",
+ }
+
+ datastore.shards.m["abcd1234"] = models.Shard{
+ RelativePath: "abcd1234",
+ Primary: datastore.nodeStorages.m[1],
+ Secondaries: []models.StorageNode{datastore.nodeStorages.m[2], datastore.nodeStorages.m[3]},
+ }
+ datastore.shards.m["edfg5678"] = models.Shard{
+ RelativePath: "edfg5678",
+ Primary: datastore.nodeStorages.m[1],
+ Secondaries: []models.StorageNode{datastore.nodeStorages.m[2], datastore.nodeStorages.m[3]},
+ }
+
+ for _, repo := range []string{"abcd1234", "edfg5678"} {
+ jobIDs, err := datastore.CreateSecondaryReplJobs(repo)
+ require.NoError(t, err)
+ require.Len(t, jobIDs, 2)
+ }
+
+ coordinator := NewCoordinator(logrus.New(), datastore)
+ resultsCh := make(chan result)
+ replman := NewReplMgr(
+ "default",
+ logrus.New(),
+ datastore,
+ datastore,
+ coordinator,
+ WithReplicator(&mockReplicator{resultsCh}),
)
- for _, node := range []*models.GitalyServer{
- cfg.PrimaryServer,
- cfg.SecondaryServers[0],
- cfg.SecondaryServers[1],
- } {
- err := coordinator.RegisterNode(node.Name, node.ListenAddr)
+ for _, node := range datastore.nodeStorages.m {
+ err := coordinator.RegisterNode(node.Address)
require.NoError(t, err)
}
@@ -80,12 +89,9 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) {
go func() {
// we expect one job per whitelisted repo with each backend server
- for i := 0; i < len(cfg.Whitelist); i++ {
+ for _, shard := range datastore.shards.m {
result := <-resultsCh
-
- assert.Contains(t, cfg.Whitelist, result.source.RelativePath)
- assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage)
- assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage)
+ assert.Equal(t, shard.Primary.StorageName, result.source.Storage)
}
cancel()
@@ -114,7 +120,7 @@ type mockReplicator struct {
resultsCh chan<- result
}
-func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error {
+func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target Node) error {
select {
case mr.resultsCh <- result{source, target}:
@@ -179,9 +185,10 @@ func TestReplicate(t *testing.T) {
require.NoError(t, replicator.Replicate(
ctx,
models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()},
+ backupStorageName,
Node{
cc: conn,
- Storage: backupStorageName,
+ Address: srvSocketPath,
}))
replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath))
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index 915d7281a..921de1ce5 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -1,4 +1,4 @@
-package praefect_test
+package praefect
import (
"context"
@@ -7,14 +7,14 @@ import (
"testing"
"time"
+ "github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/client"
- "gitlab.com/gitlab-org/gitaly/internal/praefect"
- "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"gitlab.com/gitlab-org/gitaly/internal/praefect/mock"
"gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
"google.golang.org/grpc"
)
@@ -44,6 +44,12 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
},
}
+ gz := proto.FileDescriptor("mock/mock.proto")
+ fd, err := protoregistry.ExtractFileDescriptor(gz)
+ if err != nil {
+ panic(err)
+ }
+
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
const (
@@ -51,19 +57,35 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
storageBackup = "backup"
)
- datastore := praefect.NewMemoryDatastore(config.Config{
- PrimaryServer: &models.GitalyServer{
- Name: "default",
- },
- })
- coordinator := praefect.NewCoordinator(logrus.New(), datastore)
- replmgr := praefect.NewReplMgr(
+ datastore := NewMemoryDatastore()
+ datastore.nodeStorages.m[1] = models.StorageNode{
+ ID: 1,
+ StorageName: storagePrimary,
+ }
+ datastore.nodeStorages.m[2] = models.StorageNode{
+ ID: 2,
+ StorageName: storageBackup,
+ }
+
+ coordinator := NewCoordinator(logrus.New(), datastore, fd)
+
+ for id, nodeStorage := range datastore.nodeStorages.m {
+ backend, cleanup := newMockDownstream(t, tt.callback)
+ defer cleanup() // clean up mock downstream server resources
+
+ coordinator.RegisterNode(backend)
+ nodeStorage.Address = backend
+ datastore.nodeStorages.m[id] = nodeStorage
+ }
+
+ replmgr := NewReplMgr(
storagePrimary,
logrus.New(),
datastore,
+ datastore,
coordinator,
)
- prf := praefect.NewServer(
+ prf := NewServer(
coordinator,
replmgr,
nil,
@@ -85,13 +107,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) {
defer cc.Close()
cli := mock.NewSimpleServiceClient(cc)
- for _, replica := range []string{storagePrimary, storageBackup} {
- backend, cleanup := newMockDownstream(t, tt.callback)
- defer cleanup() // clean up mock downstream server resources
-
- coordinator.RegisterNode(replica, backend)
- }
-
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go
index e2087cdb1..4038bc815 100644
--- a/internal/rubyserver/balancer/balancer_test.go
+++ b/internal/rubyserver/balancer/balancer_test.go
@@ -216,8 +216,6 @@ func (tcc *testClientConn) ConfigUpdates() []string {
return tcc.configUpdates
}
-func (tcc *testClientConn) UpdateState(state resolver.State) {}
-
// configureBuilderTest reconfigures the global builder and pre-populates
// it with addresses. It returns the list of addresses it added.
func configureBuilderTest(numAddrs int) []string {