diff options
author | John Cai <jcai@gitlab.com> | 2019-06-29 01:50:08 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2019-06-29 03:35:13 +0300 |
commit | e9abcdf8e83096f1e0718c8ef80ab8b1024c77b5 (patch) | |
tree | 341a58ed906fc13fbf21fb289a963f9c9c58affe | |
parent | 8817418fbac65e4945bba32de83f5896530b6408 (diff) |
Add metadata to replication contextjc-add-metadata-to-replication
-rw-r--r-- | config.praefect.toml.example | 7 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 1 | ||||
-rw-r--r-- | internal/praefect/coordinator.go | 33 | ||||
-rw-r--r-- | internal/praefect/datastore.go | 27 | ||||
-rw-r--r-- | internal/praefect/replicator.go | 37 |
5 files changed, 79 insertions, 26 deletions
diff --git a/config.praefect.toml.example b/config.praefect.toml.example index 59e7563f1..1393de890 100644 --- a/config.praefect.toml.example +++ b/config.praefect.toml.example @@ -26,11 +26,14 @@ listen_addr = "127.0.0.1:2305" [primary_server] name = "default" listen_addr = "tcp://gitaly-primary.example.com" + token = "abcd1234" # [[secondary_server]] -# name = "default" +# name = "backup-1" # listen_addr = "tcp://gitaly-backup1.example.com" +# token = "abcd1234" # [[secondary_server]] -# name = "backup" +# name = "backup-2" # listen_addr = "tcp://gitaly-backup2.example.com" +# token = "abcd1234" diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 768104ed1..94215f5c2 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -28,6 +28,7 @@ type Config struct { type GitalyServer struct { Name string `toml:"name"` ListenAddr string `toml:"listen_addr" split_words:"true"` + Token string `toml:"token"` } // FromFile loads the config for the passed file path diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index b2e6704d5..92e2a75d9 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -2,12 +2,15 @@ package praefect import ( "context" + "encoding/base64" + "encoding/json" "fmt" "sync" gitalyauth "gitlab.com/gitlab-org/gitaly/auth" gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config" "gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry" + "gitlab.com/gitlab-org/gitaly/internal/storage" "github.com/golang/protobuf/protoc-gen-go/descriptor" "github.com/sirupsen/logrus" @@ -15,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -25,14 +29,14 @@ type Coordinator struct { log *logrus.Logger lock sync.RWMutex - datastore PrimaryDatastore + datastore ServerDatastore nodes map[string]*grpc.ClientConn registry *protoregistry.Registry } // NewCoordinator returns a new Coordinator that utilizes the provided logger -func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { +func NewCoordinator(l *logrus.Logger, datastore ServerDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator { registry := protoregistry.New() registry.RegisterFiles(fileDescriptors...) @@ -68,7 +72,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // to the appropriate Gitaly node. c.log.Debugf("Stream director received method %s", fullMethodName) - storageName, err := c.datastore.GetPrimary() + serverConfig, err := c.datastore.GetPrimary() if err != nil { err := status.Error( codes.FailedPrecondition, @@ -79,11 +83,30 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string, // We only need the primary node, as there's only one primary storage // location per praefect at this time - cc, ok := c.getConn(storageName) + cc, ok := c.getConn(serverConfig.Name) if !ok { - return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName) + return nil, nil, fmt.Errorf("unable to find existing client connection for %+v", serverConfig) } + gitalyServer, err := c.datastore.GetPrimary() + if err != nil { + return nil, nil, err + } + + gitalyServers := storage.GitalyServers{ + gitalyServer.Name: { + "address": gitalyServer.ListenAddr, + "token": gitalyServer.Token, + }, + } + + gitalyServersJSON, err := json.Marshal(gitalyServers) + if err != nil { + return nil, nil, err + } + + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))) + return ctx, cc, nil } diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go index eeb9f9728..8144a8d7b 100644 --- a/internal/praefect/datastore.go +++ b/internal/praefect/datastore.go @@ -69,9 +69,9 @@ type Datastore interface { // PrimaryDatastore manages accessing and setting the primary storage location type PrimaryDatastore interface { // GetPrimary gets the primary storage location - GetPrimary() (string, error) + GetPrimary() (config.GitalyServer, error) // SetPrimary sets the primary storage location - SetPrimary(primary string) error + SetPrimary(server config.GitalyServer) error } // ReplicasDatastore manages accessing and setting which secondary replicas @@ -132,7 +132,7 @@ type MemoryDatastore struct { primary *struct { sync.RWMutex - storageName string + server config.GitalyServer } } @@ -155,9 +155,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore { }, primary: &struct { sync.RWMutex - storageName string + server config.GitalyServer }{ - storageName: cfg.PrimaryServer.Name, + server: config.GitalyServer{ + Name: cfg.PrimaryServer.Name, + ListenAddr: cfg.PrimaryServer.ListenAddr, + }, }, } @@ -333,24 +336,24 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error } // SetPrimary sets the primary datastore location -func (md *MemoryDatastore) SetPrimary(primary string) error { +func (md *MemoryDatastore) SetPrimary(primary config.GitalyServer) error { md.primary.Lock() defer md.primary.Unlock() - md.primary.storageName = primary + md.primary.server = primary return nil } // GetPrimary gets the primary datastore location -func (md *MemoryDatastore) GetPrimary() (string, error) { +func (md *MemoryDatastore) GetPrimary() (config.GitalyServer, error) { md.primary.RLock() defer md.primary.RUnlock() - storageName := md.primary.storageName - if storageName == "" { - return "", ErrPrimaryNotSet + server := md.primary.server + if server == (config.GitalyServer{}) { + return config.GitalyServer{}, ErrPrimaryNotSet } - return storageName, nil + return server, nil } diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go index dce2df103..6c758f11f 100644 --- a/internal/praefect/replicator.go +++ b/internal/praefect/replicator.go @@ -2,10 +2,14 @@ package praefect import ( "context" + "encoding/base64" + "encoding/json" "fmt" "time" "gitlab.com/gitlab-org/gitaly-proto/go/gitalypb" + "gitlab.com/gitlab-org/gitaly/internal/storage" + "google.golang.org/grpc/metadata" "github.com/sirupsen/logrus" ) @@ -57,7 +61,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, ta // ReplMgr is a replication manager for handling replication jobs type ReplMgr struct { log *logrus.Logger - jobsStore ReplJobsDatastore + dataStore Datastore coordinator *Coordinator storage string // which replica is this replicator responsible for? replicator Replicator // does the actual replication logic @@ -71,10 +75,10 @@ type ReplMgrOpt func(*ReplMgr) // NewReplMgr initializes a replication manager with the provided dependencies // and options -func NewReplMgr(storage string, log *logrus.Logger, ds ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { +func NewReplMgr(storage string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr { r := ReplMgr{ log: log, - jobsStore: ds, + dataStore: ds, whitelist: map[string]struct{}{}, replicator: defaultReplicator{log}, storage: storage, @@ -115,7 +119,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error return nil } - id, err := r.jobsStore.CreateSecondaryReplJobs(repo) + id, err := r.dataStore.CreateSecondaryReplJobs(repo) if err != nil { return err } @@ -137,7 +141,7 @@ const ( // ProcessBacklog will process queued jobs. It will block while processing jobs. func (r ReplMgr) ProcessBacklog(ctx context.Context) error { for { - jobs, err := r.jobsStore.GetJobs(JobStatePending|JobStateReady, r.storage, 10) + jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.storage, 10) if err != nil { return err } @@ -165,17 +169,36 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error { return err } - if err := r.jobsStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { + if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil { return err } + gitalyServer, err := r.dataStore.GetPrimary() + if err != nil { + return err + } + + gitalyServers := storage.GitalyServers{ + gitalyServer.Name: { + "address": gitalyServer.ListenAddr, + "token": gitalyServer.Token, + }, + } + + gitalyServersJSON, err := json.Marshal(gitalyServers) + if err != nil { + return err + } + + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON))) + if err := r.replicator.Replicate(ctx, job.Source, node); err != nil { return err } r.log.WithField(logWithReplJobID, job.ID). Info("completed replication") - if err := r.jobsStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { + if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil { return err } } |