diff options
author | John Cai <jcai@gitlab.com> | 2021-11-08 19:17:46 +0300 |
---|---|---|
committer | John Cai <jcai@gitlab.com> | 2021-11-08 19:41:07 +0300 |
commit | 15199ee173407195aec00e47fff7e748dcba8cc0 (patch) | |
tree | cb71efc265c4dc897680a5af29ff6346dfd5834b | |
parent | a49570e725af9ece4f04dcb89760352af9f39b68 (diff) |
praefect: refactor node ping into praefect package
The node ping code can be reused. This commit moves it into the praefect
internal package. This change paves the way for a praefect startup check
to use the logic to check the health of internal gitaly nodes.
-rw-r--r-- | cmd/praefect/subcmd.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dial_nodes.go | 209 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dial_nodes_test.go | 67 | ||||
-rw-r--r-- | internal/praefect/nodes/ping.go | 261 | ||||
-rw-r--r-- | internal/praefect/nodes/ping_test.go | 53 |
5 files changed, 328 insertions(+), 264 deletions(-)
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go index 2f09c7587..fa371cf7c 100644 --- a/cmd/praefect/subcmd.go +++ b/cmd/praefect/subcmd.go @@ -28,7 +28,7 @@ const defaultDialTimeout = 30 * time.Second var subcommands = map[string]subcmd{ sqlPingCmdName: &sqlPingSubcommand{}, sqlMigrateCmdName: &sqlMigrateSubcommand{}, - dialNodesCmdName: &dialNodesSubcommand{}, + dialNodesCmdName: newDialNodesSubcommand(logger), sqlMigrateDownCmdName: &sqlMigrateDownSubcommand{}, sqlMigrateStatusCmdName: &sqlMigrateStatusSubcommand{}, datalossCmdName: newDatalossSubcommand(), diff --git a/cmd/praefect/subcmd_dial_nodes.go b/cmd/praefect/subcmd_dial_nodes.go index 0b22a0555..215a5496e 100644 --- a/cmd/praefect/subcmd_dial_nodes.go +++ b/cmd/praefect/subcmd_dial_nodes.go @@ -2,216 +2,27 @@ package main import ( "context" - "errors" "flag" - "fmt" - "log" - "sync" - "time" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" ) const dialNodesCmdName = "dial-nodes" -type dialNodesSubcommand struct{} - -func (s *dialNodesSubcommand) FlagSet() *flag.FlagSet { - return flag.NewFlagSet(dialNodesCmdName, flag.ExitOnError) -} - -func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) error { - nodes := flattenNodes(conf) - - var wg sync.WaitGroup - for _, n := range nodes { - wg.Add(1) - go func(n *nodePing) { - defer wg.Done() - n.checkNode() - }(n) - } - wg.Wait() - - var err error - for _, n := range nodes { - if n.err != nil { - err = n.err - } - } - - return err +func newDialNodesSubcommand(p nodes.Printer) *dialNodesSubcommand { + return &dialNodesSubcommand{p} } -type ( - virtualStorage string - gitalyStorage string -) - -type nodePing struct { - address string - // set of storages this node hosts - storages map[gitalyStorage][]virtualStorage - vStorages 
map[virtualStorage]struct{} // set of virtual storages node belongs to - token string // auth token - err error // any error during dial/ping +type dialNodesSubcommand struct { + p nodes.Printer } -func flattenNodes(conf config.Config) map[string]*nodePing { - nodeByAddress := map[string]*nodePing{} // key is address - - // flatten nodes between virtual storages - for _, vs := range conf.VirtualStorages { - vsName := virtualStorage(vs.Name) - for _, node := range vs.Nodes { - gsName := gitalyStorage(node.Storage) - - n, ok := nodeByAddress[node.Address] - if !ok { - n = &nodePing{ - storages: map[gitalyStorage][]virtualStorage{}, - vStorages: map[virtualStorage]struct{}{}, - } - } - n.address = node.Address - - s := n.storages[gsName] - n.storages[gsName] = append(s, vsName) - - n.vStorages[vsName] = struct{}{} - n.token = node.Token - nodeByAddress[node.Address] = n - } - } - return nodeByAddress -} - -func (npr *nodePing) dial() (*grpc.ClientConn, error) { - return subCmdDial(context.TODO(), npr.address, npr.token, defaultDialTimeout) -} - -func (npr *nodePing) healthCheck(cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) { - hClient := grpc_health_v1.NewHealthClient(cc) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - resp, err := hClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) - if err != nil { - return 0, err - } - - return resp.GetStatus(), nil -} - -func (npr *nodePing) isConsistent(cc *grpc.ClientConn) bool { - praefect := gitalypb.NewServerServiceClient(cc) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - if len(npr.storages) == 0 { - npr.log("ERROR: current configuration has no storages") - return false - } - - resp, err := praefect.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) - if err != nil { - npr.log("ERROR: failed to receive state from the remote: %v", err) - return false - } - - if len(resp.StorageStatuses) == 0 { 
- npr.log("ERROR: remote has no configured storages") - return false - } - - storagesSet := make(map[gitalyStorage]bool, len(resp.StorageStatuses)) - - knownStoragesSet := make(map[gitalyStorage]bool, len(npr.storages)) - for k := range npr.storages { - knownStoragesSet[k] = true - } - - consistent := true - for _, status := range resp.StorageStatuses { - gStorage := gitalyStorage(status.StorageName) - - // only proceed if the gitaly storage belongs to a configured - // virtual storage - if len(npr.storages[gStorage]) == 0 { - continue - } - - if storagesSet[gStorage] { - npr.log("ERROR: remote has duplicated storage: %q", status.StorageName) - consistent = false - continue - } - storagesSet[gStorage] = true - - if status.Readable && status.Writeable { - npr.log( - "SUCCESS: confirmed Gitaly storage %q in virtual storages %v is served", - status.StorageName, - npr.storages[gStorage], - ) - delete(knownStoragesSet, gStorage) // storage found - } else { - npr.log("ERROR: storage %q is not readable or writable", status.StorageName) - consistent = false - } - } - - for storage := range knownStoragesSet { - npr.log("ERROR: configured storage was not reported by remote: %q", storage) - consistent = false - } - - return consistent -} - -func (npr *nodePing) log(msg string, args ...interface{}) { - log.Printf("[%s]: %s", npr.address, fmt.Sprintf(msg, args...)) +func (s *dialNodesSubcommand) FlagSet() *flag.FlagSet { + return flag.NewFlagSet(dialNodesCmdName, flag.ExitOnError) } -func (npr *nodePing) checkNode() { - npr.log("dialing...") - cc, err := npr.dial() - if err != nil { - npr.log("ERROR: dialing failed: %v", err) - npr.err = err - return - } - defer cc.Close() - npr.log("dialed successfully!") - - npr.log("checking health...") - health, err := npr.healthCheck(cc) - if err != nil { - npr.log("ERROR: unable to request health check: %v", err) - npr.err = err - return - } - - if health != grpc_health_v1.HealthCheckResponse_SERVING { - npr.err = fmt.Errorf( - "health 
check did not report serving, instead reported: %s", - health.String()) - npr.log("ERROR: %v", npr.err) - return - } - - npr.log("SUCCESS: node is healthy!") - - npr.log("checking consistency...") - if !npr.isConsistent(cc) { - npr.err = errors.New("consistency check failed") - npr.log("ERROR: %v", npr.err) - return - } - npr.log("SUCCESS: node configuration is consistent!") +func (s *dialNodesSubcommand) Exec(flags *flag.FlagSet, conf config.Config) error { + ctx := context.Background() + return nodes.PingAll(ctx, conf, s.p) } diff --git a/cmd/praefect/subcmd_dial_nodes_test.go b/cmd/praefect/subcmd_dial_nodes_test.go index df013916f..9714b031e 100644 --- a/cmd/praefect/subcmd_dial_nodes_test.go +++ b/cmd/praefect/subcmd_dial_nodes_test.go @@ -4,13 +4,12 @@ import ( "bytes" "context" "fmt" - "io" - "log" "strings" "testing" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" ) @@ -106,72 +105,12 @@ func TestSubCmdDialNodes(t *testing.T) { tt.conf.SocketPath = ln.Addr().String() output := &bytes.Buffer{} - defer func(w io.Writer, f int) { - log.SetOutput(w) - log.SetFlags(f) // reinstate timestamp - }(log.Writer(), log.Flags()) - log.SetOutput(output) - log.SetFlags(0) // remove timestamp to make output deterministic + p := nodes.NewTextPrinter(output) - cmd := dialNodesSubcommand{} + cmd := newDialNodesSubcommand(p) require.NoError(t, cmd.Exec(nil, tt.conf)) require.Equal(t, tt.logs, output.String()) }) } } - -func TestFlattenNodes(t *testing.T) { - for _, tt := range []struct { - desc string - conf config.Config - expect map[string]*nodePing - }{ - { - desc: "Flatten common address between storages", - conf: config.Config{ - VirtualStorages: []*config.VirtualStorage{ - { - Name: "meow", - Nodes: []*config.Node{ - { - Storage: "foo", - Address: "tcp://example.com", - Token: "abc", - }, - }, - }, - { - Name: 
"woof", - Nodes: []*config.Node{ - { - Storage: "bar", - Address: "tcp://example.com", - Token: "abc", - }, - }, - }, - }, - }, - expect: map[string]*nodePing{ - "tcp://example.com": { - address: "tcp://example.com", - storages: map[gitalyStorage][]virtualStorage{ - "foo": {"meow"}, - "bar": {"woof"}, - }, - vStorages: map[virtualStorage]struct{}{ - "meow": {}, - "woof": {}, - }, - token: "abc", - }, - }, - }, - } { - t.Run(tt.desc, func(t *testing.T) { - actual := flattenNodes(tt.conf) - require.Equal(t, tt.expect, actual) - }) - } -} diff --git a/internal/praefect/nodes/ping.go b/internal/praefect/nodes/ping.go new file mode 100644 index 000000000..4b8847be2 --- /dev/null +++ b/internal/praefect/nodes/ping.go @@ -0,0 +1,261 @@ +package nodes + +import ( + "context" + "errors" + "fmt" + "io" + "strings" + "sync" + + gitalyauth "gitlab.com/gitlab-org/gitaly/v14/auth" + "gitlab.com/gitlab-org/gitaly/v14/client" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" +) + +type ( + virtualStorage string + gitalyStorage string +) + +func newPingSet(conf config.Config, printer Printer) map[string]*Ping { + nodeByAddress := map[string]*Ping{} // key is address + + // flatten nodes between virtual storages + for _, vs := range conf.VirtualStorages { + vsName := virtualStorage(vs.Name) + for _, node := range vs.Nodes { + gsName := gitalyStorage(node.Storage) + + n, ok := nodeByAddress[node.Address] + if !ok { + n = &Ping{ + storages: map[gitalyStorage][]virtualStorage{}, + vStorages: map[virtualStorage]struct{}{}, + printer: printer, + } + } + n.address = node.Address + + s := n.storages[gsName] + n.storages[gsName] = append(s, vsName) + + n.vStorages[vsName] = struct{}{} + n.token = node.Token + nodeByAddress[node.Address] = n + } + } + return nodeByAddress +} + +// Ping is used to determine node health for a gitaly node +type Ping 
struct { + address string + // set of storages this node hosts + storages map[gitalyStorage][]virtualStorage + vStorages map[virtualStorage]struct{} // set of virtual storages node belongs to + token string // auth token + err error // any error during dial/ping + printer Printer +} + +// Address returns the address of the node +func (p *Ping) Address() string { + return p.address +} + +func (p *Ping) dial(ctx context.Context) (*grpc.ClientConn, error) { + opts := []grpc.DialOption{ + grpc.WithBlock(), + } + + if len(p.token) > 0 { + opts = append(opts, grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(p.token))) + } + + return client.DialContext(ctx, p.address, opts) +} + +func (p *Ping) healthCheck(ctx context.Context, cc *grpc.ClientConn) (grpc_health_v1.HealthCheckResponse_ServingStatus, error) { + hClient := grpc_health_v1.NewHealthClient(cc) + + resp, err := hClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) + if err != nil { + return 0, err + } + + return resp.GetStatus(), nil +} + +func (p *Ping) isConsistent(ctx context.Context, cc *grpc.ClientConn) bool { + praefect := gitalypb.NewServerServiceClient(cc) + + if len(p.storages) == 0 { + p.log("ERROR: current configuration has no storages") + return false + } + + resp, err := praefect.ServerInfo(ctx, &gitalypb.ServerInfoRequest{}) + if err != nil { + p.log("ERROR: failed to receive state from the remote: %v", err) + return false + } + + if len(resp.StorageStatuses) == 0 { + p.log("ERROR: remote has no configured storages") + return false + } + + storagesSet := make(map[gitalyStorage]bool, len(resp.StorageStatuses)) + + knownStoragesSet := make(map[gitalyStorage]bool, len(p.storages)) + for k := range p.storages { + knownStoragesSet[k] = true + } + + consistent := true + for _, status := range resp.StorageStatuses { + gStorage := gitalyStorage(status.StorageName) + + // only proceed if the gitaly storage belongs to a configured + // virtual storage + if len(p.storages[gStorage]) == 0 { + continue 
+ } + + if storagesSet[gStorage] { + p.log("ERROR: remote has duplicated storage: %q", status.StorageName) + consistent = false + continue + } + storagesSet[gStorage] = true + + if status.Readable && status.Writeable { + p.log( + "SUCCESS: confirmed Gitaly storage %q in virtual storages %v is served", + status.StorageName, + p.storages[gStorage], + ) + delete(knownStoragesSet, gStorage) // storage found + } else { + p.log("ERROR: storage %q is not readable or writable", status.StorageName) + consistent = false + } + } + + for storage := range knownStoragesSet { + p.log("ERROR: configured storage was not reported by remote: %q", storage) + consistent = false + } + + return consistent +} + +func (p *Ping) log(msg string, args ...interface{}) { + p.printer.Printf("[%s]: %s", p.address, fmt.Sprintf(msg, args...)) +} + +// Printer is an interface for Ping to print messages +type Printer interface { + Printf(format string, args ...interface{}) +} + +// TextPrinter is a basic printer that writes to a writer +type TextPrinter struct { + w io.Writer +} + +// NewTextPrinter creates a new TextPrinter instance +func NewTextPrinter(w io.Writer) *TextPrinter { + return &TextPrinter{w} +} + +// Printf prints the message and adds a newline +func (t *TextPrinter) Printf(format string, args ...interface{}) { + fmt.Fprintf(t.w, format, args...) + fmt.Fprint(t.w, "\n") +} + +// CheckNode checks network connectivity by issuing a healthcheck request, and +// also calls the ServerInfo RPC to check disk read/write access. 
+func (p *Ping) CheckNode(ctx context.Context) { + p.log("dialing...") + cc, err := p.dial(ctx) + if err != nil { + p.log("ERROR: dialing failed: %v", err) + p.err = err + return + } + defer cc.Close() + p.log("dialed successfully!") + + p.log("checking health...") + health, err := p.healthCheck(ctx, cc) + if err != nil { + p.log("ERROR: unable to request health check: %v", err) + p.err = err + return + } + + if health != grpc_health_v1.HealthCheckResponse_SERVING { + p.err = fmt.Errorf( + "health check did not report serving, instead reported: %s", + health.String()) + p.log("ERROR: %v", p.err) + return + } + + p.log("SUCCESS: node is healthy!") + + p.log("checking consistency...") + if !p.isConsistent(ctx, cc) { + p.err = errors.New("consistency check failed") + p.log("ERROR: %v", p.err) + return + } + p.log("SUCCESS: node configuration is consistent!") +} + +func (p *Ping) Error() error { + return p.err +} + +// PingAll loops through all the pings and calls CheckNode on them +func PingAll(ctx context.Context, cfg config.Config, printer Printer) error { + pings := newPingSet(cfg, printer) + + var wg sync.WaitGroup + for _, n := range pings { + wg.Add(1) + go func(n *Ping) { + defer wg.Done() + n.CheckNode(ctx) + }(n) + } + wg.Wait() + + var unhealthyAddresses []string + for _, n := range pings { + if n.Error() != nil { + unhealthyAddresses = append(unhealthyAddresses, n.address) + } + } + + if len(unhealthyAddresses) > 0 { + return &pingError{unhealthyAddresses} + } + + return nil +} + +type pingError struct { + unhealthyAddresses []string +} + +// Error returns a composite error message based on which nodes were deemed unhealthy +func (n *pingError) Error() string { + return fmt.Sprintf("the following nodes are not healthy: %s", strings.Join(n.unhealthyAddresses, ", ")) +} diff --git a/internal/praefect/nodes/ping_test.go b/internal/praefect/nodes/ping_test.go new file mode 100644 index 000000000..7457c1d5b --- /dev/null +++ 
b/internal/praefect/nodes/ping_test.go @@ -0,0 +1,53 @@ +package nodes + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" +) + +func TestNewPingSet(t *testing.T) { + conf := config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "meow", + Nodes: []*config.Node{ + { + Storage: "foo", + Address: "tcp://example.com", + Token: "abc", + }, + }, + }, + { + Name: "woof", + Nodes: []*config.Node{ + { + Storage: "bar", + Address: "tcp://example.com", + Token: "abc", + }, + }, + }, + }, + } + + actual := newPingSet(conf, nil) + expected := map[string]*Ping{ + "tcp://example.com": { + address: "tcp://example.com", + storages: map[gitalyStorage][]virtualStorage{ + "foo": {"meow"}, + "bar": {"woof"}, + }, + vStorages: map[virtualStorage]struct{}{ + "meow": {}, + "woof": {}, + }, + token: "abc", + }, + } + + require.Equal(t, expected, actual) +} |