gitlab.com/gitlab-org/gitaly.git
author    John Cai <jcai@gitlab.com>  2021-11-08 19:17:46 +0300
committer John Cai <jcai@gitlab.com>  2021-11-08 19:41:07 +0300
commit    15199ee173407195aec00e47fff7e748dcba8cc0 (patch)
tree      cb71efc265c4dc897680a5af29ff6346dfd5834b
parent    a49570e725af9ece4f04dcb89760352af9f39b68 (diff)
praefect: refactor node ping into praefect package
The node ping code can be reused, so this commit moves it into the praefect internal package. This paves the way for a praefect startup check that uses the same logic to check the health of the internal Gitaly nodes.
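
The new package-level entry point is nodes.PingAll, which dials every configured Gitaly node concurrently, runs the health and consistency checks, and reports progress through a nodes.Printer. Below is a minimal sketch of how the planned startup check might invoke it; the validateNodes wrapper, the timeout, and the way the configuration is loaded are illustrative assumptions, not part of this commit:

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
)

// validateNodes is a hypothetical startup check: it pings every configured
// Gitaly node and returns an error listing any unhealthy addresses.
func validateNodes(conf config.Config) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// TextPrinter writes each ping message as a plain line to the writer.
	printer := nodes.NewTextPrinter(os.Stdout)

	// PingAll dials each node, runs a gRPC health check plus a ServerInfo
	// consistency check, and aggregates failures into a single error.
	return nodes.PingAll(ctx, conf, printer)
}

func main() {
	var conf config.Config // assume this was loaded from the praefect config file
	if err := validateNodes(conf); err != nil {
		fmt.Fprintf(os.Stderr, "praefect startup check failed: %v\n", err)
		os.Exit(1)
	}
}
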
-rw-r--r--  cmd/praefect/subcmd.go                   |   2
-rw-r--r--  cmd/praefect/subcmd_dial_nodes.go        | 209
-rw-r--r--  cmd/praefect/subcmd_dial_nodes_test.go   |  67
-rw-r--r--  internal/praefect/nodes/ping.go          | 261
-rw-r--r--  internal/praefect/nodes/ping_test.go     |  53
5 files changed, 328 insertions(+), 264 deletions(-)
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index 2f09c7587..fa371cf7c 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -28,7 +28,7 @@ const defaultDialTimeout = 30 * time.Second
var subcommands = map[string]subcmd{
sqlPingCmdName: &sqlPingSubcommand{},
sqlMigrateCmdName: &sqlMigrateSubcommand{},
- dialNodesCmdName: &dialNodesSubcommand{},
+ dialNodesCmdName: newDialNodesSubcommand(logger),
sqlMigrateDownCmdName: &sqlMigrateDownSubcommand{},
sqlMigrateStatusCmdName: &sqlMigrateStatusSubcommand{},
datalossCmdName: newDatalossSubcommand(),
diff --git a/cmd/praefect/subcmd_dial_nodes.go b/cmd/praefect/subcmd_dial_nodes.go
index 0b22a0555..215a5496e 100644
--- a/cmd/praefect/subcmd_dial_nodes.go
+++ b/cmd/praefect/subcmd_dial_nodes.go
@@ -2,216 +2,27 @@ package main
import (
"context"
- "errors"
"flag"
- "fmt"
- "log"
- "sync"
- "time"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/health/grpc_health_v1"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
)
const dialNodesCmdName = "dial-nodes"
-type dialNodesSubcommand struct{}
-
-func (s *dialNodesSubcommand) FlagSet() *flag.FlagSet {
- return flag.NewFlagSet(dialNodesCmdName, flag.ExitOnError)
-}
-
-func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) error {
- nodes := flattenNodes(conf)
-
- var wg sync.WaitGroup
- for _, n := range nodes {
- wg.Add(1)
- go func(n *nodePing) {
- defer wg.Done()
- n.checkNode()
- }(n)
- }
- wg.Wait()
-
- var err error
- for _, n := range nodes {
- if n.err != nil {
- err = n.err
- }
- }
-
- return err
+func newDialNodesSubcommand(p nodes.Printer) *dialNodesSubcommand {
+ return &dialNodesSubcommand{p}
}
-type (
- virtualStorage string
- gitalyStorage string
-)
-
-type nodePing struct {
- address string
- // set of storages this node hosts
- storages map[gitalyStorage][]virtualStorage
- vStorages map[virtualStorage]struct{} // set of virtual storages node belongs to
- token string // auth token
- err error // any error during dial/ping
+type dialNodesSubcommand struct {
+ p nodes.Printer
}
-func flattenNodes(conf config.Config) map[string]*nodePing {
- nodeByAddress := map[string]*nodePing{} // key is address
-
- // flatten nodes between virtual storages
- for _, vs := range conf.VirtualStorages {
- vsName := virtualStorage(vs.Name)
- for _, node := range vs.Nodes {
- gsName := gitalyStorage(node.Storage)
-
- n, ok := nodeByAddress[node.Address]
- if !ok {
- n = &nodePing{
- storages: map[gitalyStorage][]virtualStorage{},
- vStorages: map[virtualStorage]struct{}{},
- }
- }
- n.address = node.Address
-
- s := n.storages[gsName]
- n.storages[gsName] = append(s, vsName)
-
- n.vStorages[vsName] = struct{}{}
- n.token = node.Token
- nodeByAddress[node.Address] = n
- }
- }
- return nodeByAddress
-}
-
-func (npr *nodePing) dial() (*grpc.ClientConn, error) {
- return subCmdDial(context.TODO(), npr.address, npr.token, defaultDialTimeout)
-}
-
-func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
- hClient := grpc_health_v1.NewHealthClient(cc)
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- resp, err := hClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
- if err != nil {
- return 0, err
- }
-
- return resp.GetStatus(), nil
-}
-
-func (npr *nodePing) isConsistent(cc *grpc.ClientConn) bool {
- praefect := gitalypb.NewServerServiceClient(cc)
-
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- if len(npr.storages) == 0 {
- npr.log("ERROR: current configuration has no storages")
- return false
- }
-
- resp, err := praefect.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
- if err != nil {
- npr.log("ERROR: failed to receive state from the remote: %v", err)
- return false
- }
-
- if len(resp.StorageStatuses) == 0 {
- npr.log("ERROR: remote has no configured storages")
- return false
- }
-
- storagesSet := make(map[gitalyStorage]bool, len(resp.StorageStatuses))
-
- knownStoragesSet := make(map[gitalyStorage]bool, len(npr.storages))
- for k := range npr.storages {
- knownStoragesSet[k] = true
- }
-
- consistent := true
- for _, status := range resp.StorageStatuses {
- gStorage := gitalyStorage(status.StorageName)
-
- // only proceed if the gitaly storage belongs to a configured
- // virtual storage
- if len(npr.storages[gStorage]) == 0 {
- continue
- }
-
- if storagesSet[gStorage] {
- npr.log("ERROR: remote has duplicated storage: %q", status.StorageName)
- consistent = false
- continue
- }
- storagesSet[gStorage] = true
-
- if status.Readable && status.Writeable {
- npr.log(
- "SUCCESS: confirmed Gitaly storage %q in virtual storages %v is served",
- status.StorageName,
- npr.storages[gStorage],
- )
- delete(knownStoragesSet, gStorage) // storage found
- } else {
- npr.log("ERROR: storage %q is not readable or writable", status.StorageName)
- consistent = false
- }
- }
-
- for storage := range knownStoragesSet {
- npr.log("ERROR: configured storage was not reported by remote: %q", storage)
- consistent = false
- }
-
- return consistent
-}
-
-func (npr *nodePing) log(msg string, args ...interface{}) {
- log.Printf("[%s]: %s", npr.address, fmt.Sprintf(msg, args...))
+func (s *dialNodesSubcommand) FlagSet() *flag.FlagSet {
+ return flag.NewFlagSet(dialNodesCmdName, flag.ExitOnError)
}
-func (npr *nodePing) checkNode() {
- npr.log("dialing...")
- cc, err := npr.dial()
- if err != nil {
- npr.log("ERROR: dialing failed: %v", err)
- npr.err = err
- return
- }
- defer cc.Close()
- npr.log("dialed successfully!")
-
- npr.log("checking health...")
- health, err := npr.healthCheck(cc)
- if err != nil {
- npr.log("ERROR: unable to request health check: %v", err)
- npr.err = err
- return
- }
-
- if health != grpc_health_v1.HealthCheckResponse_SERVING {
- npr.err = fmt.Errorf(
- "health check did not report serving, instead reported: %s",
- health.String())
- npr.log("ERROR: %v", npr.err)
- return
- }
-
- npr.log("SUCCESS: node is healthy!")
-
- npr.log("checking consistency...")
- if !npr.isConsistent(cc) {
- npr.err = errors.New("consistency check failed")
- npr.log("ERROR: %v", npr.err)
- return
- }
- npr.log("SUCCESS: node configuration is consistent!")
+func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) error {
+ ctx := context.Background()
+ return nodes.PingAll(ctx, conf, s.p)
}
diff --git a/cmd/praefect/subcmd_dial_nodes_test.go b/cmd/praefect/subcmd_dial_nodes_test.go
index df013916f..9714b031e 100644
--- a/cmd/praefect/subcmd_dial_nodes_test.go
+++ b/cmd/praefect/subcmd_dial_nodes_test.go
@@ -4,13 +4,12 @@ import (
"bytes"
"context"
"fmt"
- "io"
- "log"
"strings"
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
)
@@ -106,72 +105,12 @@ func TestSubCmdDialNodes(t *testing.T) {
tt.conf.SocketPath = ln.Addr().String()
output := &bytes.Buffer{}
- defer func(w io.Writer, f int) {
- log.SetOutput(w)
- log.SetFlags(f) // reinstate timestamp
- }(log.Writer(), log.Flags())
- log.SetOutput(output)
- log.SetFlags(0) // remove timestamp to make output deterministic
+ p := nodes.NewTextPrinter(output)
- cmd := dialNodesSubcommand{}
+ cmd := newDialNodesSubcommand(p)
require.NoError(t, cmd.Exec(nil, tt.conf))
require.Equal(t, tt.logs, output.String())
})
}
}
-
-func TestFlattenNodes(t *testing.T) {
- for _, tt := range []struct {
- desc string
- conf config.Config
- expect map[string]*nodePing
- }{
- {
- desc: "Flatten common address between storages",
- conf: config.Config{
- VirtualStorages: []*config.VirtualStorage{
- {
- Name: "meow",
- Nodes: []*config.Node{
- {
- Storage: "foo",
- Address: "tcp://example.com",
- Token: "abc",
- },
- },
- },
- {
- Name: "woof",
- Nodes: []*config.Node{
- {
- Storage: "bar",
- Address: "tcp://example.com",
- Token: "abc",
- },
- },
- },
- },
- },
- expect: map[string]*nodePing{
- "tcp://example.com": {
- address: "tcp://example.com",
- storages: map[gitalyStorage][]virtualStorage{
- "foo": {"meow"},
- "bar": {"woof"},
- },
- vStorages: map[virtualStorage]struct{}{
- "meow": {},
- "woof": {},
- },
- token: "abc",
- },
- },
- },
- } {
- t.Run(tt.desc, func(t *testing.T) {
- actual := flattenNodes(tt.conf)
- require.Equal(t, tt.expect, actual)
- })
- }
-}
diff --git a/internal/praefect/nodes/ping.go b/internal/praefect/nodes/ping.go
new file mode 100644
index 000000000..4b8847be2
--- /dev/null
+++ b/internal/praefect/nodes/ping.go
@@ -0,0 +1,261 @@
+package nodes
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "strings"
+ "sync"
+
+ gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth"
+ "gitlab.com/gitlab-org/gitaly/v14/client"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+type (
+ virtualStorage string
+ gitalyStorage string
+)
+
+func newPingSet(conf config.Config, printer Printer) map[string]*Ping {
+ nodeByAddress := map[string]*Ping{} // key is address
+
+ // flatten nodes between virtual storages
+ for _, vs := range conf.VirtualStorages {
+ vsName := virtualStorage(vs.Name)
+ for _, node := range vs.Nodes {
+ gsName := gitalyStorage(node.Storage)
+
+ n, ok := nodeByAddress[node.Address]
+ if !ok {
+ n = &Ping{
+ storages: map[gitalyStorage][]virtualStorage{},
+ vStorages: map[virtualStorage]struct{}{},
+ printer: printer,
+ }
+ }
+ n.address = node.Address
+
+ s := n.storages[gsName]
+ n.storages[gsName] = append(s, vsName)
+
+ n.vStorages[vsName] = struct{}{}
+ n.token = node.Token
+ nodeByAddress[node.Address] = n
+ }
+ }
+ return nodeByAddress
+}
+
+// Ping is used to determine node health for a gitaly node
+type Ping struct {
+ address string
+ // set of storages this node hosts
+ storages map[gitalyStorage][]virtualStorage
+ vStorages map[virtualStorage]struct{} // set of virtual storages node belongs to
+ token string // auth token
+ err error // any error during dial/ping
+ printer Printer
+}
+
+// Address returns the address of the node
+func (p *Ping) Address() string {
+ return p.address
+}
+
+func (p *Ping) dial(ctx context.Context) (*grpc.ClientConn, error) {
+ opts := []grpc.DialOption{
+ grpc.WithBlock(),
+ }
+
+ if len(p.token) > 0 {
+ opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.token)))
+ }
+
+ return client.DialContext(ctx, p.address, opts)
+}
+
+func (p *Ping) healthCheck(ctx context.Context, cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) {
+ hClient := grpc_health_v1.NewHealthClient(cc)
+
+ resp, err := hClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
+ if err != nil {
+ return 0, err
+ }
+
+ return resp.GetStatus(), nil
+}
+
+func (p *Ping) isConsistent(ctx context.Context, cc *grpc.ClientConn) bool {
+ praefect := gitalypb.NewServerServiceClient(cc)
+
+ if len(p.storages) == 0 {
+ p.log("ERROR: current configuration has no storages")
+ return false
+ }
+
+ resp, err := praefect.ServerInfo(ctx, &gitalypb.ServerInfoRequest{})
+ if err != nil {
+ p.log("ERROR: failed to receive state from the remote: %v", err)
+ return false
+ }
+
+ if len(resp.StorageStatuses) == 0 {
+ p.log("ERROR: remote has no configured storages")
+ return false
+ }
+
+ storagesSet := make(map[gitalyStorage]bool, len(resp.StorageStatuses))
+
+ knownStoragesSet := make(map[gitalyStorage]bool, len(p.storages))
+ for k := range p.storages {
+ knownStoragesSet[k] = true
+ }
+
+ consistent := true
+ for _, status := range resp.StorageStatuses {
+ gStorage := gitalyStorage(status.StorageName)
+
+ // only proceed if the gitaly storage belongs to a configured
+ // virtual storage
+ if len(p.storages[gStorage]) == 0 {
+ continue
+ }
+
+ if storagesSet[gStorage] {
+ p.log("ERROR: remote has duplicated storage: %q", status.StorageName)
+ consistent = false
+ continue
+ }
+ storagesSet[gStorage] = true
+
+ if status.Readable && status.Writeable {
+ p.log(
+ "SUCCESS: confirmed Gitaly storage %q in virtual storages %v is served",
+ status.StorageName,
+ p.storages[gStorage],
+ )
+ delete(knownStoragesSet, gStorage) // storage found
+ } else {
+ p.log("ERROR: storage %q is not readable or writable", status.StorageName)
+ consistent = false
+ }
+ }
+
+ for storage := range knownStoragesSet {
+ p.log("ERROR: configured storage was not reported by remote: %q", storage)
+ consistent = false
+ }
+
+ return consistent
+}
+
+func (p *Ping) log(msg string, args ...interface{}) {
+ p.printer.Printf("[%s]: %s", p.address, fmt.Sprintf(msg, args...))
+}
+
+// Printer is an interface for Ping to print messages
+type Printer interface {
+ Printf(format string, args ...interface{})
+}
+
+// TextPrinter is a basic printer that writes to a writer
+type TextPrinter struct {
+ w io.Writer
+}
+
+// NewTextPrinter creates a new TextPrinter instance
+func NewTextPrinter(w io.Writer) *TextPrinter {
+ return &TextPrinter{w}
+}
+
+// Printf prints the message and adds a newline
+func (t *TextPrinter) Printf(format string, args ...interface{}) {
+ fmt.Fprintf(t.w, format, args...)
+ fmt.Fprint(t.w, "\n")
+}
+
+// CheckNode checks network connectivity by issuing a healthcheck request, and
+// also calls the ServerInfo RPC to check disk read/write access.
+func (p *Ping) CheckNode(ctx context.Context) {
+ p.log("dialing...")
+ cc, err := p.dial(ctx)
+ if err != nil {
+ p.log("ERROR: dialing failed: %v", err)
+ p.err = err
+ return
+ }
+ defer cc.Close()
+ p.log("dialed successfully!")
+
+ p.log("checking health...")
+ health, err := p.healthCheck(ctx, cc)
+ if err != nil {
+ p.log("ERROR: unable to request health check: %v", err)
+ p.err = err
+ return
+ }
+
+ if health != grpc_health_v1.HealthCheckResponse_SERVING {
+ p.err = fmt.Errorf(
+ "health check did not report serving, instead reported: %s",
+ health.String())
+ p.log("ERROR: %v", p.err)
+ return
+ }
+
+ p.log("SUCCESS: node is healthy!")
+
+ p.log("checking consistency...")
+ if !p.isConsistent(ctx, cc) {
+ p.err = errors.New("consistency check failed")
+ p.log("ERROR: %v", p.err)
+ return
+ }
+ p.log("SUCCESS: node configuration is consistent!")
+}
+
+func (p *Ping) Error() error {
+ return p.err
+}
+
+// PingAll loops through all the pings and calls CheckNode on them
+func PingAll(ctx context.Context, cfg config.Config, printer Printer) error {
+ pings := newPingSet(cfg, printer)
+
+ var wg sync.WaitGroup
+ for _, n := range pings {
+ wg.Add(1)
+ go func(n *Ping) {
+ defer wg.Done()
+ n.CheckNode(ctx)
+ }(n)
+ }
+ wg.Wait()
+
+ var unhealthyAddresses []string
+ for _, n := range pings {
+ if n.Error() != nil {
+ unhealthyAddresses = append(unhealthyAddresses, n.address)
+ }
+ }
+
+ if len(unhealthyAddresses) > 0 {
+ return &pingError{unhealthyAddresses}
+ }
+
+ return nil
+}
+
+type pingError struct {
+ unhealthyAddresses []string
+}
+
+// Error returns a composite error message based on which nodes were deemed unhealthy
+func (n *pingError) Error() string {
+ return fmt.Sprintf("the following nodes are not healthy: %s", strings.Join(n.unhealthyAddresses, ", "))
+}
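
The Printer interface above decouples ping output from any particular destination: the dial-nodes subcommand writes plain text via TextPrinter, while the test below captures output in a buffer. A possible alternative, sketched here under the assumption of a logrus-style logger (this adapter is illustrative and not part of the commit), is to route the same messages into structured logs:

// Package logging is an illustrative sketch (not part of this commit) showing
// that any type with a matching Printf method satisfies nodes.Printer.
package logging

import (
	"github.com/sirupsen/logrus"

	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
)

// LogPrinter adapts a logrus logger to the nodes.Printer interface so ping
// messages end up in structured logs instead of plain text output.
type LogPrinter struct {
	Logger *logrus.Logger
}

// Printf forwards each ping message to the logger at info level.
func (l LogPrinter) Printf(format string, args ...interface{}) {
	l.Logger.Infof(format, args...)
}

// Compile-time assertion that LogPrinter satisfies nodes.Printer.
var _ nodes.Printer = LogPrinter{}
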
diff --git a/internal/praefect/nodes/ping_test.go b/internal/praefect/nodes/ping_test.go
new file mode 100644
index 000000000..7457c1d5b
--- /dev/null
+++ b/internal/praefect/nodes/ping_test.go
@@ -0,0 +1,53 @@
+package nodes
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
+)
+
+func TestNewPingSet(t *testing.T) {
+ conf := config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "meow",
+ Nodes: []*config.Node{
+ {
+ Storage: "foo",
+ Address: "tcp://example.com",
+ Token: "abc",
+ },
+ },
+ },
+ {
+ Name: "woof",
+ Nodes: []*config.Node{
+ {
+ Storage: "bar",
+ Address: "tcp://example.com",
+ Token: "abc",
+ },
+ },
+ },
+ },
+ }
+
+ actual := newPingSet(conf, nil)
+ expected := map[string]*Ping{
+ "tcp://example.com": {
+ address: "tcp://example.com",
+ storages: map[gitalyStorage][]virtualStorage{
+ "foo": {"meow"},
+ "bar": {"woof"},
+ },
+ vStorages: map[virtualStorage]struct{}{
+ "meow": {},
+ "woof": {},
+ },
+ token: "abc",
+ },
+ }
+
+ require.Equal(t, expected, actual)
+}