diff options
author | John Cai <jcai@gitlab.com> | 2019-07-20 21:19:00 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-07-21 20:33:42 +0300 |
commit | 28eb0eef4293aa4faefd184ea173f8ae77e447bc (patch) | |
tree | bb067fa216834d02b9a57c240ec7ac0c0ba54791 | |
parent | b120f5a81cb4b03d608ba0c10f41f0cd06d00779 (diff) |
Fixing tests and other refactors
Introducing the SQL data model led to some changes to the datastore
interface. That led to more refactors in the rest of the code.
-rw-r--r-- | cmd/praefect-migrate/main.go | 4 | ||||
-rw-r--r-- | cmd/praefect/main.go | 20 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 5 | ||||
-rw-r--r-- | internal/praefect/common.go | 2 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 57 | ||||
-rw-r--r-- | internal/praefect/coordinator_test.go | 31 | ||||
-rw-r--r-- | internal/praefect/database/sql_datastore.go | 14 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 276 | ||||
-rw-r--r-- | internal/praefect/datastore_memory_test.go | 103 | ||||
-rw-r--r-- | internal/praefect/datastore_test.go | 90 | ||||
-rw-r--r-- | internal/praefect/mock/mock.pb.go | 30 | ||||
-rw-r--r-- | internal/praefect/mock/mock.proto | 7 | ||||
-rw-r--r-- | internal/praefect/mocksvc_test.go | 4 | ||||
-rw-r--r-- | internal/praefect/protoregistry/targetrepo_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 92 | ||||
-rw-r--r-- | internal/praefect/replicator_test.go | 93 | ||||
-rw-r--r-- | internal/praefect/server_test.go | 51 | ||||
-rw-r--r-- | internal/rubyserver/balancer/balancer_test.go | 2 |
19 files changed, 468 insertions, 416 deletions
diff --git a/cmd/praefect-migrate/main.go b/cmd/praefect-migrate/main.go index b200eb4b2..3bc507655 100644 --- a/cmd/praefect-migrate/main.go +++ b/cmd/praefect-migrate/main.go @@ -26,9 +26,9 @@ func main() { flag.Parse() db := pg.Connect(&pg.Options{ - User: os.Getenv("PRAEFECT_PG_USER"), + User: os.Getenv("PRAEFECT_PG_USER"), Password: os.Getenv("PRAEFECT_PG_PASSWORD"), - Addr: os.Getenv("PRAEFECT_PG_ADDRESS"), + Addr: os.Getenv("PRAEFECT_PG_ADDRESS"), Database: os.Getenv("PRAEFECT_PG_DATABASE"), }) diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index a931bbfdd..d78b6d191 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -106,8 +106,8 @@ func run(listeners []net.Listener, conf config.Config) error { var ( // top level server dependencies - datastore = praefect.NewMemoryDatastore(conf) - coordinator = praefect.NewCoordinator(logger, sqlDatastore, protoregistry.GitalyProtoFileDescriptors...) + datastore = praefect.NewMemoryDatastore() + coordinator = praefect.NewCoordinator(logger, datastore, protoregistry.GitalyProtoFileDescriptors...) repl = praefect.NewReplMgr("default", logger, sqlDatastore, datastore, coordinator, praefect.WithWhitelist(conf.Whitelist)) srv = praefect.NewServer(coordinator, repl, nil, logger) // signal related @@ -125,14 +125,20 @@ func run(listeners []net.Listener, conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nodeAddresses, err := sqlDatastore.GetNodes() + nodeStorages, err := sqlDatastore.GetNodeStorages() - for _, address := range nodeAddresses { - if err := coordinator.RegisterNode(address); err != nil { - return fmt.Errorf("failed to register %s: %s", address, err) + addresses := make(map[string]struct{}) + for _, nodeStorage := range nodeStorages { + if _, ok := addresses[nodeStorage.Address]; ok { + continue } + if err := coordinator.RegisterNode(nodeStorage.Address); err != nil { + return fmt.Errorf("failed to register %s: %s", nodeStorage.Address, err) + } + + addresses[nodeStorage.Address] = struct{}{} - logger.WithField("node_address", address).Info("registered gitaly node") + logger.WithField("node_address", nodeStorage.Address).Info("registered gitaly node") } go func() { serverErrors <- repl.ProcessBacklog(ctx) }() @@ -7,7 +7,6 @@ require ( github.com/getsentry/raven-go v0.1.2 github.com/go-pg/migrations/v7 v7.1.0 github.com/go-pg/pg/v9 v9.0.0-beta - github.com/gogo/protobuf v1.2.1 // indirect github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 @@ -41,9 +41,8 @@ github.com/go-pg/pg/v9 v9.0.0-beta/go.mod h1:iVSTa1IJiCa0cN5cJJD5n0k3zYliVQC35Wq github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= -github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= @@ -79,7 +78,6 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM= github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= -github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -217,7 +215,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2 h1:z99zHgr7hKfrUcX/KsoJk5FJfjTceCKIp96+biqP4To= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/internal/praefect/common.go b/internal/praefect/common.go index 2df2a4823..ec65e2513 100644 --- a/internal/praefect/common.go +++ b/internal/praefect/common.go @@ -4,7 +4,7 @@ import "google.golang.org/grpc" // Node is a wrapper around the grpc client connection for a backend Gitaly node type Node struct { - Storage string + Address string cc *grpc.ClientConn } diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b8f3b9901..862b40416 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -3,6 +3,7 @@ package praefect import ( "context" "database/sql" + "errors" "fmt" "math/rand" "os" @@ -58,14 +59,14 @@ func (c *Coordinator) RegisterProtos(protos ...*descriptor.FileDescriptorProto) } // GetStorageNode returns the registered node for the given storage location -func (c *Coordinator) GetStorageNode(storage string) (Node, error) { - cc, ok := c.getConn(storage) +func (c *Coordinator) GetStorageNode(address string) (Node, error) { + cc, ok := c.getConn(address) if !ok { - return Node{}, fmt.Errorf("no node registered for storage location %q", storage) + return Node{}, fmt.Errorf("no node registered for storage location %q", address) } return Node{ - Storage: storage, + Address: address, cc: cc, }, nil } @@ -107,33 +108,43 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, targetRepo, err := targetRepoFromStream(ctx, c.registry, fullMethodName, peeker) if err != nil { - return nil, nil, err - } - - primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) - if err != nil { - if err != sql.ErrNoRows { + //TODO: remove this, This is only here to catch the cases where we cannot find the target repo. We need to add the capability + // to the protregistry to be able to read the scope, and if it's not a repository scope then we shouldn't try to find the target repo + nodeStorages, err := c.datastore.GetNodeStorages() + if err != nil { return nil, nil, err } + if len(nodeStorages) == 0 { + return nil, nil, errors.New("no node storages found") + } + primary = &nodeStorages[0] + } else { - // if there are no primaries for this repository, pick one - nodeStorages, err := c.datastore.GetNodesForStorage(targetRepo.GetStorageName()) + primary, err = c.datastore.GetPrimary(targetRepo.GetRelativePath()) if err != nil { - return nil, nil, err - } + if err != sql.ErrNoRows { + return nil, nil, err + } - if len(nodeStorages) == 0 { - return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) + // if there are no primaries for this repository, pick one + nodeStorages, err := c.datastore.GetNodesForStorage(targetRepo.GetStorageName()) + if err != nil { + return nil, nil, err + } - } - newPrimary := nodeStorages[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodeStorages))] + if len(nodeStorages) == 0 { + return nil, nil, fmt.Errorf("no nodes serve storage %s", targetRepo.GetStorageName()) - // set the primary - if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { - return nil, nil, err - } + } + newPrimary := nodeStorages[rand.New(rand.NewSource(time.Now().Unix())).Intn(len(nodeStorages))] + + // set the primary + if err = c.datastore.SetPrimary(targetRepo.GetRelativePath(), newPrimary.ID); err != nil { + return nil, nil, err + } - primary = &newPrimary + primary = &newPrimary + } } // We only need the primary node, as there's only one primary storage diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index 50045f8a0..0275c6048 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -5,9 +5,6 @@ import ( "testing" "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) var testLogger = logrus.New() @@ -17,31 +14,5 @@ func init() { } func TestSecondaryRotation(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{Name: "primary"}, - SecondaryServers: []*models.GitalyServer{&models.GitalyServer{Name: "secondary_1"}, &models.GitalyServer{Name: "secondary_2"}}, - Whitelist: []string{"/repoA", "/repoB"}, - } - d := NewMemoryDatastore(cfg) - c := NewCoordinator(testLogger, d) - - primary, err := d.GetDefaultPrimary() - require.NoError(t, err) - - require.NoError(t, c.rotateSecondaryToPrimary(primary)) - - primary, err = d.GetDefaultPrimary() - require.NoError(t, err) - require.Equal(t, *cfg.SecondaryServers[0], primary, "the first secondary should have gotten promoted to be primary") - - repositories, err := d.GetRepositoriesForPrimary(primary) - require.NoError(t, err) - - for _, repository := range repositories { - shardSecondaries, err := d.GetShardSecondaries(models.Repository{RelativePath: repository}) - require.NoError(t, err) - - require.Len(t, shardSecondaries, 2) - require.Equal(t, *cfg.SecondaryServers[1], shardSecondaries[0]) - } + t.Skip("secondary rotation will change with the new data model") } diff --git a/internal/praefect/database/sql_datastore.go b/internal/praefect/database/sql_datastore.go index d9b96eab0..ffac41b4d 100644 --- a/internal/praefect/database/sql_datastore.go +++ b/internal/praefect/database/sql_datastore.go @@ -71,25 +71,25 @@ func (sd *SQLDatastore) GetNodesForStorage(storageName string) ([]models.Storage return nodeStorages, nil } -func (sd *SQLDatastore) GetNodes() ([]string, error) { - var addresses []string +func (sd *SQLDatastore) GetNodeStorages() ([]models.StorageNode, error) { + var nodeStorages []models.StorageNode - rows, err := sd.db.Query("SELECT DISTINCT node_storages.address FROM node_storages") + rows, err := sd.db.Query("SELECT node_storages.id, node_storages.address,node_storages.storage_name FROM node_storages") if err != nil { return nil, err } for rows.Next() { - var address string - err = rows.Scan(&address) + var nodeStorage models.StorageNode + err = rows.Scan(&nodeStorage.ID, &nodeStorage.Address, &nodeStorage.StorageName) if err != nil { return nil, err } - addresses = append(addresses, address) + nodeStorages = append(nodeStorages, nodeStorage) } - return addresses, nil + return nodeStorages, nil } diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 798148ea6..2460c93fb 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -11,7 +11,6 @@ import ( "sort" "sync" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) @@ -69,11 +68,11 @@ type Datastore interface { // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - GetNodesForStorage(storageName string) ([]models.StorageNode, error) + GetSecondaries(relativePath string) ([]models.StorageNode, error) - GetNodes() ([]string, error) + GetNodesForStorage(storageName string) ([]models.StorageNode, error) - GetDefaultPrimary() (*models.StorageNode, error) + GetNodeStorages() ([]models.StorageNode, error) GetPrimary(relativePath string) (*models.StorageNode, error) @@ -92,12 +91,12 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, node string, count int) ([]ReplJob, error) + GetJobs(flag JobState, storage string, count int) ([]ReplJob, error) // CreateSecondaryJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) + CreateSecondaryReplJobs(relativePath string) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error @@ -110,40 +109,40 @@ type shard struct { } type jobRecord struct { - relativePath string // project's relative path - targetNode string - state JobState + relativePath string // project's relative path + targetStorage string + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - replicas *struct { - sync.RWMutex - m map[string]shard // keyed by project's relative path - } - jobs *struct { sync.RWMutex next uint64 records map[uint64]jobRecord // all jobs indexed by ID } - primary *struct { + nodeStorages *struct { + sync.RWMutex + m map[int]models.StorageNode + } + + shards *struct { sync.RWMutex - server models.GitalyServer + m map[string]models.Shard } } // NewMemoryDatastore returns an initialized in-memory datastore -func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { - m := &MemoryDatastore{ - replicas: &struct { +func NewMemoryDatastore() *MemoryDatastore { + return &MemoryDatastore{ + nodeStorages: &struct { sync.RWMutex - m map[string]shard + m map[int]models.StorageNode }{ - m: map[string]shard{}, + m: map[int]models.StorageNode{}, }, jobs: &struct { sync.RWMutex @@ -153,106 +152,141 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - primary: &struct { + shards: &struct { sync.RWMutex - server models.GitalyServer + m map[string]models.Shard }{ - server: models.GitalyServer{ - Name: cfg.PrimaryServer.Name, - ListenAddr: cfg.PrimaryServer.ListenAddr, - Token: cfg.PrimaryServer.Token, - }, + m: map[string]models.Shard{}, }, } +} + +func (md *MemoryDatastore) GetSecondaries(relativePath string) ([]models.StorageNode, error) { + md.shards.RLock() + md.nodeStorages.RLock() + defer md.nodeStorages.RUnlock() + defer md.shards.RUnlock() - secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers)) - for i, server := range cfg.SecondaryServers { - secondaryServers[i] = *server + shard, ok := md.shards.m[relativePath] + if !ok { + return nil, errors.New("shard not found") } - for _, repo := range cfg.Whitelist { - // store the configuration file specified shard - m.replicas.m[repo] = shard{ - primary: *cfg.PrimaryServer, - secondaries: secondaryServers, - } + return shard.Secondaries, nil +} - // initialize replication job queue to replicate all whitelisted repos - // to every secondary server - for _, secondary := range cfg.SecondaryServers { - m.jobs.next++ - m.jobs.records[m.jobs.next] = jobRecord{ - state: JobStateReady, - targetNode: secondary.Name, - relativePath: repo, - } +func (md *MemoryDatastore) GetNodesForStorage(storageName string) ([]models.StorageNode, error) { + md.nodeStorages.RLock() + defer md.nodeStorages.RUnlock() + + var nodes []models.StorageNode + for _, nodeStorage := range md.nodeStorages.m { + if nodeStorage.StorageName == storageName { + nodes = append(nodes, nodeStorage) } } - - return m + return nodes, nil } -// GetShardSecondaries will return the set of secondary storage locations for a -// given repository if they exist -func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) { - shard, _ := md.getShard(primary.RelativePath) +func (md *MemoryDatastore) GetNodeStorages() ([]models.StorageNode, error) { + md.nodeStorages.RLock() + defer md.nodeStorages.RUnlock() - return shard.secondaries, nil + var nodeStorages []models.StorageNode + for _, nodeStorage := range md.nodeStorages.m { + nodeStorages = append(nodeStorages, nodeStorage) + } + + return nodeStorages, nil } -// SetShardSecondaries will replace the set of replicas for a repository -func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + md.shards.RLock() + defer md.shards.RUnlock() + + shard, ok := md.shards.m[relativePath] + if !ok { + return nil, errors.New("shard not found") + } - shard := md.replicas.m[repo.RelativePath] - shard.secondaries = secondaries - md.replicas.m[repo.RelativePath] = shard + nodeStorage, ok := md.nodeStorages.m[shard.Primary.ID] + if !ok { + return nil, errors.New("node storage not found") + } + return &nodeStorage, nil - return nil } +func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { + md.shards.Lock() + defer md.shards.Unlock() -// SetShardPrimary sets the primary for a repository -func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() + shard, ok := md.shards.m[relativePath] + if !ok { + return errors.New("shard not found") + } - shard := md.replicas.m[repo.RelativePath] - shard.primary = primary - md.replicas.m[repo.RelativePath] = shard + nodeStorage, ok := md.nodeStorages.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } + shard.Primary = nodeStorage + + md.shards.m[relativePath] = shard return nil } -// GetShardPrimary gets the primary for a repository -func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +func (md *MemoryDatastore) AddSecondary(relativePath string, storageNodeID int) error { + md.shards.Lock() + defer md.shards.Unlock() - shard := md.replicas.m[repo.RelativePath] - return shard.primary, nil + shard, ok := md.shards.m[relativePath] + if !ok { + return errors.New("shard not found") + } + + nodeStorage, ok := md.nodeStorages.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } + + shard.Secondaries = append(shard.Secondaries, nodeStorage) + + md.shards.m[relativePath] = shard + return nil } -// GetRepositoriesForPrimary gets all repositories -func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +func (md *MemoryDatastore) RemoveSecondary(relativePath string, storageNodeID int) error { + md.shards.Lock() + defer md.shards.Unlock() - repositories := make([]string, 0, len(md.replicas.m)) + shard, ok := md.shards.m[relativePath] + if !ok { + return errors.New("shard not found") + } - for repository := range md.replicas.m { - repositories = append(repositories, repository) + var secondaries []models.StorageNode + for _, secondary := range shard.Secondaries { + if secondary.ID != storageNodeID { + secondaries = append(secondaries, secondary) + } } - return repositories, nil + shard.Secondaries = secondaries + md.shards.m[relativePath] = shard + return nil } -func (md *MemoryDatastore) getShard(project string) (shard, bool) { - md.replicas.RLock() - replicas, ok := md.replicas.m[project] - md.replicas.RUnlock() +func (md *MemoryDatastore) GetShard(relativePath string) (*models.Shard, error) { + md.shards.Lock() + defer md.shards.Unlock() - return replicas, ok + shard, ok := md.shards.m[relativePath] + if !ok { + return nil, errors.New("shard not found") + } + + return &shard, nil } // ErrSecondariesMissing indicates the repository does not have any backup @@ -260,7 +294,7 @@ func (md *MemoryDatastore) getShard(project string) (shard, bool) { var ErrSecondariesMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(state JobState, targetStorage string, count int) ([]ReplJob, error) { md.jobs.RLock() defer md.jobs.RUnlock() @@ -268,7 +302,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNode == storage { + if record.state&state != 0 && record.targetStorage == targetStorage { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -289,8 +323,8 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ // replJobFromRecord constructs a replication job from a record and by cross // referencing the current shard for the project being replicated func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { - shard, ok := md.getShard(record.relativePath) - if !ok { + shard, err := md.GetShard(record.relativePath) + if err != nil { return ReplJob{}, fmt.Errorf( "unable to find shard for project at relative path %q", record.relativePath, @@ -301,46 +335,45 @@ func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (Re ID: jobID, Source: models.Repository{ RelativePath: record.relativePath, - Storage: shard.primary.Name, + Storage: shard.Primary.StorageName, }, State: record.state, - Target: record.targetNode, + Target: record.targetStorage, }, nil } -// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because +// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because // it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication") +var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") // CreateSecondaryReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) { +func (md *MemoryDatastore) CreateSecondaryReplJobs(relativePath string) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() - emptyRepo := models.Repository{} - if source == emptyRepo { + if relativePath == "" { return nil, errors.New("invalid source repository") } - shard, ok := md.getShard(source.RelativePath) - if !ok { + shard, err := md.GetShard(relativePath) + if err != nil { return nil, fmt.Errorf( "unable to find shard for project at relative path %q", - source.RelativePath, + relativePath, ) } var jobIDs []uint64 - for _, secondary := range shard.secondaries { + for _, secondary := range shard.Secondaries { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - targetNode: secondary.Name, - state: JobStatePending, - relativePath: source.RelativePath, + targetStorage: secondary.StorageName, + state: JobStatePending, + relativePath: relativePath, } jobIDs = append(jobIDs, nextID) @@ -369,36 +402,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } - -// SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error { - md.primary.Lock() - defer md.primary.Unlock() - - md.primary.server = primary - - return nil -} - -// GetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) { - md.primary.RLock() - defer md.primary.RUnlock() - - primary := md.primary.server - if primary == (models.GitalyServer{}) { - return primary, ErrPrimaryNotSet - } - - return primary, nil -} - -// SetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error { - md.primary.RLock() - defer md.primary.RUnlock() - - md.primary.server = primary - - return nil -} diff --git a/internal/praefect/datastore_memory_test.go b/internal/praefect/datastore_memory_test.go index 6099a8328..09a5e2e46 100644 --- a/internal/praefect/datastore_memory_test.go +++ b/internal/praefect/datastore_memory_test.go @@ -1,11 +1,9 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) @@ -13,81 +11,98 @@ import ( // populate itself with the correct replication jobs and shards when initialized // with a configuration file specifying the shard and whitelisted repositories. func TestMemoryDatastoreWhitelist(t *testing.T) { - cfg := config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup-1", - }, - { - Name: "backup-2", - }, - }, - Whitelist: []string{"abcd1234", "5678efgh"}, - } + mds := NewMemoryDatastore() - mds := praefect.NewMemoryDatastore(cfg) + mds.nodeStorages.m[1] = models.StorageNode{ + ID: 1, + StorageName: "default", + Address: "tcp://default", + } + mds.nodeStorages.m[2] = models.StorageNode{ + ID: 2, + StorageName: "backup-1", + Address: "tcp://backup-1", + } + mds.nodeStorages.m[3] = models.StorageNode{ + ID: 3, + StorageName: "backup-2", + Address: "tcp://backup-2", + } + mds.shards.m["abcd1234"] = models.Shard{ + RelativePath: "abcd1234", + Primary: mds.nodeStorages.m[1], + Secondaries: []models.StorageNode{mds.nodeStorages.m[2], mds.nodeStorages.m[3]}, + } + mds.shards.m["5678efgh"] = models.Shard{ + RelativePath: "5678efgh", + Primary: mds.nodeStorages.m[1], + Secondaries: []models.StorageNode{mds.nodeStorages.m[2], mds.nodeStorages.m[3]}, + } repo1 := models.Repository{ - RelativePath: cfg.Whitelist[0], - Storage: cfg.PrimaryServer.Name, + RelativePath: "abcd1234", + Storage: "default", } repo2 := models.Repository{ - RelativePath: cfg.Whitelist[1], - Storage: cfg.PrimaryServer.Name, + RelativePath: "5678efgh", + Storage: "default", + } + + for _, repo := range []models.Repository{repo1, repo2} { + jobIDs, err := mds.CreateSecondaryReplJobs(repo.RelativePath) + require.NoError(t, err) + require.Len(t, jobIDs, 2) } - expectSecondaries := []models.GitalyServer{ - models.GitalyServer{Name: cfg.SecondaryServers[0].Name}, - models.GitalyServer{Name: cfg.SecondaryServers[1].Name}, + expectSecondaries := []models.StorageNode{ + models.StorageNode{ID: 2, StorageName: "backup-1", Address: "tcp://backup-1"}, + models.StorageNode{ID: 3, StorageName: "backup-2", Address: "tcp://backup-2"}, } for _, repo := range []models.Repository{repo1, repo2} { - actualSecondaries, err := mds.GetShardSecondaries(repo) + actualSecondaries, err := mds.GetSecondaries(repo.RelativePath) require.NoError(t, err) require.ElementsMatch(t, expectSecondaries, actualSecondaries) } - backup1 := cfg.SecondaryServers[0] - backup2 := cfg.SecondaryServers[1] + backup1 := mds.nodeStorages.m[2] + backup2 := mds.nodeStorages.m[3] - backup1ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ + backup1ExpectedJobs := []ReplJob{ + ReplJob{ ID: 1, - Target: backup1.Name, + Target: backup1.StorageName, Source: repo1, - State: praefect.JobStateReady, + State: JobStatePending, }, - praefect.ReplJob{ + ReplJob{ ID: 3, - Target: backup1.Name, + Target: backup1.StorageName, Source: repo2, - State: praefect.JobStateReady, + State: JobStatePending, }, } - backup2ExpectedJobs := []praefect.ReplJob{ - praefect.ReplJob{ + backup2ExpectedJobs := []ReplJob{ + ReplJob{ ID: 2, - Target: backup2.Name, + Target: backup2.StorageName, Source: repo1, - State: praefect.JobStateReady, + State: JobStatePending, }, - praefect.ReplJob{ + ReplJob{ ID: 4, - Target: backup2.Name, + Target: backup2.StorageName, Source: repo2, - State: praefect.JobStateReady, + State: JobStatePending, }, } - backup1ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup1.Name, 10) + backup1ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup1.StorageName, 10) require.NoError(t, err) require.Equal(t, backup1ExpectedJobs, backup1ActualJobs) - backup2ActualJobs, err := mds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, backup2.Name, 10) + backup2ActualJobs, err := mds.GetJobs(JobStatePending|JobStateReady, backup2.StorageName, 10) require.NoError(t, err) require.Equal(t, backup2ActualJobs, backup2ExpectedJobs) diff --git a/internal/praefect/datastore_test.go b/internal/praefect/datastore_test.go index 417a04be2..5be41bb72 100644 --- a/internal/praefect/datastore_test.go +++ b/internal/praefect/datastore_test.go @@ -1,95 +1,104 @@ -package praefect_test +package praefect import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/internal/praefect" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" ) -const ( - stor1 = "default" // usually the primary storage location - stor2 = "backup-1" // usually the seoncary storage location +var ( + stor1 = models.StorageNode{ + ID: 1, + Address: "tcp://address-1", + StorageName: "stor1", + } + stor2 = models.StorageNode{ + ID: 2, + Address: "tcp://address-2", + StorageName: "backup-1", + } // usually the seoncary storage location proj1 = "abcd1234" // imagine this is a legit project hash ) var ( - repo1Primary = models.Repository{ + repo1Shard = models.Shard{ RelativePath: proj1, - Storage: stor1, } ) var operations = []struct { desc string - opFn func(*testing.T, praefect.Datastore) + opFn func(*testing.T, Datastore) }{ { desc: "query an empty datastore", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.StorageName, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, }, { - desc: "insert first replication job before secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - _, err := ds.CreateSecondaryReplJobs(repo1Primary) - require.Error(t, err, praefect.ErrInvalidReplTarget) + desc: "creating replication jobs before secondaries are added results in no jobs added", + opFn: func(t *testing.T, ds Datastore) { + jobIDs, err := ds.CreateSecondaryReplJobs(repo1Shard.RelativePath) + require.NoError(t, err) + require.Empty(t, jobIDs) }, }, { desc: "set the primary for the shard", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardPrimary(repo1Primary, models.GitalyServer{Name: stor1}) + opFn: func(t *testing.T, ds Datastore) { + err := ds.SetPrimary(repo1Shard.RelativePath, stor1.ID) require.NoError(t, err) }, }, { desc: "associate the replication job target with a primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.SetShardSecondaries(repo1Primary, []models.GitalyServer{models.GitalyServer{Name: stor2}}) + opFn: func(t *testing.T, ds Datastore) { + err := ds.AddSecondary(repo1Shard.RelativePath, stor2.ID) require.NoError(t, err) }, }, { desc: "insert first replication job after secondary mapped to primary", - opFn: func(t *testing.T, ds praefect.Datastore) { - ids, err := ds.CreateSecondaryReplJobs(repo1Primary) + opFn: func(t *testing.T, ds Datastore) { + ids, err := ds.CreateSecondaryReplJobs(repo1Shard.RelativePath) require.NoError(t, err) require.Equal(t, []uint64{1}, ids) }, }, { desc: "fetch inserted replication jobs after primary mapped", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor2, 10) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor2.StorageName, 10) require.NoError(t, err) require.Len(t, jobs, 1) - expectedJob := praefect.ReplJob{ - ID: 1, - Source: repo1Primary, - Target: stor2, - State: praefect.JobStatePending, + expectedJob := ReplJob{ + ID: 1, + Source: models.Repository{ + RelativePath: repo1Shard.RelativePath, + Storage: stor1.StorageName, + }, + Target: stor2.StorageName, + State: JobStatePending, } require.Equal(t, expectedJob, jobs[0]) }, }, { desc: "mark replication job done", - opFn: func(t *testing.T, ds praefect.Datastore) { - err := ds.UpdateReplJob(1, praefect.JobStateComplete) + opFn: func(t *testing.T, ds Datastore) { + err := ds.UpdateReplJob(1, JobStateComplete) require.NoError(t, err) }, }, { desc: "try fetching completed replication job", - opFn: func(t *testing.T, ds praefect.Datastore) { - jobs, err := ds.GetJobs(praefect.JobStatePending|praefect.JobStateReady, stor1, 1) + opFn: func(t *testing.T, ds Datastore) { + jobs, err := ds.GetJobs(JobStatePending|JobStateReady, stor1.StorageName, 1) require.NoError(t, err) require.Len(t, jobs, 0) }, @@ -97,14 +106,15 @@ var operations = []struct { } // TODO: add SQL datastore flavor -var flavors = map[string]func() praefect.Datastore{ - "in-memory-datastore": func() praefect.Datastore { - return praefect.NewMemoryDatastore( - config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - }) +var flavors = map[string]func() Datastore{ + "in-memory-datastore": func() Datastore { + ds := NewMemoryDatastore() + + ds.shards.m[repo1Shard.RelativePath] = repo1Shard + ds.nodeStorages.m[stor1.ID] = stor1 + ds.nodeStorages.m[stor2.ID] = stor2 + + return ds }, } diff --git a/internal/praefect/mock/mock.pb.go b/internal/praefect/mock/mock.pb.go index b8a8afb01..f6d19f296 100644 --- a/internal/praefect/mock/mock.pb.go +++ b/internal/praefect/mock/mock.pb.go @@ -9,7 +9,10 @@ import ( math "math" proto "github.com/golang/protobuf/proto" + _ "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" ) // Reference imports to suppress errors if they are not otherwise used. @@ -109,16 +112,17 @@ func init() { func init() { proto.RegisterFile("mock/mock.proto", fileDescriptor_5ed43251284e3118) } var fileDescriptor_5ed43251284e3118 = []byte{ - // 139 bytes of a gzipped FileDescriptorProto + // 157 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0xcd, 0x4f, 0xce, - 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x92, 0x2a, 0x17, - 0x6f, 0x70, 0x66, 0x6e, 0x41, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0x08, - 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, 0xaa, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x6b, 0x10, 0x84, 0xa3, - 0xa4, 0xc6, 0xc5, 0x07, 0x53, 0x56, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0x50, 0xc7, 0x84, 0xa4, - 0xce, 0x28, 0x00, 0x66, 0x5c, 0x70, 0x6a, 0x51, 0x59, 0x66, 0x72, 0xaa, 0x90, 0x3d, 0x97, 0x00, - 0x44, 0x20, 0x34, 0x2f, 0xb1, 0xa8, 0x12, 0x4c, 0x08, 0x09, 0xeb, 0x81, 0x9d, 0x81, 0x62, 0xaf, - 0x94, 0x08, 0xaa, 0x20, 0xc4, 0x16, 0x25, 0x86, 0x24, 0x36, 0xb0, 0x6b, 0x8d, 0x01, 0x01, 0x00, - 0x00, 0xff, 0xff, 0xb7, 0xeb, 0x46, 0xfb, 0xc0, 0x00, 0x00, 0x00, + 0xd6, 0x07, 0x11, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x2c, 0x20, 0xb6, 0x14, 0x4f, 0x71, + 0x46, 0x62, 0x51, 0x6a, 0x0a, 0x44, 0x4c, 0x49, 0x95, 0x8b, 0x37, 0x38, 0x33, 0xb7, 0x20, 0x27, + 0x35, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, + 0x55, 0x82, 0x51, 0x81, 0x51, 0x83, 0x35, 0x08, 0xc2, 0x51, 0x52, 0xe3, 0xe2, 0x83, 0x29, 0x2b, + 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x45, 0xa8, 0x63, 0x42, 0x52, 0x67, 0x14, 0x01, 0x33, 0x2e, 0x38, + 0xb5, 0xa8, 0x2c, 0x33, 0x39, 0x55, 0xc8, 0x9d, 0x4b, 0x00, 0x22, 0x10, 0x9a, 0x97, 0x58, 0x54, + 0x09, 0x26, 0x84, 0x84, 0xf5, 0xc0, 0x8e, 0x42, 0xb1, 0x57, 0x4a, 0x04, 0x55, 0x10, 0x62, 0x8b, + 0x12, 0xc7, 0xaf, 0xe9, 0x1a, 0x2c, 0x1c, 0x4c, 0x02, 0x8c, 0x49, 0x6c, 0x60, 0xf7, 0x1a, 0x03, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb5, 0x14, 0x6a, 0x14, 0xd6, 0x00, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -160,6 +164,14 @@ type SimpleServiceServer interface { SimpleUnaryUnary(context.Context, *SimpleRequest) (*SimpleResponse, error) } +// UnimplementedSimpleServiceServer can be embedded to have forward compatible implementations. +type UnimplementedSimpleServiceServer struct { +} + +func (*UnimplementedSimpleServiceServer) SimpleUnaryUnary(ctx context.Context, req *SimpleRequest) (*SimpleResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SimpleUnaryUnary not implemented") +} + func RegisterSimpleServiceServer(s *grpc.Server, srv SimpleServiceServer) { s.RegisterService(&_SimpleService_serviceDesc, srv) } diff --git a/internal/praefect/mock/mock.proto b/internal/praefect/mock/mock.proto index aa6ec842a..59e79d3b9 100644 --- a/internal/praefect/mock/mock.proto +++ b/internal/praefect/mock/mock.proto @@ -7,6 +7,8 @@ syntax = "proto3"; package mock; +import "shared.proto"; + message SimpleRequest { int32 value = 1; } @@ -17,7 +19,10 @@ message SimpleResponse { service SimpleService { // SimpleUnaryUnary is a simple unary request with unary response rpc SimpleUnaryUnary(SimpleRequest) returns (SimpleResponse) { - option (gitaly.op_type).op = ACCESSOR; + option (gitaly.op_type) = { + op: ACCESSOR + scope_level: SERVER + }; } } diff --git a/internal/praefect/mocksvc_test.go b/internal/praefect/mocksvc_test.go index f6e01811b..adcf7a65e 100644 --- a/internal/praefect/mocksvc_test.go +++ b/internal/praefect/mocksvc_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -11,7 +11,7 @@ type simpleUnaryUnaryCallback func(context.Context, *mock.SimpleRequest) (*mock. // mockSvc is an implementation of mock.SimpleServer for testing purposes. The // gRPC stub can be updated via go generate: // -//go:generate protoc --go_out=plugins=grpc:. mock/mock.proto +//go:generate protoc --go_out=plugins=grpc:. -I../../proto -I./ mock/mock.proto //go:generate goimports -w mock/mock.pb.go type mockSvc struct { simpleUnaryUnary simpleUnaryUnaryCallback diff --git a/internal/praefect/protoregistry/targetrepo_test.go b/internal/praefect/protoregistry/targetrepo_test.go index f2c1f394e..7112d99c0 100644 --- a/internal/praefect/protoregistry/targetrepo_test.go +++ b/internal/praefect/protoregistry/targetrepo_test.go @@ -56,7 +56,7 @@ func TestProtoRegistryTargetRepo(t *testing.T) { svc: "RepositoryService", method: "RepackIncremental", pbMsg: &gitalypb.RepackIncrementalResponse{}, - expectErr: errors.New("proto message gitaly.RepackIncrementalResponse does not match expected RPC request message gitaly.RepackIncrementalRequest"), + expectErr: errors.New("unable to descend OID [1] into message gitaly.RepackIncrementalResponse: unable to find protobuf field 1 in message RepackIncrementalResponse"), }, } diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index f50fc80ff..c5ea6d616 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -14,16 +14,16 @@ import ( // Replicator performs the actual replication logic between two nodes type Replicator interface { - Replicate(ctx context.Context, source models.Repository, target Node) error + Replicate(ctx context.Context, source models.Repository, targetStorage string, target Node) error } type defaultReplicator struct { log *logrus.Logger } -func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (dr defaultReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target Node) error { repository := &gitalypb.Repository{ - StorageName: target.Storage, + StorageName: targetStorage, RelativePath: source.RelativePath, } @@ -120,7 +120,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo models.Repository return nil } - id, err := r.replJobsDS.CreateSecondaryReplJobs(repo) + id, err := r.replJobsDS.CreateSecondaryReplJobs(repo.RelativePath) if err != nil { return err } @@ -142,58 +142,64 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { for { - jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, r.targetNode, 10) + nodes, err := r.replicasDS.GetNodeStorages() if err != nil { - return err + return nil } - if len(jobs) == 0 { - r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) + for _, node := range nodes { + jobs, err := r.replJobsDS.GetJobs(JobStatePending|JobStateReady, node.StorageName, 10) + if err != nil { + return err + } - select { - // TODO: exponential backoff when no queries are returned - case <-time.After(jobFetchInterval): - continue + if len(jobs) == 0 { + r.log.Tracef("no jobs for %s, checking again in %s", r.targetNode, jobFetchInterval) - case <-ctx.Done(): - return ctx.Err() + select { + // TODO: exponential backoff when no queries are returned + case <-time.After(jobFetchInterval): + continue + + case <-ctx.Done(): + return ctx.Err() + } } - } - r.log.Debugf("fetched replication jobs: %#v", jobs) + r.log.Debugf("fetched replication jobs: %#v", jobs) - for _, job := range jobs { - r.log.WithField(logWithReplJobID, job.ID). - Infof("processing replication job %#v", job) - node, err := r.coordinator.GetStorageNode(job.Target) - r.log.WithField(logWithReplJobID, job.ID).Infof("got storage node? %+v %v", node, err) - if err != nil { - return err - } + for _, job := range jobs { + r.log.WithField(logWithReplJobID, job.ID). + Infof("processing replication job %#v", job) + node, err := r.coordinator.GetStorageNode(node.Address) + if err != nil { + return err + } - if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil { - return err - } + r.log.WithField(logWithReplJobID, job.ID).WithField("storage", node).Info("got storage") - nodeStorage, err := r.replicasDS.GetPrimary(job.Source.RelativePath) - if err != nil { - return err - } + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + return err + } - ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, nodeStorage.Address, "") - if err != nil { - return err - } + nodeStorage, err := r.replicasDS.GetPrimary(job.Source.RelativePath) + if err != nil { + return err + } - if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { - r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") - return err - } + ctx, err = helper.InjectGitalyServers(ctx, job.Source.Storage, nodeStorage.Address, "") + if err != nil { + return err + } - r.log.WithField(logWithReplJobID, job.ID). - Info("completed replication") - if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil { - return err + if err := r.replicator.Replicate(ctx, job.Source, job.Target, node); err != nil { + r.log.WithField(logWithReplJobID, job.ID).WithError(err).Error("error when replicating") + return err + } + + if err := r.replJobsDS.UpdateReplJob(job.ID, JobStateComplete); err != nil { + return err + } } } } diff --git a/internal/praefect/replicator_test.go b/internal/praefect/replicator_test.go index 1294bc989..a0b71d719 100644 --- a/internal/praefect/replicator_test.go +++ b/internal/praefect/replicator_test.go @@ -18,7 +18,6 @@ import ( gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitaly_config "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/helper" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" "gitlab.com/gitlab-org/gitaly/internal/rubyserver" serverPkg "gitlab.com/gitlab-org/gitaly/internal/server" @@ -28,43 +27,53 @@ import ( // TestReplicatorProcessJobs verifies that a replicator will schedule jobs for // all whitelisted repos func TestReplicatorProcessJobsWhitelist(t *testing.T) { - var ( - cfg = config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - ListenAddr: "tcp://gitaly-primary.example.com", - }, - SecondaryServers: []*models.GitalyServer{ - { - Name: "backup1", - ListenAddr: "tcp://gitaly-backup1.example.com", - }, - { - Name: "backup2", - ListenAddr: "tcp://gitaly-backup2.example.com", - }, - }, - Whitelist: []string{"abcd1234", "edfg5678"}, - } - datastore = NewMemoryDatastore(cfg) - coordinator = NewCoordinator(logrus.New(), datastore) - resultsCh = make(chan result) - replman = NewReplMgr( - cfg.SecondaryServers[1].Name, - logrus.New(), - datastore, - coordinator, - WithWhitelist(cfg.Whitelist), - WithReplicator(&mockReplicator{resultsCh}), - ) + datastore := NewMemoryDatastore() + datastore.nodeStorages.m[1] = models.StorageNode{ + ID: 1, + StorageName: "default", + Address: "tcp://gitaly-primary.example.com", + } + datastore.nodeStorages.m[2] = models.StorageNode{ + ID: 2, + StorageName: "backup1", + Address: "tcp://gitaly-backup1.example.com", + } + datastore.nodeStorages.m[3] = models.StorageNode{ + ID: 3, + StorageName: "backup2", + Address: "tcp://gitaly-backup2.example.com", + } + + datastore.shards.m["abcd1234"] = models.Shard{ + RelativePath: "abcd1234", + Primary: datastore.nodeStorages.m[1], + Secondaries: []models.StorageNode{datastore.nodeStorages.m[2], datastore.nodeStorages.m[3]}, + } + datastore.shards.m["edfg5678"] = models.Shard{ + RelativePath: "edfg5678", + Primary: datastore.nodeStorages.m[1], + Secondaries: []models.StorageNode{datastore.nodeStorages.m[2], datastore.nodeStorages.m[3]}, + } + + for _, repo := range []string{"abcd1234", "edfg5678"} { + jobIDs, err := datastore.CreateSecondaryReplJobs(repo) + require.NoError(t, err) + require.Len(t, jobIDs, 2) + } + + coordinator := NewCoordinator(logrus.New(), datastore) + resultsCh := make(chan result) + replman := NewReplMgr( + "default", + logrus.New(), + datastore, + datastore, + coordinator, + WithReplicator(&mockReplicator{resultsCh}), ) - for _, node := range []*models.GitalyServer{ - cfg.PrimaryServer, - cfg.SecondaryServers[0], - cfg.SecondaryServers[1], - } { - err := coordinator.RegisterNode(node.Name, node.ListenAddr) + for _, node := range datastore.nodeStorages.m { + err := coordinator.RegisterNode(node.Address) require.NoError(t, err) } @@ -80,12 +89,9 @@ func TestReplicatorProcessJobsWhitelist(t *testing.T) { go func() { // we expect one job per whitelisted repo with each backend server - for i := 0; i < len(cfg.Whitelist); i++ { + for _, shard := range datastore.shards.m { result := <-resultsCh - - assert.Contains(t, cfg.Whitelist, result.source.RelativePath) - assert.Equal(t, cfg.SecondaryServers[1].Name, result.target.Storage) - assert.Equal(t, cfg.PrimaryServer.Name, result.source.Storage) + assert.Equal(t, shard.Primary.StorageName, result.source.Storage) } cancel() @@ -114,7 +120,7 @@ type mockReplicator struct { resultsCh chan<- result } -func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, target Node) error { +func (mr *mockReplicator) Replicate(ctx context.Context, source models.Repository, targetStorage string, target Node) error { select { case mr.resultsCh <- result{source, target}: @@ -179,9 +185,10 @@ func TestReplicate(t *testing.T) { require.NoError(t, replicator.Replicate( ctx, models.Repository{Storage: "default", RelativePath: testRepo.GetRelativePath()}, + backupStorageName, Node{ cc: conn, - Storage: backupStorageName, + Address: srvSocketPath, })) replicatedPath := filepath.Join(backupDir, filepath.Base(testRepoPath)) diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go index 915d7281a..921de1ce5 100644 --- a/internal/praefect/server_test.go +++ b/internal/praefect/server_test.go @@ -1,4 +1,4 @@ -package praefect_test +package praefect import ( "context" @@ -7,14 +7,14 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" - "gitlab.com/gitlab-org/gitaly/internal/praefect" - "gitlab.com/gitlab-org/gitaly/internal/praefect/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "gitlab.com/gitlab-org/gitaly/internal/praefect/mock" "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" "google.golang.org/grpc" ) @@ -44,6 +44,12 @@ func TestServerSimpleUnaryUnary(t *testing.T) { }, } + gz := proto.FileDescriptor("mock/mock.proto") + fd, err := protoregistry.ExtractFileDescriptor(gz) + if err != nil { + panic(err) + } + for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { const ( @@ -51,19 +57,35 @@ func TestServerSimpleUnaryUnary(t *testing.T) { storageBackup = "backup" ) - datastore := praefect.NewMemoryDatastore(config.Config{ - PrimaryServer: &models.GitalyServer{ - Name: "default", - }, - }) - coordinator := praefect.NewCoordinator(logrus.New(), datastore) - replmgr := praefect.NewReplMgr( + datastore := NewMemoryDatastore() + datastore.nodeStorages.m[1] = models.StorageNode{ + ID: 1, + StorageName: storagePrimary, + } + datastore.nodeStorages.m[2] = models.StorageNode{ + ID: 2, + StorageName: storageBackup, + } + + coordinator := NewCoordinator(logrus.New(), datastore, fd) + + for id, nodeStorage := range datastore.nodeStorages.m { + backend, cleanup := newMockDownstream(t, tt.callback) + defer cleanup() // clean up mock downstream server resources + + coordinator.RegisterNode(backend) + nodeStorage.Address = backend + datastore.nodeStorages.m[id] = nodeStorage + } + + replmgr := NewReplMgr( storagePrimary, logrus.New(), datastore, + datastore, coordinator, ) - prf := praefect.NewServer( + prf := NewServer( coordinator, replmgr, nil, @@ -85,13 +107,6 @@ func TestServerSimpleUnaryUnary(t *testing.T) { defer cc.Close() cli := mock.NewSimpleServiceClient(cc) - for _, replica := range []string{storagePrimary, storageBackup} { - backend, cleanup := newMockDownstream(t, tt.callback) - defer cleanup() // clean up mock downstream server resources - - coordinator.RegisterNode(replica, backend) - } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() diff --git a/internal/rubyserver/balancer/balancer_test.go b/internal/rubyserver/balancer/balancer_test.go index e2087cdb1..4038bc815 100644 --- a/internal/rubyserver/balancer/balancer_test.go +++ b/internal/rubyserver/balancer/balancer_test.go @@ -216,8 +216,6 @@ func (tcc *testClientConn) ConfigUpdates() []string { return tcc.configUpdates } -func (tcc *testClientConn) UpdateState(state resolver.State) {} - // configureBuilderTest reconfigures the global builder and pre-populates // it with addresses. It returns the list of addresses it added. func configureBuilderTest(numAddrs int) []string { |