diff options
Diffstat (limited to 'internal/praefect/datastore.go')
-rw-r--r-- | internal/praefect/datastore.go | 365 |
1 files changed, 195 insertions, 170 deletions
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index 5678c6a24..85fa22a23 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -34,7 +34,7 @@ const ( // JobStateComplete indicates the job is now complete JobStateComplete // JobStateCancelled indicates the job was cancelled. This can occur if the - // job is no longer relevant (e.g. a node is moved out of a shard) + // job is no longer relevant (e.g. a node is moved out of a repository) JobStateCancelled ) @@ -42,10 +42,11 @@ const ( // meant for updating the repository so that it is synced with the primary // copy. Scheduled indicates when a replication job should be performed. type ReplJob struct { - ID uint64 // autoincrement ID - Target string // which storage location to replicate to? - Source models.Repository // source for replication - State JobState + ID uint64 // autoincrement ID + TargetNodeID int // which node to replicate to? + SourceStorage string + Source models.Repository // source for replication + State JobState } // replJobs provides sort manipulation behavior @@ -64,32 +65,26 @@ func (b byJobID) Less(i, j int) bool { return b.replJobs[i].ID < b.replJobs[j].I type Datastore interface { ReplJobsDatastore ReplicasDatastore - TemporaryDatastore -} - -// TemporaryDatastore contains methods that will go away once we move to a SQL datastore -type TemporaryDatastore interface { - GetDefaultPrimary() (models.GitalyServer, error) - SetDefaultPrimary(primary models.GitalyServer) error } // ReplicasDatastore manages accessing and setting which secondary replicas // backup a repository type ReplicasDatastore interface { - // GetSecondaries will retrieve all secondary replica storage locations for - // a primary replica - GetShardSecondaries(repo models.Repository) ([]models.GitalyServer, error) + GetReplicas(relativePath string) ([]models.StorageNode, error) + + GetStorageNode(nodeID int) (models.StorageNode, error) + + GetStorageNodes() ([]models.StorageNode, error) + + GetPrimary(relativePath string) (*models.StorageNode, error) - GetShardPrimary(repo models.Repository) (models.GitalyServer, error) + SetPrimary(relativePath string, storageNodeID int) error - // SetSecondaries will set the secondary storage locations for a repository - // in a primary replica. - SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error + AddReplica(relativePath string, storageNodeID int) error - SetShardPrimary(repo models.Repository, primary models.GitalyServer) error + RemoveReplica(relativePath string, storageNodeID int) error - // GetRepositoriesForPrimary returns a map of all of the active shards for a given primary - GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) + GetRepository(relativePath string) (*models.Repository, error) } // ReplJobsDatastore represents the behavior needed for fetching and updating @@ -98,58 +93,53 @@ type ReplJobsDatastore interface { // GetJobs fetches a list of chronologically ordered replication // jobs for the given storage replica. The returned list will be at most // count-length. - GetJobs(flag JobState, node string, count int) ([]ReplJob, error) + GetJobs(flag JobState, nodeID int, count int) ([]ReplJob, error) - // CreateSecondaryJobs will create replication jobs for each secondary + // CreateReplicaJobs will create replication jobs for each secondary // replica of a repository known to the datastore. A set of replication job // ID's for the created jobs will be returned upon success. - CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) + CreateReplicaReplJobs(relativePath string) ([]uint64, error) // UpdateReplJob updates the state of an existing replication job UpdateReplJob(jobID uint64, newState JobState) error } -// shard is a set of primary and secondary storage replicas for a project -type shard struct { - primary models.GitalyServer - secondaries []models.GitalyServer -} - type jobRecord struct { - relativePath string // project's relative path - targetNode string - state JobState + sourceStorage string + relativePath string // project's relative path + targetNodeID int + state JobState } // MemoryDatastore is a simple datastore that isn't persisted to disk. It is // only intended for early beta requirements and as a reference implementation // for the eventual SQL implementation type MemoryDatastore struct { - replicas *struct { - sync.RWMutex - m map[string]shard // keyed by project's relative path - } - jobs *struct { sync.RWMutex next uint64 records map[uint64]jobRecord // all jobs indexed by ID } - primary *struct { + storageNodes *struct { sync.RWMutex - server models.GitalyServer + m map[int]models.StorageNode + } + + repositories *struct { + sync.RWMutex + m map[string]models.Repository } } // NewMemoryDatastore returns an initialized in-memory datastore func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { m := &MemoryDatastore{ - replicas: &struct { + storageNodes: &struct { sync.RWMutex - m map[string]shard + m map[int]models.StorageNode }{ - m: map[string]shard{}, + m: map[int]models.StorageNode{}, }, jobs: &struct { sync.RWMutex @@ -159,114 +149,190 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { next: 0, records: map[uint64]jobRecord{}, }, - primary: &struct { + repositories: &struct { sync.RWMutex - server models.GitalyServer + m map[string]models.Repository }{ - server: models.GitalyServer{ - Name: cfg.PrimaryServer.Name, - ListenAddr: cfg.PrimaryServer.ListenAddr, - Token: cfg.PrimaryServer.Token, - }, + m: map[string]models.Repository{}, }, } - secondaryServers := make([]models.GitalyServer, len(cfg.SecondaryServers)) - for i, server := range cfg.SecondaryServers { - secondaryServers[i] = *server + for i, storageNode := range cfg.StorageNodes { + storageNode.ID = i + m.storageNodes.m[i] = *storageNode } - for _, repo := range cfg.Whitelist { - // store the configuration file specified shard - m.replicas.m[repo] = shard{ - primary: *cfg.PrimaryServer, - secondaries: secondaryServers, + for _, repoPath := range cfg.Whitelist { + repo := models.Repository{ + RelativePath: repoPath, } - - // initialize replication job queue to replicate all whitelisted repos - // to every secondary server - for _, secondary := range cfg.SecondaryServers { - m.jobs.next++ - m.jobs.records[m.jobs.next] = jobRecord{ - state: JobStateReady, - targetNode: secondary.Name, - relativePath: repo, + for storageID, storageNode := range cfg.StorageNodes { + + // By default, pick the first storage node to be the primary. We can change this later to pick a randomly selected node + // to be the primary + if repo.Primary == (models.StorageNode{}) { + repo.Primary = *storageNode + } else { + repo.Replicas = append(repo.Replicas, *storageNode) + // initialize replication job queue to replicate all whitelisted repos + // to every replica + m.jobs.next++ + m.jobs.records[m.jobs.next] = jobRecord{ + state: JobStateReady, + targetNodeID: storageID, + sourceStorage: repo.Primary.Storage, + relativePath: repoPath, + } } } + m.repositories.m[repoPath] = repo } return m } -// GetShardSecondaries will return the set of secondary storage locations for a -// given repository if they exist -func (md *MemoryDatastore) GetShardSecondaries(primary models.Repository) ([]models.GitalyServer, error) { - shard, _ := md.getShard(primary.RelativePath) +// GetReplicas gets the secondaries for a repository based on the relative path +func (md *MemoryDatastore) GetReplicas(relativePath string) ([]models.StorageNode, error) { + md.repositories.RLock() + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() + defer md.repositories.RUnlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("repository not found") + } + + return repository.Replicas, nil +} + +// GetStorageNode gets all storage nodes +func (md *MemoryDatastore) GetStorageNode(nodeID int) (models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - return shard.secondaries, nil + node, ok := md.storageNodes.m[nodeID] + if !ok { + return models.StorageNode{}, errors.New("node not found") + } + + return node, nil } -// SetShardSecondaries will replace the set of replicas for a repository -func (md *MemoryDatastore) SetShardSecondaries(repo models.Repository, secondaries []models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetStorageNodes gets all storage nodes +func (md *MemoryDatastore) GetStorageNodes() ([]models.StorageNode, error) { + md.storageNodes.RLock() + defer md.storageNodes.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.secondaries = secondaries - md.replicas.m[repo.RelativePath] = shard + var storageNodes []models.StorageNode + for _, storageNode := range md.storageNodes.m { + storageNodes = append(storageNodes, storageNode) + } - return nil + return storageNodes, nil } -// SetShardPrimary sets the primary for a repository -func (md *MemoryDatastore) SetShardPrimary(repo models.Repository, primary models.GitalyServer) error { - md.replicas.Lock() - defer md.replicas.Unlock() +// GetPrimary gets the primary storage node for a repository of a repository relative path +func (md *MemoryDatastore) GetPrimary(relativePath string) (*models.StorageNode, error) { + md.repositories.RLock() + defer md.repositories.RUnlock() - shard := md.replicas.m[repo.RelativePath] - shard.primary = primary - md.replicas.m[repo.RelativePath] = shard + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, ErrPrimaryNotSet + } + storageNode, ok := md.storageNodes.m[repository.Primary.ID] + if !ok { + return nil, errors.New("node storage not found") + } + return &storageNode, nil + +} + +// SetPrimary sets the primary storagee node for a repository of a repository relative path +func (md *MemoryDatastore) SetPrimary(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + repository = models.Repository{RelativePath: relativePath} + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } + + repository.Primary = storageNode + + md.repositories.m[relativePath] = repository return nil } -// GetShardPrimary gets the primary for a repository -func (md *MemoryDatastore) GetShardPrimary(repo models.Repository) (models.GitalyServer, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// AddReplica adds a secondary to a repository of a repository relative path +func (md *MemoryDatastore) AddReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() + + repository, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("repository not found") + } + + storageNode, ok := md.storageNodes.m[storageNodeID] + if !ok { + return errors.New("node storage not found") + } - shard := md.replicas.m[repo.RelativePath] - return shard.primary, nil + repository.Replicas = append(repository.Replicas, storageNode) + + md.repositories.m[relativePath] = repository + return nil } -// GetRepositoriesForPrimary gets all repositories -func (md *MemoryDatastore) GetRepositoriesForPrimary(primary models.GitalyServer) ([]string, error) { - md.replicas.Lock() - defer md.replicas.Unlock() +// RemoveReplica removes a secondary from a repository of a repository relative path +func (md *MemoryDatastore) RemoveReplica(relativePath string, storageNodeID int) error { + md.repositories.Lock() + defer md.repositories.Unlock() - repositories := make([]string, 0, len(md.replicas.m)) + repository, ok := md.repositories.m[relativePath] + if !ok { + return errors.New("repository not found") + } - for repository := range md.replicas.m { - repositories = append(repositories, repository) + var secondaries []models.StorageNode + for _, secondary := range repository.Replicas { + if secondary.ID != storageNodeID { + secondaries = append(secondaries, secondary) + } } - return repositories, nil + repository.Replicas = secondaries + md.repositories.m[relativePath] = repository + return nil } -func (md *MemoryDatastore) getShard(project string) (shard, bool) { - md.replicas.RLock() - replicas, ok := md.replicas.m[project] - md.replicas.RUnlock() +// GetRepository gets the repository for a repository relative path +func (md *MemoryDatastore) GetRepository(relativePath string) (*models.Repository, error) { + md.repositories.Lock() + defer md.repositories.Unlock() - return replicas, ok + repository, ok := md.repositories.m[relativePath] + if !ok { + return nil, errors.New("repository not found") + } + + return &repository, nil } -// ErrSecondariesMissing indicates the repository does not have any backup +// ErrReplicasMissing indicates the repository does not have any backup // replicas -var ErrSecondariesMissing = errors.New("repository missing secondary replicas") +var ErrReplicasMissing = errors.New("repository missing secondary replicas") // GetJobs is a more general method to retrieve jobs of a certain state from the datastore -func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([]ReplJob, error) { +func (md *MemoryDatastore) GetJobs(state JobState, targetNodeID int, count int) ([]ReplJob, error) { md.jobs.RLock() defer md.jobs.RUnlock() @@ -274,7 +340,7 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ for i, record := range md.jobs.records { // state is a bitmap that is a combination of one or more JobStates - if record.state&state != 0 && record.targetNode == storage { + if record.state&state != 0 && record.targetNodeID == targetNodeID { job, err := md.replJobFromRecord(i, record) if err != nil { return nil, err @@ -293,60 +359,52 @@ func (md *MemoryDatastore) GetJobs(state JobState, storage string, count int) ([ } // replJobFromRecord constructs a replication job from a record and by cross -// referencing the current shard for the project being replicated +// referencing the current repository for the project being replicated func (md *MemoryDatastore) replJobFromRecord(jobID uint64, record jobRecord) (ReplJob, error) { - shard, ok := md.getShard(record.relativePath) - if !ok { - return ReplJob{}, fmt.Errorf( - "unable to find shard for project at relative path %q", - record.relativePath, - ) - } - return ReplJob{ ID: jobID, Source: models.Repository{ RelativePath: record.relativePath, - Storage: shard.primary.Name, }, - State: record.state, - Target: record.targetNode, + SourceStorage: record.sourceStorage, + State: record.state, + TargetNodeID: record.targetNodeID, }, nil } -// ErrInvalidReplTarget indicates a targetNode repository cannot be chosen because +// ErrInvalidReplTarget indicates a targetStorage repository cannot be chosen because // it fails preconditions for being replicatable -var ErrInvalidReplTarget = errors.New("targetNode repository fails preconditions for replication") +var ErrInvalidReplTarget = errors.New("targetStorage repository fails preconditions for replication") -// CreateSecondaryReplJobs creates a replication job for each secondary that +// CreateReplicaReplJobs creates a replication job for each secondary that // backs the specified repository. Upon success, the job IDs will be returned. -func (md *MemoryDatastore) CreateSecondaryReplJobs(source models.Repository) ([]uint64, error) { +func (md *MemoryDatastore) CreateReplicaReplJobs(relativePath string) ([]uint64, error) { md.jobs.Lock() defer md.jobs.Unlock() - emptyRepo := models.Repository{} - if source == emptyRepo { + if relativePath == "" { return nil, errors.New("invalid source repository") } - shard, ok := md.getShard(source.RelativePath) - if !ok { + repository, err := md.GetRepository(relativePath) + if err != nil { return nil, fmt.Errorf( - "unable to find shard for project at relative path %q", - source.RelativePath, + "unable to find repository for project at relative path %q", + relativePath, ) } var jobIDs []uint64 - for _, secondary := range shard.secondaries { + for _, secondary := range repository.Replicas { nextID := uint64(len(md.jobs.records) + 1) md.jobs.next++ md.jobs.records[md.jobs.next] = jobRecord{ - targetNode: secondary.Name, - state: JobStatePending, - relativePath: source.RelativePath, + targetNodeID: secondary.ID, + state: JobStatePending, + relativePath: relativePath, + sourceStorage: repository.Primary.Storage, } jobIDs = append(jobIDs, nextID) @@ -375,36 +433,3 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error md.jobs.records[jobID] = job return nil } - -// SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary models.GitalyServer) error { - md.primary.Lock() - defer md.primary.Unlock() - - md.primary.server = primary - - return nil -} - -// GetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) GetDefaultPrimary() (models.GitalyServer, error) { - md.primary.RLock() - defer md.primary.RUnlock() - - primary := md.primary.server - if primary == (models.GitalyServer{}) { - return primary, ErrPrimaryNotSet - } - - return primary, nil -} - -// SetDefaultPrimary gets the primary datastore location -func (md *MemoryDatastore) SetDefaultPrimary(primary models.GitalyServer) error { - md.primary.RLock() - defer md.primary.RUnlock() - - md.primary.server = primary - - return nil -} |