Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZeger-Jan van de Weg <git@zjvandeweg.nl>2019-02-18 15:29:36 +0300
committerZeger-Jan van de Weg <git@zjvandeweg.nl>2019-02-26 12:42:03 +0300
commit519dd1904509373c0ddfb2f33738b56c03fe4d72 (patch)
treeaf323f9b480160041a2d5cca05e31d2b52434a56
parent13736f51678be06902762e003401cf0d40140a6b (diff)
Only one storage location per praefect
Before the storage location was hard coded to "default" or "test". Now this is dynamic, based on the configuration. Each proxy server only serves one replica group until the moment we can map from repository_storage to a group of backend nodes.
-rw-r--r--changelogs/unreleased/zj-praefect.yml5
-rw-r--r--cmd/praefect/main.go7
-rw-r--r--internal/praefect/config/config.go12
-rw-r--r--internal/praefect/server.go54
-rw-r--r--internal/praefect/server_test.go23
5 files changed, 70 insertions, 31 deletions
diff --git a/changelogs/unreleased/zj-praefect.yml b/changelogs/unreleased/zj-praefect.yml
new file mode 100644
index 000000000..1960bd14b
--- /dev/null
+++ b/changelogs/unreleased/zj-praefect.yml
@@ -0,0 +1,5 @@
+---
+title: Create Praefect binary for proxy server execution
+merge_request: 1068
+author:
+type: added
diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go
index 602df0799..5e477bda9 100644
--- a/cmd/praefect/main.go
+++ b/cmd/praefect/main.go
@@ -13,6 +13,7 @@ import (
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/internal/praefect"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/labkit/tracing"
)
var (
@@ -38,6 +39,8 @@ func main() {
logger.Fatalf("%s", err)
}
+ tracing.Initialize(tracing.WithServiceName("praefect"))
+
l, err := net.Listen("tcp", conf.ListenAddr)
if err != nil {
logger.Fatalf("%s", err)
@@ -48,7 +51,7 @@ func main() {
logger.Fatalf("%v", run(l, conf))
}
-func run(l net.Listener, conf *config.Config) error {
+func run(l net.Listener, conf config.Config) error {
srv := praefect.NewServer(nil, logger)
signals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
@@ -71,7 +74,7 @@ func run(l net.Listener, conf *config.Config) error {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
if shutdownErr := srv.Shutdown(ctx); shutdownErr != nil {
- logger.Warn("error received during shutting down: %v", shutdownErr)
+ logger.Warnf("error received during shutting down: %v", shutdownErr)
}
err = fmt.Errorf("received signal: %v", s)
case err = <-serverErrors:
diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go
index 5c47c4f45..32a11294f 100644
--- a/internal/praefect/config/config.go
+++ b/internal/praefect/config/config.go
@@ -20,20 +20,20 @@ type GitalyServer struct {
}
// FromFile loads the config for the passed file path
-func FromFile(filePath string) (*Config, error) {
+func FromFile(filePath string) (Config, error) {
+ config := &Config{}
cfgFile, err := os.Open(filePath)
if err != nil {
- return nil, err
+ return *config, err
}
defer cfgFile.Close()
- config := &Config{}
_, err = toml.DecodeReader(cfgFile, config)
- return config, err
+ return *config, err
}
// Validate establishes if the config is valid
-func (c *Config) Validate() error {
+func (c Config) Validate() error {
if c.ListenAddr == "" {
return fmt.Errorf("no listen address configured")
}
@@ -45,7 +45,7 @@ func (c *Config) Validate() error {
listenAddrs := make(map[string]bool, len(c.GitalyServers))
for _, gitaly := range c.GitalyServers {
if gitaly.Name == "" {
- return fmt.Errorf("expect %q to have a name", gitaly)
+ return fmt.Errorf("expect %v to have a name", gitaly)
}
if _, found := listenAddrs[gitaly.ListenAddr]; found {
diff --git a/internal/praefect/server.go b/internal/praefect/server.go
index ae0a26a36..0561a4ec8 100644
--- a/internal/praefect/server.go
+++ b/internal/praefect/server.go
@@ -32,8 +32,13 @@ type Logger interface {
// downstream server. The coordinator is thread safe; concurrent calls to
// register nodes are safe.
type Coordinator struct {
- log Logger
- lock sync.RWMutex
+ log Logger
+ lock sync.RWMutex
+
+ // Nodes will in the first interations have only one key, which limits
+ // the praefect to serve only 1 distinct set of Gitaly nodes.
+ // One limitation this creates; each server needs the same amount of
+ // disk space in case of full replication.
nodes map[string]*grpc.ClientConn
}
@@ -51,18 +56,23 @@ func (c *Coordinator) streamDirector(ctx context.Context, fullMethodName string)
// to the appropriate Gitaly node.
c.log.Debugf("Stream director received method %s", fullMethodName)
- // TODO: obtain storage location dynamically from RPC request message
- // TODO fix hard coding
- storageLoc := "default"
-
c.lock.RLock()
- cc, ok := c.nodes[storageLoc]
+
+ var cc *grpc.ClientConn
+ storageLoc := ""
+ // We only need the first node, as there's only one storage location per
+ // praefect at this time
+ for k, v := range c.nodes {
+ storageLoc = k
+ cc = v
+ break
+ }
c.lock.RUnlock()
- if !ok {
+ if storageLoc == "" {
err := status.Error(
codes.FailedPrecondition,
- fmt.Sprintf("no downstream node for storage location %q", storageLoc),
+ "no downstream node registered",
)
return nil, nil, err
}
@@ -112,21 +122,25 @@ func NewServer(grpcOpts []grpc.ServerOption, l Logger) *Server {
// RegisterNode will direct traffic to the supplied downstream connection when the storage location
// is encountered.
-//
-// TODO: Coordinator probably needs to handle dialing, or another entity
-// needs to handle dialing to ensure keep alives and redialing logic
-// exist for when downstream connections are severed.
-func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) {
+func (c *Coordinator) RegisterNode(storageLoc, listenAddr string) error {
+ conn, err := client.Dial(listenAddr,
+ []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.CallCustomCodec(proxy.Codec()))},
+ )
+ if err != nil {
+ return err
+ }
+
c.lock.Lock()
+ defer c.lock.Unlock()
- conn, err := client.Dial(listenAddr, []grpc.DialOption{grpc.WithCodec(proxy.Codec())})
- if err != nil {
- c.log.Debugf("error registering: %v", err)
- } else {
- c.nodes[storageLoc] = conn
+ if _, ok := c.nodes[storageLoc]; !ok && len(c.nodes) > 0 {
+ conn.Close()
+ return fmt.Errorf("error: registering %s failed, only one storage location per server is supported", storageLoc)
}
- c.lock.Unlock()
+ c.nodes[storageLoc] = conn
+
+ return nil
}
func proxyRequiredOpts(director proxy.StreamDirector) []grpc.ServerOption {
diff --git a/internal/praefect/server_test.go b/internal/praefect/server_test.go
index a079bcb58..c71d275a2 100644
--- a/internal/praefect/server_test.go
+++ b/internal/praefect/server_test.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/mwitkow/grpc-proxy/proxy"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly-proto/go/gitalypb"
"gitlab.com/gitlab-org/gitaly/client"
@@ -49,6 +50,17 @@ func TestServerRouting(t *testing.T) {
require.NoError(t, <-errQ)
}
+func TestRegisteringSecondStorageLocation(t *testing.T) {
+ prf := praefect.NewServer(nil, testLogger{t})
+
+ mCli, _, cleanup := newMockDownstream(t)
+ defer cleanup() // clean up mock downstream server resources
+
+ assert.NoError(t, prf.RegisterNode("1", mCli))
+ assert.Error(t, prf.RegisterNode("2", mCli))
+
+}
+
func listenAvailPort(tb testing.TB) (net.Listener, int) {
listener, err := net.Listen("tcp", ":0")
require.NoError(tb, err)
@@ -85,7 +97,7 @@ func (tl testLogger) Debugf(format string, args ...interface{}) {
}
// initializes and returns a client to downstream server, downstream server, and cleanup function
-func newMockDownstream(tb testing.TB) (*grpc.ClientConn, gitalypb.RepositoryServiceServer, func()) {
+func newMockDownstream(tb testing.TB) (string, gitalypb.RepositoryServiceServer, func()) {
// setup mock server
m := &mockRepoSvc{
srv: grpc.NewServer(),
@@ -106,8 +118,13 @@ func newMockDownstream(tb testing.TB) (*grpc.ClientConn, gitalypb.RepositoryServ
m.srv.GracefulStop()
lis.Close()
cc.Close()
- require.NoError(tb, <-errQ)
+
+ // If the server is shutdown before Serve() is called on it
+ // the Serve() calls will return the ErrServerStopped
+ if err := <-errQ; err != nil && err != grpc.ErrServerStopped {
+ require.NoError(tb, err)
+ }
}
- return cc, m, cleanup
+ return fmt.Sprintf("tcp://localhost:%d", port), m, cleanup
}