diff options
author | Sami Hiltunen <shiltunen@gitlab.com> | 2020-06-08 16:09:31 +0300 |
---|---|---|
committer | Sami Hiltunen <shiltunen@gitlab.com> | 2020-06-08 16:09:31 +0300 |
commit | 9d3c543fc1aca12bc59bac2190d305a80c67ff7a (patch) | |
tree | 22629a86463398cbbfa9ad7b04f365c18433fbca | |
parent | 7d374d517487d45ab12329fed1b19a9c13988bbe (diff) | |
parent | c837578af24c295a35ae7367d66ca8057db152db (diff) |
Merge branch 'po-praefect-nodeping' into 'master'
Only log relevant storages in Praefect dial-nodes
Closes #2633
See merge request gitlab-org/gitaly!2229
-rw-r--r-- | changelogs/unreleased/po-praefect-nodeping.yml | 5 | ||||
-rw-r--r-- | cmd/praefect/main_test.go | 8 | ||||
-rw-r--r-- | cmd/praefect/subcmd_dataloss_test.go | 34 | ||||
-rw-r--r-- | cmd/praefect/subcmd_enable_writes_test.go | 2 | ||||
-rw-r--r-- | cmd/praefect/subcmd_pingnodes.go | 56 | ||||
-rw-r--r-- | cmd/praefect/subcmd_pingnodes_test.go | 124 | ||||
-rw-r--r-- | cmd/praefect/subcmd_test.go | 68 |
7 files changed, 243 insertions, 54 deletions
diff --git a/changelogs/unreleased/po-praefect-nodeping.yml b/changelogs/unreleased/po-praefect-nodeping.yml new file mode 100644 index 000000000..a9f500ba7 --- /dev/null +++ b/changelogs/unreleased/po-praefect-nodeping.yml @@ -0,0 +1,5 @@ +--- +title: Only log relevant storages in Praefect dial-nodes +merge_request: 2229 +author: +type: added diff --git a/cmd/praefect/main_test.go b/cmd/praefect/main_test.go index 7ca079675..ad9c67927 100644 --- a/cmd/praefect/main_test.go +++ b/cmd/praefect/main_test.go @@ -51,11 +51,11 @@ func TestFlattenNodes(t *testing.T) { expect: map[string]*nodePing{ "tcp://example.com": &nodePing{ address: "tcp://example.com", - storages: map[string]struct{}{ - "foo": struct{}{}, - "bar": struct{}{}, + storages: map[gitalyStorage][]virtualStorage{ + "foo": []virtualStorage{"meow"}, + "bar": []virtualStorage{"woof"}, }, - vStorages: map[string]struct{}{ + vStorages: map[virtualStorage]struct{}{ "meow": struct{}{}, "woof": struct{}{}, }, diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go index 8027685c8..166064b2e 100644 --- a/cmd/praefect/subcmd_dataloss_test.go +++ b/cmd/praefect/subcmd_dataloss_test.go @@ -4,18 +4,13 @@ import ( "bytes" "context" "flag" - "fmt" - "net" - "path/filepath" "testing" "time" "github.com/golang/protobuf/ptypes/timestamp" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" - "gitlab.com/gitlab-org/gitaly/internal/testhelper" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc" ) func TestTimeFlag(t *testing.T) { @@ -64,36 +59,9 @@ func (m mockPraefectInfoService) EnableWrites(ctx context.Context, r *gitalypb.E return m.EnableWritesFunc(ctx, r) } -func StartPraefectInfoService(t testing.TB, impl gitalypb.PraefectInfoServiceServer) (net.Listener, func()) { - t.Helper() - - tmp, clean := testhelper.TempDir(t) - - ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock")) - require.NoError(t, err) - - srv := grpc.NewServer() - gitalypb.RegisterPraefectInfoServiceServer(srv, impl) - go func() { require.NoError(t, srv.Serve(ln)) }() - - ctx, cancel := testhelper.Context() - defer cancel() - - // verify the service is up - addr := fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr()) - cc, err := grpc.DialContext(ctx, addr, grpc.WithBlock(), grpc.WithInsecure()) - require.NoError(t, err) - require.NoError(t, cc.Close()) - - return ln, func() { - srv.Stop() - clean() - } -} - func TestDatalossSubcommand(t *testing.T) { mockSvc := &mockPraefectInfoService{} - ln, clean := StartPraefectInfoService(t, mockSvc) + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(mockSvc)}) defer clean() for _, tc := range []struct { desc string diff --git a/cmd/praefect/subcmd_enable_writes_test.go b/cmd/praefect/subcmd_enable_writes_test.go index a1e9760cf..179a980f1 100644 --- a/cmd/praefect/subcmd_enable_writes_test.go +++ b/cmd/praefect/subcmd_enable_writes_test.go @@ -12,7 +12,7 @@ import ( func TestEnableWritesSubcommand(t *testing.T) { mockSvc := &mockPraefectInfoService{} - ln, clean := StartPraefectInfoService(t, mockSvc) + ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(mockSvc)}) defer clean() type EnableWritesFunc func(context.Context, *gitalypb.EnableWritesRequest) (*gitalypb.EnableWritesResponse, error) diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go index 742f8eb12..cc2f87ff6 100644 --- a/cmd/praefect/subcmd_pingnodes.go +++ b/cmd/praefect/subcmd_pingnodes.go @@ -15,12 +15,18 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" ) +type ( + virtualStorage string + gitalyStorage string +) + type nodePing struct { - address string - storages map[string]struct{} // set of storages this node hosts - vStorages map[string]struct{} // set of virtual storages node belongs to - token string // auth token - err error // any error during dial/ping + address string + // set of storages this node hosts + storages map[gitalyStorage][]virtualStorage + vStorages map[virtualStorage]struct{} // set of virtual storages node belongs to + token string // auth token + err error // any error during dial/ping } func flattenNodes(conf config.Config) map[string]*nodePing { @@ -28,17 +34,23 @@ func flattenNodes(conf config.Config) map[string]*nodePing { // flatten nodes between virtual storages for _, vs := range conf.VirtualStorages { + vsName := virtualStorage(vs.Name) for _, node := range vs.Nodes { + gsName := gitalyStorage(node.Storage) + n, ok := nodeByAddress[node.Address] if !ok { n = &nodePing{ - storages: map[string]struct{}{}, - vStorages: map[string]struct{}{}, + storages: map[gitalyStorage][]virtualStorage{}, + vStorages: map[virtualStorage]struct{}{}, } } n.address = node.Address - n.storages[node.Storage] = struct{}{} - n.vStorages[vs.Name] = struct{}{} + + s := n.storages[gsName] + n.storages[gsName] = append(s, vsName) + + n.vStorages[vsName] = struct{}{} n.token = node.Token nodeByAddress[node.Address] = n } @@ -115,25 +127,37 @@ func (npr *nodePing) isConsistent(cc *grpc.ClientConn) bool { return false } - storagesSet := make(map[string]bool, len(resp.StorageStatuses)) + storagesSet := make(map[gitalyStorage]bool, len(resp.StorageStatuses)) - knownStoragesSet := make(map[string]bool, len(npr.storages)) + knownStoragesSet := make(map[gitalyStorage]bool, len(npr.storages)) for k := range npr.storages { knownStoragesSet[k] = true } consistent := true for _, status := range resp.StorageStatuses { - if storagesSet[status.StorageName] { + gStorage := gitalyStorage(status.StorageName) + + // only proceed if the gitaly storage belongs to a configured + // virtual storage + if len(npr.storages[gStorage]) == 0 { + continue + } + + if storagesSet[gStorage] { npr.log("ERROR: remote has duplicated storage: %q", status.StorageName) consistent = false continue } - storagesSet[status.StorageName] = true + storagesSet[gStorage] = true if status.Readable && status.Writeable { - npr.log("SUCCESS: %q is served by Gitaly", status.StorageName) - delete(knownStoragesSet, status.StorageName) // storage found + npr.log( + "SUCCESS: confirmed Gitaly storage %q in virtual storages %v is served", + status.StorageName, + npr.storages[gStorage], + ) + delete(knownStoragesSet, gStorage) // storage found } else { npr.log("ERROR: storage %q is not readable or writable", status.StorageName) consistent = false @@ -187,5 +211,5 @@ func (npr *nodePing) checkNode() { npr.log("ERROR: %v", npr.err) return } - npr.log("SUCCESS: node is consistent!") + npr.log("SUCCESS: node configuration is consistent!") } diff --git a/cmd/praefect/subcmd_pingnodes_test.go b/cmd/praefect/subcmd_pingnodes_test.go new file mode 100644 index 000000000..495161211 --- /dev/null +++ b/cmd/praefect/subcmd_pingnodes_test.go @@ -0,0 +1,124 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "log" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/internal/praefect/models" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" +) + +type mockServerService struct { + gitalypb.UnimplementedServerServiceServer + serverInfoFunc func(ctx context.Context, r *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) +} + +func (m mockServerService) ServerInfo(ctx context.Context, r *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) { + return m.serverInfoFunc(ctx, r) +} + +func TestSubCmdDialNodes(t *testing.T) { + var resp *gitalypb.ServerInfoResponse + mockSvc := &mockServerService{ + serverInfoFunc: func(_ context.Context, _ *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) { + return resp, nil + }, + } + ln, clean := listenAndServe(t, + []svcRegistrar{ + registerHealthService, + registerServerService(mockSvc), + }, + ) + defer clean() + + decorateLogs := func(s []string) []string { + for i, ss := range s { + s[i] = fmt.Sprintf("[unix:/%s]: %s\n", ln.Addr(), ss) + } + return s + } + + for _, tt := range []struct { + name string + conf config.Config + resp *gitalypb.ServerInfoResponse + logs string + }{ + { + name: "2 virtuals, 2 storages, 1 node", + conf: config.Config{ + VirtualStorages: []*config.VirtualStorage{ + { + Name: "default", + Nodes: []*models.Node{ + { + Storage: "1", + Address: "unix:/" + ln.Addr().String(), + }, + }, + }, + { + Name: "storage-1", + Nodes: []*models.Node{ + { + Storage: "2", + Address: "unix:/" + ln.Addr().String(), + }, + }, + }, + }, + }, + resp: &gitalypb.ServerInfoResponse{ + StorageStatuses: []*gitalypb.ServerInfoResponse_StorageStatus{ + { + StorageName: "1", + Readable: true, + Writeable: true, + }, + { + StorageName: "2", + Readable: true, + Writeable: true, + }, + }, + }, + logs: strings.Join(decorateLogs([]string{ + "dialing...", + "dialed successfully!", + "checking health...", + "SUCCESS: node is healthy!", + "checking consistency...", + "SUCCESS: confirmed Gitaly storage \"1\" in virtual storages [default] is served", + "SUCCESS: confirmed Gitaly storage \"2\" in virtual storages [storage-1] is served", + "SUCCESS: node configuration is consistent!", + }), ""), + }, + } { + t.Run(tt.name, func(t *testing.T) { + resp = tt.resp + tt.conf.SocketPath = ln.Addr().String() + log.Print(tt.conf.SocketPath) + + output := &bytes.Buffer{} + defer func(w io.Writer, f int) { + log.SetOutput(w) + log.SetFlags(f) // reinstate timestamp + }(log.Writer(), log.Flags()) + log.SetOutput(output) + log.SetFlags(0) // remove timestamp to make output deterministic + + cmd := dialNodesSubcommand{} + require.NoError(t, cmd.Exec(nil, tt.conf)) + + require.Equal(t, tt.logs, output.String()) + }) + } +} diff --git a/cmd/praefect/subcmd_test.go b/cmd/praefect/subcmd_test.go new file mode 100644 index 000000000..dc3e40758 --- /dev/null +++ b/cmd/praefect/subcmd_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "fmt" + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" +) + +// svcRegistrar is a function that registers a gRPC service with a server +// instance +type svcRegistrar func(*grpc.Server) + +func registerHealthService(srv *grpc.Server) { + healthSrvr := health.NewServer() + grpc_health_v1.RegisterHealthServer(srv, healthSrvr) + healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) +} + +func registerServerService(impl gitalypb.ServerServiceServer) svcRegistrar { + return func(srv *grpc.Server) { + gitalypb.RegisterServerServiceServer(srv, impl) + } +} + +func registerPraefectInfoServer(impl gitalypb.PraefectInfoServiceServer) svcRegistrar { + return func(srv *grpc.Server) { + gitalypb.RegisterPraefectInfoServiceServer(srv, impl) + } +} + +func listenAndServe(t testing.TB, svcs []svcRegistrar) (net.Listener, testhelper.Cleanup) { + t.Helper() + + tmp, clean := testhelper.TempDir(t) + + ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock")) + require.NoError(t, err) + + srv := grpc.NewServer() + + for _, s := range svcs { + s(srv) + } + + go func() { require.NoError(t, srv.Serve(ln)) }() + + ctx, cancel := testhelper.Context() + defer cancel() + + // verify the service is up + addr := fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr()) + cc, err := grpc.DialContext(ctx, addr, grpc.WithBlock(), grpc.WithInsecure()) + require.NoError(t, err) + require.NoError(t, cc.Close()) + + return ln, func() { + srv.Stop() + clean() + } +} |