Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'internal/praefect/datastore.go')
-rw-r--r--internal/praefect/datastore.go365
1 files changed, 195 insertions, 170 deletions
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index 5678c6a24..85fa22a23 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -34,7 +34,7 @@ const (
// JobStateComplete indicates the job is now complete
JobStateComplete
// JobStateCancelled indicates the job was cancelled. This can occur if the
- // job is no longer relevant (e.g. a node is moved out of a shard)
+ // job is no longer relevant (e.g. a node is moved out of a repository)
JobStateCancelled
)
@@ -42,10 +42,11 @@ const (
// meant for updating the repository so that it is synced with the primary
// copy. Scheduled indicates when a replication job should be performed.
type ReplJob struct {
- ID uint64 // autoincrement ID
- Target string // which storage location to replicate to?
- Source models.Repository // source for replication
- State JobState
+ ID uint64 // autoincrement ID
+ TargetNodeID int // which node to replicate to?
+ SourceStorage string
+ Source models.Repository // source for replication
+ State JobState
}
// replJobs provides sort manipulation behavior
@@ -64,32 +65,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I
type Datastore interface {
ReplJobsDatastore
ReplicasDatastore
- TemporaryDatastore
-}
-
-// TemporaryDatastore contains methods that will go away once we move to a SQL datastore
-type TemporaryDatastore interface {
- GetDefaultPrimary() (models.GitalyServer, error)
- SetDefaultPrimary(primary models.GitalyServer) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
// backup a repository
type ReplicasDatastore interface {
- // GetSecondaries will retrieve all secondary replica storage locations for
- // a primary replica
- GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error)
+ GetReplicas(relativePath string) ([]models.StorageNode, error)
+
+ GetStorageNode(nodeID int) (models.StorageNode, error)
+
+ GetStorageNodes() ([]models.StorageNode, error)
+
+ GetPrimary(relativePath string) (*models.StorageNode, error)
- GetShardPrimary(repo models.Repository) (models.GitalyServer, error)
+ SetPrimary(relativePath string, storageNodeID int) error
- // SetSecondaries will set the secondary storage locations for a repository
- // in a primary replica.
- SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error
+ AddReplica(relativePath string, storageNodeID int) error
- SetShardPrimary(repo models.Repository, primary models.GitalyServer) error
+ RemoveReplica(relativePath string, storageNodeID int) error
- // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary
- GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error)
+ GetRepository(relativePath string) (*models.Repository, error)
}
// ReplJobsDatastore represents the behavior needed for fetching and updating
@@ -98,58 +93,53 @@ type ReplJobsDatastore interface {
// GetJobs fetches a list of chronologically ordered replication
// jobs for the given storage replica. The returned list will be at most
// count-length.
- GetJobs(flag JobState, node string, count int) ([]ReplJob, error)
+ GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error)
- // CreateSecondaryJobs will create replication jobs for each secondary
+ // CreateReplicaJobs will create replication jobs for each secondary
// replica of a repository known to the datastore. A set of replication job
// ID's for the created jobs will be returned upon success.
- CreateSecondaryReplJobs(source models.Repository) ([]uint64, error)
+ CreateReplicaReplJobs(relativePath string) ([]uint64, error)
// UpdateReplJob updates the state of an existing replication job
UpdateReplJob(jobID uint64, newState JobState) error
}
-// shard is a set of primary and secondary storage replicas for a project
-type shard struct {
- primary models.GitalyServer
- secondaries []models.GitalyServer
-}
-
type jobRecord struct {
- relativePath string // project's relative path
- targetNode string
- state JobState
+ sourceStorage string
+ relativePath string // project's relative path
+ targetNodeID int
+ state JobState
}
// MemoryDatastore is a simple datastore that isn't persisted to disk. It is
// only intended for early beta requirements and as a reference implementation
// for the eventual SQL implementation
type MemoryDatastore struct {
- replicas *struct {
- sync.RWMutex
- m map[string]shard // keyed by project's relative path
- }
-
jobs *struct {
sync.RWMutex
next uint64
records map[uint64]jobRecord // all jobs indexed by ID
}
- primary *struct {
+ storageNodes *struct {
sync.RWMutex
- server models.GitalyServer
+ m map[int]models.StorageNode
+ }
+
+ repositories *struct {
+ sync.RWMutex
+ m map[string]models.Repository
}
}
// NewMemoryDatastore returns an initialized in-memory datastore
func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
m := &MemoryDatastore{
- replicas: &struct {
+ storageNodes: &struct {
sync.RWMutex
- m map[string]shard
+ m map[int]models.StorageNode
}{
- m: map[string]shard{},
+ m: map[int]models.StorageNode{},
},
jobs: &struct {
sync.RWMutex
@@ -159,114 +149,190 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
next: 0,
records: map[uint64]jobRecord{},
},
- primary: &struct {
+ repositories: &struct {
sync.RWMutex
- server models.GitalyServer
+ m map[string]models.Repository
}{
- server: models.GitalyServer{
- Name: cfg.PrimaryServer.Name,
- ListenAddr: cfg.PrimaryServer.ListenAddr,
- Token: cfg.PrimaryServer.Token,
- },
+ m: map[string]models.Repository{},
},
}
- secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers))
- for i, server := range cfg.SecondaryServers {
- secondaryServers[i] = *server
+ for i, storageNode := range cfg.StorageNodes {
+ storageNode.ID = i
+ m.storageNodes.m[i] = *storageNode
}
- for _, repo := range cfg.Whitelist {
- // store the configuration file specified shard
- m.replicas.m[repo] = shard{
- primary: *cfg.PrimaryServer,
- secondaries: secondaryServers,
+ for _, repoPath := range cfg.Whitelist {
+ repo := models.Repository{
+ RelativePath: repoPath,
}
-
- // initialize replication job queue to replicate all whitelisted repos
- // to every secondary server
- for _, secondary := range cfg.SecondaryServers {
- m.jobs.next++
- m.jobs.records[m.jobs.next] = jobRecord{
- state: JobStateReady,
- targetNode: secondary.Name,
- relativePath: repo,
+ for storageID, storageNode := range cfg.StorageNodes {
+
+ // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node
+ // to be the primary
+ if repo.Primary == (models.StorageNode{}) {
+ repo.Primary = *storageNode
+ } else {
+ repo.Replicas = append(repo.Replicas, *storageNode)
+ // initialize replication job queue to replicate all whitelisted repos
+ // to every replica
+ m.jobs.next++
+ m.jobs.records[m.jobs.next] = jobRecord{
+ state: JobStateReady,
+ targetNodeID: storageID,
+ sourceStorage: repo.Primary.Storage,
+ relativePath: repoPath,
+ }
}
}
+ m.repositories.m[repoPath] = repo
}
return m
}
-// GetShardSecondaries will return the set of secondary storage locations for a
-// given repository if they exist
-func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) {
- shard, _ := md.getShard(primary.RelativePath)
+// GetReplicas gets the secondaries for a repository based on the relative path
+func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) {
+ md.repositories.RLock()
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
+ defer md.repositories.RUnlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, errors.New("repository not found")
+ }
+
+ return repository.Replicas, nil
+}
+
+// GetStorageNode gets all storage nodes
+func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- return shard.secondaries, nil
+ node, ok := md.storageNodes.m[nodeID]
+ if !ok {
+ return models.StorageNode{}, errors.New("node not found")
+ }
+
+ return node, nil
}
-// SetShardSecondaries will replace the set of replicas for a repository
-func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetStorageNodes gets all storage nodes
+func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) {
+ md.storageNodes.RLock()
+ defer md.storageNodes.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.secondaries = secondaries
- md.replicas.m[repo.RelativePath] = shard
+ var storageNodes []models.StorageNode
+ for _, storageNode := range md.storageNodes.m {
+ storageNodes = append(storageNodes, storageNode)
+ }
- return nil
+ return storageNodes, nil
}
-// SetShardPrimary sets the primary for a repository
-func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// GetPrimary gets the primary storage node for a repository of a repository relative path
+func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) {
+ md.repositories.RLock()
+ defer md.repositories.RUnlock()
- shard := md.replicas.m[repo.RelativePath]
- shard.primary = primary
- md.replicas.m[repo.RelativePath] = shard
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, ErrPrimaryNotSet
+ }
+ storageNode, ok := md.storageNodes.m[repository.Primary.ID]
+ if !ok {
+ return nil, errors.New("node storage not found")
+ }
+ return &storageNode, nil
+
+}
+
+// SetPrimary sets the primary storagee node for a repository of a repository relative path
+func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ repository = models.Repository{RelativePath: relativePath}
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
+
+ repository.Primary = storageNode
+
+ md.repositories.m[relativePath] = repository
return nil
}
-// GetShardPrimary gets the primary for a repository
-func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// AddReplica adds a secondary to a repository of a repository relative path
+func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
+
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return errors.New("repository not found")
+ }
+
+ storageNode, ok := md.storageNodes.m[storageNodeID]
+ if !ok {
+ return errors.New("node storage not found")
+ }
- shard := md.replicas.m[repo.RelativePath]
- return shard.primary, nil
+ repository.Replicas = append(repository.Replicas, storageNode)
+
+ md.repositories.m[relativePath] = repository
+ return nil
}
-// GetRepositoriesForPrimary gets all repositories
-func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) {
- md.replicas.Lock()
- defer md.replicas.Unlock()
+// RemoveReplica removes a secondary from a repository of a repository relative path
+func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- repositories := make([]string, 0, len(md.replicas.m))
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return errors.New("repository not found")
+ }
- for repository := range md.replicas.m {
- repositories = append(repositories, repository)
+ var secondaries []models.StorageNode
+ for _, secondary := range repository.Replicas {
+ if secondary.ID != storageNodeID {
+ secondaries = append(secondaries, secondary)
+ }
}
- return repositories, nil
+ repository.Replicas = secondaries
+ md.repositories.m[relativePath] = repository
+ return nil
}
-func (md *MemoryDatastore) getShard(project string) (shard, bool) {
- md.replicas.RLock()
- replicas, ok := md.replicas.m[project]
- md.replicas.RUnlock()
+// GetRepository gets the repository for a repository relative path
+func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) {
+ md.repositories.Lock()
+ defer md.repositories.Unlock()
- return replicas, ok
+ repository, ok := md.repositories.m[relativePath]
+ if !ok {
+ return nil, errors.New("repository not found")
+ }
+
+ return &repository, nil
}
-// ErrSecondariesMissing indicates the repository does not have any backup
+// ErrReplicasMissing indicates the repository does not have any backup
// replicas
-var ErrSecondariesMissing = errors.New("repository missing secondary replicas")
+var ErrReplicasMissing = errors.New("repository missing secondary replicas")
// GetJobs is a more general method to retrieve jobs of a certain state from the datastore
-func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) {
+func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) {
md.jobs.RLock()
defer md.jobs.RUnlock()
@@ -274,7 +340,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
for i, record := range md.jobs.records {
// state is a bitmap that is a combination of one or more JobStates
- if record.state&state != 0 && record.targetNode == storage {
+ if record.state&state != 0 && record.targetNodeID == targetNodeID {
job, err := md.replJobFromRecord(i, record)
if err != nil {
return nil, err
@@ -293,60 +359,52 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([
}
// replJobFromRecord constructs a replication job from a record and by cross
-// referencing the current shard for the project being replicated
+// referencing the current repository for the project being replicated
func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) {
- shard, ok := md.getShard(record.relativePath)
- if !ok {
- return ReplJob{}, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- record.relativePath,
- )
- }
-
return ReplJob{
ID: jobID,
Source: models.Repository{
RelativePath: record.relativePath,
- Storage: shard.primary.Name,
},
- State: record.state,
- Target: record.targetNode,
+ SourceStorage: record.sourceStorage,
+ State: record.state,
+ TargetNodeID: record.targetNodeID,
}, nil
}
-// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because
+// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because
// it fails preconditions for being replicatable
-var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication")
+var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication")
-// CreateSecondaryReplJobs creates a replication job for each secondary that
+// CreateReplicaReplJobs creates a replication job for each secondary that
// backs the specified repository. Upon success, the job IDs will be returned.
-func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) {
+func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) {
md.jobs.Lock()
defer md.jobs.Unlock()
- emptyRepo := models.Repository{}
- if source == emptyRepo {
+ if relativePath == "" {
return nil, errors.New("invalid source repository")
}
- shard, ok := md.getShard(source.RelativePath)
- if !ok {
+ repository, err := md.GetRepository(relativePath)
+ if err != nil {
return nil, fmt.Errorf(
- "unable to find shard for project at relative path %q",
- source.RelativePath,
+ "unable to find repository for project at relative path %q",
+ relativePath,
)
}
var jobIDs []uint64
- for _, secondary := range shard.secondaries {
+ for _, secondary := range repository.Replicas {
nextID := uint64(len(md.jobs.records) + 1)
md.jobs.next++
md.jobs.records[md.jobs.next] = jobRecord{
- targetNode: secondary.Name,
- state: JobStatePending,
- relativePath: source.RelativePath,
+ targetNodeID: secondary.ID,
+ state: JobStatePending,
+ relativePath: relativePath,
+ sourceStorage: repository.Primary.Storage,
}
jobIDs = append(jobIDs, nextID)
@@ -375,36 +433,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
md.jobs.records[jobID] = job
return nil
}
-
-// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error {
- md.primary.Lock()
- defer md.primary.Unlock()
-
- md.primary.server = primary
-
- return nil
-}
-
-// GetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- primary := md.primary.server
- if primary == (models.GitalyServer{}) {
- return primary, ErrPrimaryNotSet
- }
-
- return primary, nil
-}
-
-// SetDefaultPrimary gets the primary datastore location
-func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error {
- md.primary.RLock()
- defer md.primary.RUnlock()
-
- md.primary.server = primary
-
- return nil
-}