Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Okstad <pokstad@gitlab.com>2020-06-08 16:09:31 +0300
committerSami Hiltunen <shiltunen@gitlab.com>2020-06-08 16:09:31 +0300
commitc837578af24c295a35ae7367d66ca8057db152db (patch)
tree22629a86463398cbbfa9ad7b04f365c18433fbca
parent7d374d517487d45ab12329fed1b19a9c13988bbe (diff)
Only log relevant storages in Praefect dial-nodes
-rw-r--r--changelogs/unreleased/po-praefect-nodeping.yml5
-rw-r--r--cmd/praefect/main_test.go8
-rw-r--r--cmd/praefect/subcmd_dataloss_test.go34
-rw-r--r--cmd/praefect/subcmd_enable_writes_test.go2
-rw-r--r--cmd/praefect/subcmd_pingnodes.go56
-rw-r--r--cmd/praefect/subcmd_pingnodes_test.go124
-rw-r--r--cmd/praefect/subcmd_test.go68
7 files changed, 243 insertions, 54 deletions
diff --git a/changelogs/unreleased/po-praefect-nodeping.yml b/changelogs/unreleased/po-praefect-nodeping.yml
new file mode 100644
index 000000000..a9f500ba7
--- /dev/null
+++ b/changelogs/unreleased/po-praefect-nodeping.yml
@@ -0,0 +1,5 @@
+---
+title: Only log relevant storages in Praefect dial-nodes
+merge_request: 2229
+author:
+type: added
diff --git a/cmd/praefect/main_test.go b/cmd/praefect/main_test.go
index 7ca079675..ad9c67927 100644
--- a/cmd/praefect/main_test.go
+++ b/cmd/praefect/main_test.go
@@ -51,11 +51,11 @@ func TestFlattenNodes(t *testing.T) {
expect: map[string]*nodePing{
"tcp://example.com": &nodePing{
address: "tcp://example.com",
- storages: map[string]struct{}{
- "foo": struct{}{},
- "bar": struct{}{},
+ storages: map[gitalyStorage][]virtualStorage{
+ "foo": []virtualStorage{"meow"},
+ "bar": []virtualStorage{"woof"},
},
- vStorages: map[string]struct{}{
+ vStorages: map[virtualStorage]struct{}{
"meow": struct{}{},
"woof": struct{}{},
},
diff --git a/cmd/praefect/subcmd_dataloss_test.go b/cmd/praefect/subcmd_dataloss_test.go
index 8027685c8..166064b2e 100644
--- a/cmd/praefect/subcmd_dataloss_test.go
+++ b/cmd/praefect/subcmd_dataloss_test.go
@@ -4,18 +4,13 @@ import (
"bytes"
"context"
"flag"
- "fmt"
- "net"
- "path/filepath"
"testing"
"time"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
- "gitlab.com/gitlab-org/gitaly/internal/testhelper"
"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
- "google.golang.org/grpc"
)
func TestTimeFlag(t *testing.T) {
@@ -64,36 +59,9 @@ func (m mockPraefectInfoService) EnableWrites(ctx context.Context, r *gitalypb.E
return m.EnableWritesFunc(ctx, r)
}
-func StartPraefectInfoService(t testing.TB, impl gitalypb.PraefectInfoServiceServer) (net.Listener, func()) {
- t.Helper()
-
- tmp, clean := testhelper.TempDir(t)
-
- ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock"))
- require.NoError(t, err)
-
- srv := grpc.NewServer()
- gitalypb.RegisterPraefectInfoServiceServer(srv, impl)
- go func() { require.NoError(t, srv.Serve(ln)) }()
-
- ctx, cancel := testhelper.Context()
- defer cancel()
-
- // verify the service is up
- addr := fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr())
- cc, err := grpc.DialContext(ctx, addr, grpc.WithBlock(), grpc.WithInsecure())
- require.NoError(t, err)
- require.NoError(t, cc.Close())
-
- return ln, func() {
- srv.Stop()
- clean()
- }
-}
-
func TestDatalossSubcommand(t *testing.T) {
mockSvc := &mockPraefectInfoService{}
- ln, clean := StartPraefectInfoService(t, mockSvc)
+ ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(mockSvc)})
defer clean()
for _, tc := range []struct {
desc string
diff --git a/cmd/praefect/subcmd_enable_writes_test.go b/cmd/praefect/subcmd_enable_writes_test.go
index a1e9760cf..179a980f1 100644
--- a/cmd/praefect/subcmd_enable_writes_test.go
+++ b/cmd/praefect/subcmd_enable_writes_test.go
@@ -12,7 +12,7 @@ import (
func TestEnableWritesSubcommand(t *testing.T) {
mockSvc := &mockPraefectInfoService{}
- ln, clean := StartPraefectInfoService(t, mockSvc)
+ ln, clean := listenAndServe(t, []svcRegistrar{registerPraefectInfoServer(mockSvc)})
defer clean()
type EnableWritesFunc func(context.Context, *gitalypb.EnableWritesRequest) (*gitalypb.EnableWritesResponse, error)
diff --git a/cmd/praefect/subcmd_pingnodes.go b/cmd/praefect/subcmd_pingnodes.go
index 742f8eb12..cc2f87ff6 100644
--- a/cmd/praefect/subcmd_pingnodes.go
+++ b/cmd/praefect/subcmd_pingnodes.go
@@ -15,12 +15,18 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
)
+type (
+ virtualStorage string
+ gitalyStorage string
+)
+
type nodePing struct {
- address string
- storages map[string]struct{} // set of storages this node hosts
- vStorages map[string]struct{} // set of virtual storages node belongs to
- token string // auth token
- err error // any error during dial/ping
+ address string
+ // set of storages this node hosts
+ storages map[gitalyStorage][]virtualStorage
+ vStorages map[virtualStorage]struct{} // set of virtual storages node belongs to
+ token string // auth token
+ err error // any error during dial/ping
}
func flattenNodes(conf config.Config) map[string]*nodePing {
@@ -28,17 +34,23 @@ func flattenNodes(conf config.Config) map[string]*nodePing {
// flatten nodes between virtual storages
for _, vs := range conf.VirtualStorages {
+ vsName := virtualStorage(vs.Name)
for _, node := range vs.Nodes {
+ gsName := gitalyStorage(node.Storage)
+
n, ok := nodeByAddress[node.Address]
if !ok {
n = &nodePing{
- storages: map[string]struct{}{},
- vStorages: map[string]struct{}{},
+ storages: map[gitalyStorage][]virtualStorage{},
+ vStorages: map[virtualStorage]struct{}{},
}
}
n.address = node.Address
- n.storages[node.Storage] = struct{}{}
- n.vStorages[vs.Name] = struct{}{}
+
+ s := n.storages[gsName]
+ n.storages[gsName] = append(s, vsName)
+
+ n.vStorages[vsName] = struct{}{}
n.token = node.Token
nodeByAddress[node.Address] = n
}
@@ -115,25 +127,37 @@ func (npr *nodePing) isConsistent(cc *grpc.ClientConn) bool {
return false
}
- storagesSet := make(map[string]bool, len(resp.StorageStatuses))
+ storagesSet := make(map[gitalyStorage]bool, len(resp.StorageStatuses))
- knownStoragesSet := make(map[string]bool, len(npr.storages))
+ knownStoragesSet := make(map[gitalyStorage]bool, len(npr.storages))
for k := range npr.storages {
knownStoragesSet[k] = true
}
consistent := true
for _, status := range resp.StorageStatuses {
- if storagesSet[status.StorageName] {
+ gStorage := gitalyStorage(status.StorageName)
+
+ // only proceed if the gitaly storage belongs to a configured
+ // virtual storage
+ if len(npr.storages[gStorage]) == 0 {
+ continue
+ }
+
+ if storagesSet[gStorage] {
npr.log("ERROR: remote has duplicated storage: %q", status.StorageName)
consistent = false
continue
}
- storagesSet[status.StorageName] = true
+ storagesSet[gStorage] = true
if status.Readable && status.Writeable {
- npr.log("SUCCESS: %q is served by Gitaly", status.StorageName)
- delete(knownStoragesSet, status.StorageName) // storage found
+ npr.log(
+ "SUCCESS: confirmed Gitaly storage %q in virtual storages %v is served",
+ status.StorageName,
+ npr.storages[gStorage],
+ )
+ delete(knownStoragesSet, gStorage) // storage found
} else {
npr.log("ERROR: storage %q is not readable or writable", status.StorageName)
consistent = false
@@ -187,5 +211,5 @@ func (npr *nodePing) checkNode() {
npr.log("ERROR: %v", npr.err)
return
}
- npr.log("SUCCESS: node is consistent!")
+ npr.log("SUCCESS: node configuration is consistent!")
}
diff --git a/cmd/praefect/subcmd_pingnodes_test.go b/cmd/praefect/subcmd_pingnodes_test.go
new file mode 100644
index 000000000..495161211
--- /dev/null
+++ b/cmd/praefect/subcmd_pingnodes_test.go
@@ -0,0 +1,124 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/config"
+ "gitlab.com/gitlab-org/gitaly/internal/praefect/models"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+)
+
+type mockServerService struct {
+ gitalypb.UnimplementedServerServiceServer
+ serverInfoFunc func(ctx context.Context, r *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error)
+}
+
+func (m mockServerService) ServerInfo(ctx context.Context, r *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
+ return m.serverInfoFunc(ctx, r)
+}
+
+func TestSubCmdDialNodes(t *testing.T) {
+ var resp *gitalypb.ServerInfoResponse
+ mockSvc := &mockServerService{
+ serverInfoFunc: func(_ context.Context, _ *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
+ return resp, nil
+ },
+ }
+ ln, clean := listenAndServe(t,
+ []svcRegistrar{
+ registerHealthService,
+ registerServerService(mockSvc),
+ },
+ )
+ defer clean()
+
+ decorateLogs := func(s []string) []string {
+ for i, ss := range s {
+ s[i] = fmt.Sprintf("[unix:/%s]: %s\n", ln.Addr(), ss)
+ }
+ return s
+ }
+
+ for _, tt := range []struct {
+ name string
+ conf config.Config
+ resp *gitalypb.ServerInfoResponse
+ logs string
+ }{
+ {
+ name: "2 virtuals, 2 storages, 1 node",
+ conf: config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: []*models.Node{
+ {
+ Storage: "1",
+ Address: "unix:/" + ln.Addr().String(),
+ },
+ },
+ },
+ {
+ Name: "storage-1",
+ Nodes: []*models.Node{
+ {
+ Storage: "2",
+ Address: "unix:/" + ln.Addr().String(),
+ },
+ },
+ },
+ },
+ },
+ resp: &gitalypb.ServerInfoResponse{
+ StorageStatuses: []*gitalypb.ServerInfoResponse_StorageStatus{
+ {
+ StorageName: "1",
+ Readable: true,
+ Writeable: true,
+ },
+ {
+ StorageName: "2",
+ Readable: true,
+ Writeable: true,
+ },
+ },
+ },
+ logs: strings.Join(decorateLogs([]string{
+ "dialing...",
+ "dialed successfully!",
+ "checking health...",
+ "SUCCESS: node is healthy!",
+ "checking consistency...",
+ "SUCCESS: confirmed Gitaly storage \"1\" in virtual storages [default] is served",
+ "SUCCESS: confirmed Gitaly storage \"2\" in virtual storages [storage-1] is served",
+ "SUCCESS: node configuration is consistent!",
+ }), ""),
+ },
+ } {
+ t.Run(tt.name, func(t *testing.T) {
+ resp = tt.resp
+ tt.conf.SocketPath = ln.Addr().String()
+ log.Print(tt.conf.SocketPath)
+
+ output := &bytes.Buffer{}
+ defer func(w io.Writer, f int) {
+ log.SetOutput(w)
+ log.SetFlags(f) // reinstate timestamp
+ }(log.Writer(), log.Flags())
+ log.SetOutput(output)
+ log.SetFlags(0) // remove timestamp to make output deterministic
+
+ cmd := dialNodesSubcommand{}
+ require.NoError(t, cmd.Exec(nil, tt.conf))
+
+ require.Equal(t, tt.logs, output.String())
+ })
+ }
+}
diff --git a/cmd/praefect/subcmd_test.go b/cmd/praefect/subcmd_test.go
new file mode 100644
index 000000000..dc3e40758
--- /dev/null
+++ b/cmd/praefect/subcmd_test.go
@@ -0,0 +1,68 @@
+package main
+
+import (
+ "fmt"
+ "net"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "gitlab.com/gitlab-org/gitaly/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
+)
+
+// svcRegistrar is a function that registers a gRPC service with a server
+// instance
+type svcRegistrar func(*grpc.Server)
+
+func registerHealthService(srv *grpc.Server) {
+ healthSrvr := health.NewServer()
+ grpc_health_v1.RegisterHealthServer(srv, healthSrvr)
+ healthSrvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
+}
+
+func registerServerService(impl gitalypb.ServerServiceServer) svcRegistrar {
+ return func(srv *grpc.Server) {
+ gitalypb.RegisterServerServiceServer(srv, impl)
+ }
+}
+
+func registerPraefectInfoServer(impl gitalypb.PraefectInfoServiceServer) svcRegistrar {
+ return func(srv *grpc.Server) {
+ gitalypb.RegisterPraefectInfoServiceServer(srv, impl)
+ }
+}
+
+func listenAndServe(t testing.TB, svcs []svcRegistrar) (net.Listener, testhelper.Cleanup) {
+ t.Helper()
+
+ tmp, clean := testhelper.TempDir(t)
+
+ ln, err := net.Listen("unix", filepath.Join(tmp, "gitaly.sock"))
+ require.NoError(t, err)
+
+ srv := grpc.NewServer()
+
+ for _, s := range svcs {
+ s(srv)
+ }
+
+ go func() { require.NoError(t, srv.Serve(ln)) }()
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ // verify the service is up
+ addr := fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr())
+ cc, err := grpc.DialContext(ctx, addr, grpc.WithBlock(), grpc.WithInsecure())
+ require.NoError(t, err)
+ require.NoError(t, cc.Close())
+
+ return ln, func() {
+ srv.Stop()
+ clean()
+ }
+}