Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2019-06-29 01:50:08 +0300
committerJohn Cai <jcai@gitlab.com>2019-06-29 03:35:13 +0300
commite9abcdf8e83096f1e0718c8ef80ab8b1024c77b5 (patch)
tree341a58ed906fc13fbf21fb289a963f9c9c58affe
parent8817418fbac65e4945bba32de83f5896530b6408 (diff)
Add metadata to replication contextjc-add-metadata-to-replication
-rw-r--r--config.praefect.toml.example7
-rw-r--r--internal/praefect/config/config.go1
-rw-r--r--internal/praefect/coordinator.go33
-rw-r--r--internal/praefect/datastore.go27
-rw-r--r--internal/praefect/replicator.go37
5 files changed, 79 insertions, 26 deletions
diff --git a/config.praefect.toml.example b/config.praefect.toml.example
index 59e7563f1..1393de890 100644
--- a/config.praefect.toml.example
+++ b/config.praefect.toml.example
@@ -26,11 +26,14 @@ listen_addr = "127.0.0.1:2305"
[primary_server]
name = "default"
listen_addr = "tcp://gitaly-primary.example.com"
+ token = "abcd1234"
# [[secondary_server]]
-# name = "default"
+# name = "backup-1"
# listen_addr = "tcp://gitaly-backup1.example.com"
+# token = "abcd1234"
# [[secondary_server]]
-# name = "backup"
+# name = "backup-2"
# listen_addr = "tcp://gitaly-backup2.example.com"
+# token = "abcd1234"
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 768104ed1..94215f5c2 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -28,6 +28,7 @@ type Config struct {
type GitalyServer struct {
Name string `toml:"name"`
ListenAddr string `toml:"listen_addr" split_words:"true"`
+ Token string `toml:"token"`
}
// FromFile loads the config for the passed file path
diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index b2e6704d5..92e2a75d9 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -2,12 +2,15 @@ package praefect
import (
"context"
+ "encoding/base64"
+ "encoding/json"
"fmt"
"sync"
gitalyauth "gitlab.com/gitlab-org/gitaly/auth"
gitalyconfig "gitlab.com/gitlab-org/gitaly/internal/config"
"gitlab.com/gitlab-org/gitaly/internal/praefect/protoregistry"
+ "gitlab.com/gitlab-org/gitaly/internal/storage"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/sirupsen/logrus"
@@ -15,6 +18,7 @@ import (
"gitlab.com/gitlab-org/gitaly/internal/praefect/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
+ "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@@ -25,14 +29,14 @@ type Coordinator struct {
log *logrus.Logger
lock sync.RWMutex
- datastore PrimaryDatastore
+ datastore ServerDatastore
nodes map[string]*grpc.ClientConn
registry *protoregistry.Registry
}
// NewCoordinator returns a new Coordinator that utilizes the provided logger
-func NewCoordinator(l *logrus.Logger, datastore PrimaryDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
+func NewCoordinator(l *logrus.Logger, datastore ServerDatastore, fileDescriptors ...*descriptor.FileDescriptorProto) *Coordinator {
registry := protoregistry.New()
registry.RegisterFiles(fileDescriptors...)
@@ -68,7 +72,7 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- storageName, err := c.datastore.GetPrimary()
+ serverConfig, err := c.datastore.GetPrimary()
if err != nil {
err := status.Error(
codes.FailedPrecondition,
@@ -79,11 +83,30 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string,
// We only need the primary node, as there's only one primary storage
// location per praefect at this time
- cc, ok := c.getConn(storageName)
+ cc, ok := c.getConn(serverConfig.Name)
if !ok {
- return nil, nil, fmt.Errorf("unable to find existing client connection for %s", storageName)
+ return nil, nil, fmt.Errorf("unable to find existing client connection for %+v", serverConfig)
}
+ gitalyServer, err := c.datastore.GetPrimary()
+ if err != nil {
+ return nil, nil, err
+ }
+
+ gitalyServers := storage.GitalyServers{
+ gitalyServer.Name: {
+ "address": gitalyServer.ListenAddr,
+ "token": gitalyServer.Token,
+ },
+ }
+
+ gitalyServersJSON, err := json.Marshal(gitalyServers)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON)))
+
return ctx, cc, nil
}
diff --git a/internal/praefect/datastore.go b/internal/praefect/datastore.go
index eeb9f9728..8144a8d7b 100644
--- a/internal/praefect/datastore.go
+++ b/internal/praefect/datastore.go
@@ -69,9 +69,9 @@ type Datastore interface {
// PrimaryDatastore manages accessing and setting the primary storage location
type PrimaryDatastore interface {
// GetPrimary gets the primary storage location
- GetPrimary() (string, error)
+ GetPrimary() (config.GitalyServer, error)
// SetPrimary sets the primary storage location
- SetPrimary(primary string) error
+ SetPrimary(server config.GitalyServer) error
}
// ReplicasDatastore manages accessing and setting which secondary replicas
@@ -132,7 +132,7 @@ type MemoryDatastore struct {
primary *struct {
sync.RWMutex
- storageName string
+ server config.GitalyServer
}
}
@@ -155,9 +155,12 @@ func NewMemoryDatastore(cfg config.Config) *MemoryDatastore {
},
primary: &struct {
sync.RWMutex
- storageName string
+ server config.GitalyServer
}{
- storageName: cfg.PrimaryServer.Name,
+ server: config.GitalyServer{
+ Name: cfg.PrimaryServer.Name,
+ ListenAddr: cfg.PrimaryServer.ListenAddr,
+ },
},
}
@@ -333,24 +336,24 @@ func (md *MemoryDatastore) UpdateReplJob(jobID uint64, newState JobState) error
}
// SetPrimary sets the primary datastore location
-func (md *MemoryDatastore) SetPrimary(primary string) error {
+func (md *MemoryDatastore) SetPrimary(primary config.GitalyServer) error {
md.primary.Lock()
defer md.primary.Unlock()
- md.primary.storageName = primary
+ md.primary.server = primary
return nil
}
// GetPrimary gets the primary datastore location
-func (md *MemoryDatastore) GetPrimary() (string, error) {
+func (md *MemoryDatastore) GetPrimary() (config.GitalyServer, error) {
md.primary.RLock()
defer md.primary.RUnlock()
- storageName := md.primary.storageName
- if storageName == "" {
- return "", ErrPrimaryNotSet
+ server := md.primary.server
+ if server == (config.GitalyServer{}) {
+ return config.GitalyServer{}, ErrPrimaryNotSet
}
- return storageName, nil
+ return server, nil
}
diff --git a/internal/praefect/replicator.go b/internal/praefect/replicator.go
index dce2df103..6c758f11f 100644
--- a/internal/praefect/replicator.go
+++ b/internal/praefect/replicator.go
@@ -2,10 +2,14 @@ package praefect
import (
"context"
+ "encoding/base64"
+ "encoding/json"
"fmt"
"time"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
+ "gitlab.com/gitlab-org/gitaly/internal/storage"
+ "google.golang.org/grpc/metadata"
"github.com/sirupsen/logrus"
)
@@ -57,7 +61,7 @@ func (dr defaultReplicator) Replicate(ctx context.Context, source Repository, ta
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log *logrus.Logger
- jobsStore ReplJobsDatastore
+ dataStore Datastore
coordinator *Coordinator
storage string // which replica is this replicator responsible for?
replicator Replicator // does the actual replication logic
@@ -71,10 +75,10 @@ type ReplMgrOpt func(*ReplMgr)
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
-func NewReplMgr(storage string, log *logrus.Logger, ds ReplJobsDatastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
+func NewReplMgr(storage string, log *logrus.Logger, ds Datastore, c *Coordinator, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log,
- jobsStore: ds,
+ dataStore: ds,
whitelist: map[string]struct{}{},
replicator: defaultReplicator{log},
storage: storage,
@@ -115,7 +119,7 @@ func (r ReplMgr) ScheduleReplication(ctx context.Context, repo Repository) error
return nil
}
- id, err := r.jobsStore.CreateSecondaryReplJobs(repo)
+ id, err := r.dataStore.CreateSecondaryReplJobs(repo)
if err != nil {
return err
}
@@ -137,7 +141,7 @@ const (
// ProcessBacklog will process queued jobs. It will block while processing jobs.
func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
for {
- jobs, err := r.jobsStore.GetJobs(JobStatePending|JobStateReady, r.storage, 10)
+ jobs, err := r.dataStore.GetJobs(JobStatePending|JobStateReady, r.storage, 10)
if err != nil {
return err
}
@@ -165,17 +169,36 @@ func (r ReplMgr) ProcessBacklog(ctx context.Context) error {
return err
}
- if err := r.jobsStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
+ if err := r.dataStore.UpdateReplJob(job.ID, JobStateInProgress); err != nil {
return err
}
+ gitalyServer, err := r.dataStore.GetPrimary()
+ if err != nil {
+ return err
+ }
+
+ gitalyServers := storage.GitalyServers{
+ gitalyServer.Name: {
+ "address": gitalyServer.ListenAddr,
+ "token": gitalyServer.Token,
+ },
+ }
+
+ gitalyServersJSON, err := json.Marshal(gitalyServers)
+ if err != nil {
+ return err
+ }
+
+ ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("gitaly-servers", base64.StdEncoding.EncodeToString(gitalyServersJSON)))
+
if err := r.replicator.Replicate(ctx, job.Source, node); err != nil {
return err
}
r.log.WithField(logWithReplJobID, job.ID).
Info("completed replication")
- if err := r.jobsStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
+ if err := r.dataStore.UpdateReplJob(job.ID, JobStateComplete); err != nil {
return err
}
}