Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/gitlab-org/gitaly.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohn Cai <jcai@gitlab.com>2021-11-08 19:20:42 +0300
committerJohn Cai <jcai@gitlab.com>2021-11-09 22:31:34 +0300
commitde417beefb4c8bef07ae47d869b99c043fdc236a (patch)
treea2a6e78864e6706de6299070d6050b368db45176
parent62b56a317c35ad66b2cb6ffbc419b6bfaa6f4aa5 (diff)
praefect check: add node connectivity/consistency check
Add a praefect startup check that checks if praefect can reach each of its internal gitaly nodes, and also checks that those gitaly nodes can reach its storage paths with readability/writeability. Changelog: added
-rw-r--r--cmd/praefect/subcmd.go6
-rw-r--r--internal/praefect/checks.go16
-rw-r--r--internal/praefect/checks_test.go206
3 files changed, 226 insertions, 2 deletions
diff --git a/cmd/praefect/subcmd.go b/cmd/praefect/subcmd.go
index fa371cf7c..21279939c 100644
--- a/cmd/praefect/subcmd.go
+++ b/cmd/praefect/subcmd.go
@@ -37,7 +37,11 @@ var subcommands = map[string]subcmd{
removeRepositoryCmdName: newRemoveRepository(logger),
trackRepositoryCmdName: newTrackRepository(logger),
listUntrackedRepositoriesName: newListUntrackedRepositories(logger, os.Stdout),
- checkCmdName: newCheckSubcommand(os.Stdout, praefect.NewPraefectMigrationCheck),
+ checkCmdName: newCheckSubcommand(
+ os.Stdout,
+ praefect.NewPraefectMigrationCheck,
+ praefect.NewGitalyNodeConnectivityCheck,
+ ),
}
// subCommand returns an exit code, to be fed into os.Exit.
diff --git a/internal/praefect/checks.go b/internal/praefect/checks.go
index 0c1c6b650..6b8c0ecf8 100644
--- a/internal/praefect/checks.go
+++ b/internal/praefect/checks.go
@@ -8,6 +8,7 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
)
// Severity is a type that indicates the severity of a check
@@ -70,3 +71,18 @@ func NewPraefectMigrationCheck(conf config.Config) *Check {
Severity: Fatal,
}
}
+
+// NewGitalyNodeConnectivityCheck returns a check that ensures Praefect can talk to all nodes of all virtual storages
+func NewGitalyNodeConnectivityCheck(conf config.Config) *Check {
+ logger := conf.ConfigureLogger()
+
+ return &Check{
+ Name: "gitaly node connectivity & disk access",
+ Description: "confirms if praefect can reach all of its gitaly nodes, and " +
+ "whether or not the gitaly nodes can read/write from and to its storages.",
+ Run: func(ctx context.Context) error {
+ return nodes.PingAll(ctx, conf, logger)
+ },
+ Severity: Fatal,
+ }
+}
diff --git a/internal/praefect/checks_test.go b/internal/praefect/checks_test.go
index 3453b2856..0a56801e9 100644
--- a/internal/praefect/checks_test.go
+++ b/internal/praefect/checks_test.go
@@ -3,7 +3,10 @@ package praefect
import (
"context"
"fmt"
+ "net"
+ "path/filepath"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -11,6 +14,11 @@ import (
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/migrations"
+ "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
+ "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/health"
+ "google.golang.org/grpc/health/grpc_health_v1"
)
func TestPraefectMigrations_success(t *testing.T) {
@@ -66,7 +74,203 @@ func TestPraefectMigrations_success(t *testing.T) {
migrationCheck := NewPraefectMigrationCheck(cfg)
assert.Equal(t, "praefect migrations", migrationCheck.Name)
assert.Equal(t, "confirms whether or not all praefect migrations have run", migrationCheck.Description)
- assert.Equal(t, tc.expectedErr, migrationCheck.Run())
+ assert.Equal(t, tc.expectedErr, migrationCheck.Run(ctx))
})
}
}
+
+type nodeAssertion struct {
+ storage string
+ token string
+ servingStatus grpc_health_v1.HealthCheckResponse_ServingStatus
+ serverReadable, serverWriteable bool
+}
+
+type mockServerServer struct {
+ gitalypb.UnimplementedServerServiceServer
+ node nodeAssertion
+}
+
+func (m *mockServerServer) ServerInfo(ctx context.Context, in *gitalypb.ServerInfoRequest) (*gitalypb.ServerInfoResponse, error) {
+ return &gitalypb.ServerInfoResponse{
+ StorageStatuses: []*gitalypb.ServerInfoResponse_StorageStatus{
+ {
+ StorageName: m.node.storage,
+ Readable: m.node.serverReadable,
+ Writeable: m.node.serverWriteable,
+ },
+ },
+ }, nil
+}
+
+func TestGitalyNodeConnectivityCheck(t *testing.T) {
+ testCases := []struct {
+ desc string
+ expectErr bool
+ nodes []nodeAssertion
+ }{
+ {
+ desc: "all nodes are healthy",
+ expectErr: false,
+ nodes: []nodeAssertion{
+ {
+ storage: "storage-0",
+ token: "token-0",
+ servingStatus: grpc_health_v1.HealthCheckResponse_SERVING,
+ serverReadable: true,
+ serverWriteable: true,
+ },
+ {
+ storage: "storage-1",
+ token: "token-1",
+ servingStatus: grpc_health_v1.HealthCheckResponse_SERVING,
+ serverReadable: true,
+ serverWriteable: true,
+ },
+ },
+ },
+ {
+ desc: "one node failed healthcheck",
+ expectErr: true,
+ nodes: []nodeAssertion{
+ {
+ storage: "storage-0",
+ token: "token-0",
+ servingStatus: grpc_health_v1.HealthCheckResponse_SERVING,
+ serverReadable: true,
+ serverWriteable: true,
+ },
+ {
+ storage: "storage-1",
+ token: "token-1",
+ servingStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+ serverReadable: true,
+ serverWriteable: true,
+ },
+ },
+ },
+ {
+ desc: "one node failed consistency check",
+ expectErr: true,
+ nodes: []nodeAssertion{
+ {
+ storage: "storage-0",
+ token: "token-0",
+ servingStatus: grpc_health_v1.HealthCheckResponse_SERVING,
+ serverReadable: false,
+ serverWriteable: true,
+ },
+ {
+ storage: "storage-1",
+ token: "token-1",
+ servingStatus: grpc_health_v1.HealthCheckResponse_SERVING,
+ serverReadable: true,
+ serverWriteable: true,
+ },
+ },
+ },
+ {
+ desc: "all nodes failed",
+ expectErr: true,
+ nodes: []nodeAssertion{
+ {
+ storage: "storage-0",
+ token: "token-0",
+ servingStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+ serverReadable: false,
+ serverWriteable: true,
+ },
+ {
+ storage: "storage-1",
+ token: "token-1",
+ servingStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
+ serverReadable: true,
+ serverWriteable: false,
+ },
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.desc, func(t *testing.T) {
+ tmp := testhelper.TempDir(t)
+ var cfgNodes []*config.Node
+
+ for _, n := range tc.nodes {
+ socket := filepath.Join(tmp, n.storage)
+ ln, err := net.Listen("unix", socket)
+ require.NoError(t, err)
+ healthSrv := health.NewServer()
+ healthSrv.SetServingStatus("", n.servingStatus)
+
+ srvSrv := &mockServerServer{
+ node: n,
+ }
+
+ srv := grpc.NewServer()
+ grpc_health_v1.RegisterHealthServer(srv, healthSrv)
+ gitalypb.RegisterServerServiceServer(srv, srvSrv)
+ defer srv.Stop()
+ go func() {
+ assert.NoError(t, srv.Serve(ln))
+ }()
+
+ cfgNodes = append(cfgNodes, &config.Node{
+ Storage: n.storage,
+ Token: n.token,
+ Address: fmt.Sprintf("%s://%s", ln.Addr().Network(), ln.Addr().String()),
+ })
+ }
+
+ check := NewGitalyNodeConnectivityCheck(
+ config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: cfgNodes,
+ },
+ },
+ },
+ )
+
+ ctx, cancel := testhelper.Context()
+ defer cancel()
+
+ err := check.Run(ctx)
+ if tc.expectErr {
+ assert.Regexp(t, "^the following nodes are not healthy: .+", err)
+ return
+ }
+
+ assert.Nil(t, err)
+ })
+ }
+
+ t.Run("server not listening", func(t *testing.T) {
+ tmp := testhelper.TempDir(t)
+ socketAddr := fmt.Sprintf("unix://%s", filepath.Join(tmp, "storage"))
+ cfgNodes := []*config.Node{
+ {
+ Storage: "storage",
+ Token: "token",
+ Address: socketAddr,
+ },
+ }
+
+ check := NewGitalyNodeConnectivityCheck(
+ config.Config{
+ VirtualStorages: []*config.VirtualStorage{
+ {
+ Name: "default",
+ Nodes: cfgNodes,
+ },
+ },
+ },
+ )
+
+ ctx, cancel := testhelper.Context(testhelper.ContextWithTimeout(1 * time.Second))
+ defer cancel()
+
+ assert.Errorf(t, check.Run(ctx), "the following nodes are not healthy: %s", socketAddr)
+ })
+}