diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-04-13 11:58:34 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-04-13 11:58:34 +0300 |
commit | 28c0539251cf00a675b78e987ff30b7741425653 (patch) | |
tree | fdba94fee77cc88cc45bf629c462c49decce3d52 | |
parent | 9255b4522513ee91d7fef535137d57deb498bbf8 (diff) | |
parent | e82c5be4c36f2828aabc01cacb383028b4e8b3f2 (diff) |
Merge branch 'ps-config-validation-praefect-2' into 'master'
praefect: Implementation of the new configuration validation
Closes #4650
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/5601
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Co-authored-by: Pavlo Strokov <pstrokov@gitlab.com>
-rw-r--r-- | internal/cli/praefect/main.go | 4 | ||||
-rw-r--r-- | internal/cli/praefect/subcmd_configuration_validate_test.go | 21 | ||||
-rw-r--r-- | internal/errors/cfgerror/validate.go | 100 | ||||
-rw-r--r-- | internal/errors/cfgerror/validate_test.go | 121 | ||||
-rw-r--r-- | internal/gitaly/config/config.go | 10 | ||||
-rw-r--r-- | internal/gitaly/config/config_test.go | 8 | ||||
-rw-r--r-- | internal/gitaly/config/prometheus/config.go | 2 | ||||
-rw-r--r-- | internal/gitaly/config/prometheus/config_test.go | 2 | ||||
-rw-r--r-- | internal/praefect/config/config.go | 151 | ||||
-rw-r--r-- | internal/praefect/config/config_test.go | 452 | ||||
-rw-r--r-- | internal/praefect/config/node.go | 10 | ||||
-rw-r--r-- | internal/praefect/config/node_test.go | 34 | ||||
-rw-r--r-- | internal/praefect/config/testhelper_test.go | 11 |
13 files changed, 868 insertions, 58 deletions
diff --git a/internal/cli/praefect/main.go b/internal/cli/praefect/main.go index 46877f070..2fa7b5822 100644 --- a/internal/cli/praefect/main.go +++ b/internal/cli/praefect/main.go @@ -326,7 +326,7 @@ func run( sidechannelRegistry := sidechannel.NewRegistry() backchannelCfg := backchannel.DefaultConfiguration() - backchannelCfg.AcceptBacklog = conf.Yamux.AcceptBacklog + backchannelCfg.AcceptBacklog = int(conf.Yamux.AcceptBacklog) backchannelCfg.MaximumStreamWindowSizeBytes = conf.Yamux.MaximumStreamWindowSizeBytes clientHandshaker := backchannel.NewClientHandshaker( logger, @@ -587,7 +587,7 @@ func run( cfg := repocleaner.Cfg{ RunInterval: conf.RepositoriesCleanup.RunInterval.Duration(), LivenessInterval: 30 * time.Second, - RepositoriesInBatch: conf.RepositoriesCleanup.RepositoriesInBatch, + RepositoriesInBatch: int(conf.RepositoriesCleanup.RepositoriesInBatch), } repoCleaner := repocleaner.NewRunner(cfg, logger, healthChecker, nodeSet.Connections(), storageSync, storageSync, repocleaner.NewLogWarnAction(logger)) if err := repoCleaner.Run(ctx, helper.NewTimerTicker(conf.RepositoriesCleanup.CheckInterval.Duration())); err != nil && !errors.Is(context.Canceled, err) { diff --git a/internal/cli/praefect/subcmd_configuration_validate_test.go b/internal/cli/praefect/subcmd_configuration_validate_test.go index bcf87ef6f..74152ff63 100644 --- a/internal/cli/praefect/subcmd_configuration_validate_test.go +++ b/internal/cli/praefect/subcmd_configuration_validate_test.go @@ -71,6 +71,27 @@ election_strategy = invalid`) One of accept-dataloss, check, configuration, dataloss, dial-nodes, list-storages, list-untracked-repositories, metadata, remove-repository, set-replication-factor, sql-migrate, sql-migrate-down, sql-migrate-status, sql-ping, track-repositories, track-repository, verify `, }, + { + name: "validation failures", + exitCode: 2, + stdin: func(t *testing.T) io.Reader { + return strings.NewReader("") + }, + stdout: `{ + "errors": [ + { + "message": "none of \"socket_path\", \"listen_addr\" or \"tls_listen_addr\" is set" + }, + { + "key": [ + "virtual_storage" + ], + "message": "not set" + } + ] +} +`, + }, } { tc := tc t.Run(tc.name, func(t *testing.T) { diff --git a/internal/errors/cfgerror/validate.go b/internal/errors/cfgerror/validate.go index 9d32f2eac..dfbc61e80 100644 --- a/internal/errors/cfgerror/validate.go +++ b/internal/errors/cfgerror/validate.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "reflect" "strings" "golang.org/x/exp/constraints" @@ -25,12 +26,12 @@ var ( ErrNotAbsolutePath = errors.New("not an absolute path") // ErrNotUnique should be used when the value must be unique, but there are duplicates. ErrNotUnique = errors.New("not unique") - // ErrIsNegative should be used when the positive value or 0 is expected. - ErrIsNegative = errors.New("is negative") // ErrBadOrder should be used when the order of the elements is wrong. ErrBadOrder = errors.New("bad order") // ErrNotInRange should be used when the value is not in expected range of values. ErrNotInRange = errors.New("not in range") + // ErrUnsupportedValue should be used when the value is not supported. + ErrUnsupportedValue = errors.New("not supported") ) // ValidationError represents an issue with provided configuration. @@ -179,14 +180,6 @@ func PathIsAbs(path string) error { return NewValidationError(fmt.Errorf("%w: %q", ErrNotAbsolutePath, path)) } -// IsPositive returns an error if provided value less than a zero. -func IsPositive[T constraints.Signed | constraints.Float](val T) error { - if val < 0 { - return NewValidationError(fmt.Errorf("%w: %v", ErrIsNegative, val)) - } - return nil -} - // InRangeOpt represents configuration options for InRange function. type InRangeOpt int @@ -219,19 +212,26 @@ func (opts inRangeOpts[T]) greaterThan(val, max T) bool { return val >= max } -func (opts inRangeOpts[T]) format(min, max T) string { - format := "%s%v, %v%s" - openRange := "(" - closeRange := ")" +func (opts inRangeOpts[T]) formatRange(min, max T) string { + return opts.formatRangeMin(min) + ", " + opts.formatRangeMax(max) +} + +func (opts inRangeOpts[T]) formatRangeMin(min T) string { for _, opt := range opts { if opt == InRangeOptIncludeMin { - openRange = "[" + return fmt.Sprintf("[%v", min) } + } + return fmt.Sprintf("(%v", min) +} + +func (opts inRangeOpts[T]) formatRangeMax(max T) string { + for _, opt := range opts { if opt == InRangeOptIncludeMax { - closeRange = "]" + return fmt.Sprintf("%v]", max) } } - return fmt.Sprintf(format, openRange, min, max, closeRange) + return fmt.Sprintf("%v)", max) } // Numeric includes types that can be used in the comparison operations. @@ -242,8 +242,72 @@ type Numeric interface { // InRange returns an error if 'val' is less than 'min' or greater or equal to 'max'. func InRange[T Numeric](min, max, val T, opts ...InRangeOpt) error { if cmp := inRangeOpts[T](opts); cmp.lessThan(val, min) || cmp.greaterThan(val, max) { - return NewValidationError(fmt.Errorf("%w: %v out of %s", ErrNotInRange, val, cmp.format(min, max))) + return NewValidationError(fmt.Errorf("%w: %v out of %s", ErrNotInRange, val, cmp.formatRange(min, max))) + } + + return nil +} + +// IsSupportedValue ensures the provided 'value' is one listed as 'supportedValues'. +func IsSupportedValue[T comparable](value T, supportedValues ...T) error { + for _, supportedValue := range supportedValues { + if value == supportedValue { + return nil + } + } + + if reflect.TypeOf(value).Kind() == reflect.String { + return NewValidationError(fmt.Errorf(`%w: "%v"`, ErrUnsupportedValue, value)) + } + + return NewValidationError(fmt.Errorf("%w: %v", ErrUnsupportedValue, value)) +} + +type numeric[T Numeric] struct { + value T +} + +// Comparable wraps value, so the method can be invoked on it. +func Comparable[T Numeric](val T) numeric[T] { + return numeric[T]{value: val} +} + +// LessThan returns an error if val is less than one hold by c. +func (c numeric[T]) LessThan(val T) error { + if cmp := inRangeOpts[T](nil); cmp.lessThan(val, c.value) { + err := fmt.Errorf("%w: %v is not less than %v", ErrNotInRange, c.value, val) + return NewValidationError(err) + } + return nil +} + +// GreaterThan returns an error if val is greater than one hold by c. +func (c numeric[T]) GreaterThan(val T) error { + if cmp := inRangeOpts[T](nil); cmp.greaterThan(val, c.value) { + err := fmt.Errorf("%w: %v is not greater than %v", ErrNotInRange, c.value, val) + return NewValidationError(err) } + return nil +} + +// GreaterOrEqual returns an error if val is greater than one hold by c. +func (c numeric[T]) GreaterOrEqual(val T) error { + if cmp := (inRangeOpts[T]{InRangeOptIncludeMax}); cmp.greaterThan(val, c.value) { + err := fmt.Errorf("%w: %v is not greater than or equal to %v", ErrNotInRange, c.value, val) + return NewValidationError(err) + } + return nil +} + +// InRange returns an error if 'c.value' is less than 'min' or greater or equal to 'max'. +func (c numeric[T]) InRange(min, max T, opts ...InRangeOpt) error { + return InRange(min, max, c.value, opts...) +} +// NotEmptySlice returns an error if provided slice has no elements. +func NotEmptySlice[T any](slice []T) error { + if len(slice) == 0 { + return NewValidationError(ErrNotSet) + } return nil } diff --git a/internal/errors/cfgerror/validate_test.go b/internal/errors/cfgerror/validate_test.go index c4084eb10..af8d4729a 100644 --- a/internal/errors/cfgerror/validate_test.go +++ b/internal/errors/cfgerror/validate_test.go @@ -186,14 +186,7 @@ func TestPathIsAbs(t *testing.T) { } } -func TestIsPositive(t *testing.T) { - t.Parallel() - require.NoError(t, IsPositive(0)) - require.NoError(t, IsPositive(100)) - require.Equal(t, NewValidationError(fmt.Errorf("%w: -1.2", ErrIsNegative)), IsPositive(-1.2)) -} - -func TestInRange(t *testing.T) { +func TestComparable_InRange(t *testing.T) { t.Parallel() for _, tc := range []struct { @@ -232,8 +225,118 @@ func TestInRange(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - err := InRange(tc.min, tc.max, tc.val, tc.opts...) + err := Comparable(tc.val).InRange(tc.min, tc.max, tc.opts...) + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestComparable_LessThan(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + val, other int + expectedErr error + }{ + { + name: "value is less", + val: 10, + other: 11, + }, + { + name: "value is equal", + val: 10, + other: 10, + expectedErr: NewValidationError(fmt.Errorf("%w: 10 is not less than 10", ErrNotInRange)), + }, + { + name: "value is bigger", + val: 10, + other: 9, + expectedErr: NewValidationError(fmt.Errorf("%w: 10 is not less than 9", ErrNotInRange)), + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := Comparable(tc.val).LessThan(tc.other) require.Equal(t, tc.expectedErr, err) }) } } + +func TestComparable_GreaterThan(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + val, other int + expectedErr error + }{ + { + name: "value is greater", + val: 11, + other: 10, + }, + { + name: "value is equal", + val: 10, + other: 10, + expectedErr: NewValidationError(fmt.Errorf("%w: 10 is not greater than 10", ErrNotInRange)), + }, + { + name: "value is lesser", + val: 10, + other: 11, + expectedErr: NewValidationError(fmt.Errorf("%w: 10 is not greater than 11", ErrNotInRange)), + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := Comparable(tc.val).GreaterThan(tc.other) + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestComparable_GreaterOrEqual(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + val, other int + expectedErr error + }{ + { + name: "value is greater", + val: 11, + other: 10, + }, + { + name: "value is equal", + val: 10, + other: 10, + }, + { + name: "value is lesser", + val: 10, + other: 11, + expectedErr: NewValidationError(fmt.Errorf("%w: 10 is not greater than or equal to 11", ErrNotInRange)), + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := Comparable(tc.val).GreaterOrEqual(tc.other) + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestIsSupportedValue(t *testing.T) { + t.Parallel() + require.NoError(t, IsSupportedValue(1, 1, 2, 3)) + require.Equal(t, NewValidationError(fmt.Errorf("%w: 0", ErrUnsupportedValue)), IsSupportedValue(0)) + require.Equal(t, NewValidationError(fmt.Errorf("%w: 1", ErrUnsupportedValue)), IsSupportedValue(1, 0, 10)) + require.Equal(t, NewValidationError(fmt.Errorf(`%w: "c"`, ErrUnsupportedValue)), IsSupportedValue("c", "a", "b")) +} + +func TestNotEmptySlice(t *testing.T) { + t.Parallel() + require.NoError(t, NotEmptySlice([]int{1})) + require.Equal(t, NewValidationError(ErrNotSet), NotEmptySlice([]string{})) + require.Equal(t, NewValidationError(ErrNotSet), NotEmptySlice[any](nil)) +} diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index c920edf81..1081c71f6 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -458,9 +458,9 @@ type PackObjectsLimiting struct { // Validate runs validation on all fields and compose all found errors. func (pol PackObjectsLimiting) Validate() error { return cfgerror.New(). - Append(cfgerror.IsPositive(pol.MaxConcurrency), "max_concurrency"). - Append(cfgerror.IsPositive(pol.MaxQueueLength), "max_queue_length"). - Append(cfgerror.IsPositive(pol.MaxQueueWait.Duration()), "max_queue_wait"). + Append(cfgerror.Comparable(pol.MaxConcurrency).GreaterOrEqual(0), "max_concurrency"). + Append(cfgerror.Comparable(pol.MaxQueueLength).GreaterOrEqual(0), "max_queue_length"). + Append(cfgerror.Comparable(pol.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait"). AsError() } @@ -480,7 +480,7 @@ func (scc StreamCacheConfig) Validate() error { return cfgerror.New(). Append(cfgerror.PathIsAbs(scc.Dir), "dir"). - Append(cfgerror.IsPositive(scc.MaxAge.Duration()), "max_age"). + Append(cfgerror.Comparable(scc.MaxAge.Duration()).GreaterOrEqual(0), "max_age"). AsError() } @@ -622,7 +622,7 @@ func (cfg *Cfg) ValidateV2() error { {field: "gitlab", validate: cfg.Gitlab.Validate}, {field: "gitlab-shell", validate: cfg.GitlabShell.Validate}, {field: "graceful_restart_timeout", validate: func() error { - return cfgerror.IsPositive(cfg.GracefulRestartTimeout.Duration()) + return cfgerror.Comparable(cfg.GracefulRestartTimeout.Duration()).GreaterOrEqual(0) }}, {field: "daily_maintenance", validate: func() error { storages := make([]string, len(cfg.Storages)) diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index f130c0d00..b21d630fe 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -1784,7 +1784,7 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { t, cfgerror.ValidationErrors{ cfgerror.NewValidationError( - fmt.Errorf("%w: -1", cfgerror.ErrIsNegative), + fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange), "max_concurrency", ), }, @@ -1798,7 +1798,7 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { t, cfgerror.ValidationErrors{ cfgerror.NewValidationError( - fmt.Errorf("%w: -1", cfgerror.ErrIsNegative), + fmt.Errorf("%w: -1 is not greater than or equal to 0", cfgerror.ErrNotInRange), "max_queue_length", ), }, @@ -1810,7 +1810,7 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { t, cfgerror.ValidationErrors{ cfgerror.NewValidationError( - fmt.Errorf("%w: -1m0s", cfgerror.ErrIsNegative), + fmt.Errorf("%w: -1m0s is not greater than or equal to 0s", cfgerror.ErrNotInRange), "max_queue_wait", ), }, @@ -2195,7 +2195,7 @@ func TestStreamCacheConfig_Validate(t *testing.T) { "dir", ), cfgerror.NewValidationError( - fmt.Errorf("%w: %s", cfgerror.ErrIsNegative, time.Duration(-1)), + fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), "max_age", ), }, diff --git a/internal/gitaly/config/prometheus/config.go b/internal/gitaly/config/prometheus/config.go index cbf697d23..cca0c0043 100644 --- a/internal/gitaly/config/prometheus/config.go +++ b/internal/gitaly/config/prometheus/config.go @@ -51,7 +51,7 @@ func (c *Config) Validate() error { return nil } - errs := cfgerror.New().Append(cfgerror.IsPositive(c.ScrapeTimeout.Duration()), "scrape_timeout") + errs := cfgerror.New().Append(cfgerror.Comparable(c.ScrapeTimeout.Duration()).GreaterOrEqual(0), "scrape_timeout") if !sort.IsSorted(sort.Float64Slice(c.GRPCLatencyBuckets)) { err := fmt.Errorf("%w: expected asc: %v", cfgerror.ErrBadOrder, c.GRPCLatencyBuckets) errs = errs.Append(err, "grpc_latency_buckets") diff --git a/internal/gitaly/config/prometheus/config_test.go b/internal/gitaly/config/prometheus/config_test.go index 0eca772b6..e4d316509 100644 --- a/internal/gitaly/config/prometheus/config_test.go +++ b/internal/gitaly/config/prometheus/config_test.go @@ -33,7 +33,7 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( - fmt.Errorf("%w: -1ns", cfgerror.ErrIsNegative), + fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), "scrape_timeout", ), cfgerror.NewValidationError( diff --git a/internal/praefect/config/config.go b/internal/praefect/config/config.go index 17ed0e478..57549dfd0 100644 --- a/internal/praefect/config/config.go +++ b/internal/praefect/config/config.go @@ -6,11 +6,13 @@ import ( "fmt" "io" "os" + "sort" "time" "github.com/hashicorp/yamux" "github.com/pelletier/go-toml/v2" promclient "github.com/prometheus/client_golang/prometheus" + "gitlab.com/gitlab-org/gitaly/v15/internal/errors/cfgerror" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/log" @@ -44,8 +46,9 @@ const ( minimalSyncRunInterval = time.Minute ) -//nolint:revive // This is unintentionally missing documentation. +// Failover contains configuration for the mechanism that tracks healthiness of the cluster nodes. type Failover struct { + // Enabled is a trigger used to check if failover is enabled or not. Enabled bool `toml:"enabled,omitempty"` // ElectionStrategy is the strategy to use for electing primaries nodes. ElectionStrategy ElectionStrategy `toml:"election_strategy,omitempty"` @@ -82,6 +85,39 @@ func (f Failover) ErrorThresholdsConfigured() (bool, error) { return true, nil } +// Validate returns a list of failed checks. +func (f Failover) Validate() error { + if !f.Enabled { + // If it is not enabled we shouldn't care about provided values + // as they won't be used. + return nil + } + + errs := cfgerror.New(). + Append(cfgerror.IsSupportedValue(f.ElectionStrategy, ElectionStrategyLocal, ElectionStrategySQL, ElectionStrategyPerRepository), "election_strategy"). + Append(cfgerror.Comparable(f.BootstrapInterval.Duration()).GreaterOrEqual(0), "bootstrap_interval"). + Append(cfgerror.Comparable(f.MonitorInterval.Duration()).GreaterOrEqual(0), "monitor_interval"). + Append(cfgerror.Comparable(f.ErrorThresholdWindow.Duration()).GreaterOrEqual(0), "error_threshold_window") + + if f.ErrorThresholdWindow == 0 && f.WriteErrorThresholdCount == 0 && f.ReadErrorThresholdCount == 0 { + return errs.AsError() + } + + if f.ErrorThresholdWindow == 0 { + errs = errs.Append(cfgerror.ErrNotSet, "error_threshold_window") + } + + if f.WriteErrorThresholdCount == 0 { + errs = errs.Append(cfgerror.ErrNotSet, "write_error_threshold_count") + } + + if f.ReadErrorThresholdCount == 0 { + errs = errs.Append(cfgerror.ErrNotSet, "read_error_threshold_count") + } + + return errs.AsError() +} + // BackgroundVerification contains configuration options for the repository background verification. type BackgroundVerification struct { // VerificationInterval determines the duration after a replica due for reverification. @@ -92,6 +128,13 @@ type BackgroundVerification struct { DeleteInvalidRecords bool `toml:"delete_invalid_records"` } +// Validate runs validation on all fields and compose all found errors. +func (bv BackgroundVerification) Validate() error { + return cfgerror.New(). + Append(cfgerror.Comparable(bv.VerificationInterval.Duration()).GreaterOrEqual(0), "verification_interval"). + AsError() +} + // DefaultBackgroundVerificationConfig returns the default background verification configuration. func DefaultBackgroundVerificationConfig() BackgroundVerification { return BackgroundVerification{ @@ -109,6 +152,20 @@ type Reconciliation struct { HistogramBuckets []float64 `toml:"histogram_buckets,omitempty"` } +// Validate runs validation on all fields and compose all found errors. +func (r Reconciliation) Validate() error { + errs := cfgerror.New(). + Append(cfgerror.Comparable(r.SchedulingInterval.Duration()).GreaterOrEqual(0), "scheduling_interval") + + if r.SchedulingInterval != 0 { + if !sort.Float64sAreSorted(r.HistogramBuckets) { + errs = errs.Append(cfgerror.ErrBadOrder, "histogram_buckets") + } + } + + return errs.AsError() +} + // DefaultReconciliationConfig returns the default values for reconciliation configuration. func DefaultReconciliationConfig() Reconciliation { return Reconciliation{ @@ -127,6 +184,14 @@ type Replication struct { ParallelStorageProcessingWorkers uint `toml:"parallel_storage_processing_workers,omitempty"` } +// Validate runs validation on all fields and compose all found errors. +func (r Replication) Validate() error { + return cfgerror.New(). + Append(cfgerror.Comparable(r.BatchSize).GreaterOrEqual(1), "batch_size"). + Append(cfgerror.Comparable(r.ParallelStorageProcessingWorkers).GreaterOrEqual(1), "parallel_storage_processing_workers"). + AsError() +} + // DefaultReplicationConfig returns the default values for replication configuration. func DefaultReplicationConfig() Replication { return Replication{BatchSize: 10, ParallelStorageProcessingWorkers: 1} @@ -155,13 +220,11 @@ type Config struct { Auth auth.Config `toml:"auth,omitempty"` TLS config.TLS `toml:"tls,omitempty"` DB `toml:"database,omitempty"` - Failover Failover `toml:"failover,omitempty"` - // Keep for legacy reasons: remove after Omnibus has switched - FailoverEnabled bool `toml:"failover_enabled,omitempty"` - MemoryQueueEnabled bool `toml:"memory_queue_enabled,omitempty"` - GracefulStopTimeout duration.Duration `toml:"graceful_stop_timeout,omitempty"` - RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup,omitempty"` - Yamux Yamux `toml:"yamux,omitempty"` + Failover Failover `toml:"failover,omitempty"` + MemoryQueueEnabled bool `toml:"memory_queue_enabled,omitempty"` + GracefulStopTimeout duration.Duration `toml:"graceful_stop_timeout,omitempty"` + RepositoriesCleanup RepositoriesCleanup `toml:"repositories_cleanup,omitempty"` + Yamux Yamux `toml:"yamux,omitempty"` } // Yamux contains Yamux related configuration values. @@ -171,7 +234,7 @@ type Yamux struct { MaximumStreamWindowSizeBytes uint32 `toml:"maximum_stream_window_size_bytes,omitempty"` // AcceptBacklog sets the maximum number of stream openings in-flight before further openings // block. - AcceptBacklog int `toml:"accept_backlog,omitempty"` + AcceptBacklog uint `toml:"accept_backlog,omitempty"` } func (cfg Yamux) validate() error { @@ -188,12 +251,20 @@ func (cfg Yamux) validate() error { return nil } +// Validate runs validation on all fields and compose all found errors. +func (cfg Yamux) Validate() error { + return cfgerror.New(). + Append(cfgerror.Comparable(cfg.MaximumStreamWindowSizeBytes).GreaterOrEqual(262144), "maximum_stream_window_size_bytes"). + Append(cfgerror.Comparable(cfg.AcceptBacklog).GreaterOrEqual(1), "accept_backlog"). + AsError() +} + // DefaultYamuxConfig returns the default Yamux configuration values. func DefaultYamuxConfig() Yamux { defaultCfg := yamux.DefaultConfig() return Yamux{ MaximumStreamWindowSizeBytes: defaultCfg.MaxStreamWindowSize, - AcceptBacklog: defaultCfg.AcceptBacklog, + AcceptBacklog: uint(defaultCfg.AcceptBacklog), } } @@ -209,6 +280,19 @@ type VirtualStorage struct { DefaultReplicationFactor int `toml:"default_replication_factor,omitempty"` } +// Validate runs validation on all fields and compose all found errors. +func (vs VirtualStorage) Validate() error { + errs := cfgerror.New(). + Append(cfgerror.NotBlank(vs.Name), "name"). + Append(cfgerror.NotEmptySlice(vs.Nodes), "node") + + for i, node := range vs.Nodes { + errs = errs.Append(node.Validate(), "node", fmt.Sprintf("[%d]", i)) + } + + return errs.AsError() +} + // FromFile loads the config for the passed file path func FromFile(filePath string) (Config, error) { b, err := os.ReadFile(filePath) @@ -236,12 +320,6 @@ func FromReader(reader io.Reader) (Config, error) { return Config{}, err } - // TODO: Remove this after failover_enabled has moved under a separate failover section. This is for - // backwards compatibility only - if conf.FailoverEnabled { - conf.Failover.Enabled = true - } - conf.setDefaults() return *conf, nil @@ -336,7 +414,29 @@ func (c *Config) Validate() error { // It exists as a demonstration of the new validation implementation based on the usage // of the cfgerror package. func (c *Config) ValidateV2() error { - return nil + errs := cfgerror.New(). + Append(func() error { + if c.SocketPath == "" && c.ListenAddr == "" && c.TLSListenAddr == "" { + return fmt.Errorf(`none of "socket_path", "listen_addr" or "tls_listen_addr" is set`) + } + return nil + }()). + Append(c.BackgroundVerification.Validate(), "background_verification"). + Append(c.Reconciliation.Validate(), "reconciliation"). + Append(c.Replication.Validate(), "replication"). + Append(c.Prometheus.Validate(), "prometheus"). + Append(c.TLS.Validate(), "tls"). + Append(c.Failover.Validate(), "failover"). + Append(cfgerror.Comparable(c.GracefulStopTimeout.Duration()).GreaterOrEqual(0), "graceful_stop_timeout"). + Append(c.RepositoriesCleanup.Validate(), "repositories_cleanup"). + Append(c.Yamux.Validate(), "yamux"). + Append(cfgerror.NotEmptySlice(c.VirtualStorages), "virtual_storage") + + for i, storage := range c.VirtualStorages { + errs = errs.Append(storage.Validate(), "virtual_storage", fmt.Sprintf("[%d]", i)) + } + + return errs.AsError() } // NeedsSQL returns true if the driver for SQL needs to be initialized @@ -438,7 +538,22 @@ type RepositoriesCleanup struct { // RunInterval: the check runs if the previous operation was done at least RunInterval before. RunInterval duration.Duration `toml:"run_interval,omitempty"` // RepositoriesInBatch is the number of repositories to pass as a batch for processing. - RepositoriesInBatch int `toml:"repositories_in_batch,omitempty"` + RepositoriesInBatch uint `toml:"repositories_in_batch,omitempty"` +} + +// Validate runs validation on all fields and compose all found errors. +func (rc RepositoriesCleanup) Validate() error { + if rc.RunInterval == 0 { + // If RunInterval is set to 0 it means it is disabled. The validation doesn't make + // sense then as it won't be used. + return nil + } + + return cfgerror.New(). + Append(cfgerror.Comparable(rc.CheckInterval.Duration()).GreaterOrEqual(minimalSyncCheckInterval), "check_interval"). + Append(cfgerror.Comparable(rc.RunInterval.Duration()).GreaterOrEqual(minimalSyncRunInterval), "run_interval"). + Append(cfgerror.Comparable(rc.RepositoriesInBatch).GreaterOrEqual(1), "repositories_in_batch"). + AsError() } // DefaultRepositoriesCleanup contains default configuration values for the RepositoriesCleanup. diff --git a/internal/praefect/config/config_test.go b/internal/praefect/config/config_test.go index 1ed5e0cd9..6f53ee32a 100644 --- a/internal/praefect/config/config_test.go +++ b/internal/praefect/config/config_test.go @@ -5,17 +5,22 @@ package config import ( "bytes" "errors" + "fmt" "os" + "path/filepath" "testing" "time" "github.com/pelletier/go-toml/v2" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/errors/cfgerror" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/log" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/prometheus" "gitlab.com/gitlab-org/gitaly/v15/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v15/internal/helper/duration" + "gitlab.com/gitlab-org/gitaly/v15/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" ) func TestConfigValidation(t *testing.T) { @@ -536,3 +541,450 @@ func TestSerialization(t *testing.T) { require.Equal(t, "listen_addr = 'localhost:5640'\n", out.String()) }) } + +func TestFailover_Validate(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + failover Failover + expectedErr error + }{ + { + name: "empty disabled config", + failover: Failover{}, + }, + { + name: "all set valid", + failover: Failover{ + Enabled: true, + ElectionStrategy: ElectionStrategySQL, + ErrorThresholdWindow: duration.Duration(1), + WriteErrorThresholdCount: 1, + ReadErrorThresholdCount: 1, + BootstrapInterval: duration.Duration(1), + MonitorInterval: duration.Duration(1), + }, + }, + { + name: "all valid with disabled error threshold", + failover: Failover{ + ElectionStrategy: ElectionStrategySQL, + BootstrapInterval: duration.Duration(1), + MonitorInterval: duration.Duration(1), + }, + }, + { + name: "all set invalid except ErrorThresholdWindow", + failover: Failover{ + Enabled: true, + ElectionStrategy: ElectionStrategy("bad"), + ErrorThresholdWindow: duration.Duration(-1), + WriteErrorThresholdCount: 0, + ReadErrorThresholdCount: 0, + BootstrapInterval: duration.Duration(-1), + MonitorInterval: duration.Duration(-1), + }, + expectedErr: cfgerror.ValidationErrors{ + { + Key: []string{"election_strategy"}, + Cause: fmt.Errorf(`%w: "bad"`, cfgerror.ErrUnsupportedValue), + }, + { + Key: []string{"bootstrap_interval"}, + Cause: fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), + }, + { + Key: []string{"monitor_interval"}, + Cause: fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), + }, + { + Key: []string{"error_threshold_window"}, + Cause: fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), + }, + { + Key: []string{"write_error_threshold_count"}, + Cause: cfgerror.ErrNotSet, + }, + { + Key: []string{"read_error_threshold_count"}, + Cause: cfgerror.ErrNotSet, + }, + }, + }, + { + name: "invalid error threshold", + failover: Failover{ + Enabled: true, + ElectionStrategy: ElectionStrategySQL, + ErrorThresholdWindow: duration.Duration(0), + WriteErrorThresholdCount: 1, + ReadErrorThresholdCount: 1, + }, + expectedErr: cfgerror.ValidationErrors{ + { + Key: []string{"error_threshold_window"}, + Cause: cfgerror.ErrNotSet, + }, + }, + }, + { + name: "set invalid but disabled", + failover: Failover{ + Enabled: false, + ElectionStrategy: ElectionStrategy("bad"), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + errs := tc.failover.Validate() + require.Equal(t, tc.expectedErr, errs) + }) + } +} + +func TestBackgroundVerification_Validate(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + verification BackgroundVerification + expectedErr error + }{ + { + name: "empty is valid", + verification: BackgroundVerification{}, + }, + { + name: "valid", + verification: BackgroundVerification{ + DeleteInvalidRecords: true, + VerificationInterval: duration.Duration(1), + }, + }, + { + name: "zero", + verification: BackgroundVerification{ + DeleteInvalidRecords: true, + VerificationInterval: duration.Duration(0), + }, + }, + { + name: "invalid", + verification: BackgroundVerification{ + VerificationInterval: duration.Duration(-1), + }, + expectedErr: cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), + "verification_interval", + ), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.verification.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestReconciliation_Validate(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + reconciliation Reconciliation + expectedErr error + }{ + { + name: "empty is valid", + reconciliation: Reconciliation{}, + }, + { + name: "valid", + reconciliation: Reconciliation{ + SchedulingInterval: duration.Duration(1), + HistogramBuckets: []float64{-1, 0, 1}, + }, + }, + { + name: "invalid", + reconciliation: Reconciliation{ + SchedulingInterval: duration.Duration(-1), + HistogramBuckets: []float64{-1, 1, 0}, + }, + expectedErr: cfgerror.ValidationErrors{{ + Key: []string{"scheduling_interval"}, + Cause: fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange), + }, { + Key: []string{"histogram_buckets"}, + Cause: cfgerror.ErrBadOrder, + }}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.reconciliation.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestReplication_Validate(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + replication Replication + expectedErr error + }{ + { + name: "valid", + replication: Replication{ + BatchSize: 1, + ParallelStorageProcessingWorkers: 1, + }, + }, + { + name: "invalid", + replication: Replication{ + BatchSize: 0, + ParallelStorageProcessingWorkers: 0, + }, + expectedErr: cfgerror.ValidationErrors{{ + Key: []string{"batch_size"}, + Cause: fmt.Errorf("%w: 0 is not greater than or equal to 1", cfgerror.ErrNotInRange), + }, { + Key: []string{"parallel_storage_processing_workers"}, + Cause: fmt.Errorf("%w: 0 is not greater than or equal to 1", cfgerror.ErrNotInRange), + }}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.replication.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestVirtualStorage_Validate(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + vs VirtualStorage + expectedErr error + }{ + { + name: "ok", + vs: VirtualStorage{ + Name: "vs", + Nodes: []*Node{{Storage: "st", Address: "addr"}}, + }, + }, + { + name: "invalid node", + vs: VirtualStorage{ + Name: "vs", + Nodes: []*Node{{Storage: "", Address: "addr"}}, + }, + expectedErr: cfgerror.ValidationErrors{ + { + Key: []string{"node", "[0]", "storage"}, + Cause: cfgerror.ErrBlankOrEmpty, + }, + }, + }, + { + name: "invalid", + vs: VirtualStorage{}, + expectedErr: cfgerror.ValidationErrors{ + { + Key: []string{"name"}, + Cause: cfgerror.ErrBlankOrEmpty, + }, + { + Key: []string{"node"}, + Cause: cfgerror.ErrNotSet, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.vs.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestRepositoriesCleanup_Validate(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + cleanup RepositoriesCleanup + expectedErr error + }{ + { + name: "valid", + cleanup: RepositoriesCleanup{ + CheckInterval: duration.Duration(10 * time.Minute), + RunInterval: duration.Duration(1 * time.Minute), + RepositoriesInBatch: 10, + }, + }, + { + name: "noop because run interval is 0", + cleanup: RepositoriesCleanup{ + CheckInterval: -duration.Duration(10 * time.Minute), + RunInterval: 0, + RepositoriesInBatch: 10, + }, + }, + { + name: "invalid", + cleanup: RepositoriesCleanup{ + CheckInterval: duration.Duration(1), + RunInterval: duration.Duration(50 * time.Second), + RepositoriesInBatch: 0, + }, + expectedErr: cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: 1ns is not greater than or equal to 1m0s", cfgerror.ErrNotInRange), + "check_interval", + ), + cfgerror.NewValidationError( + fmt.Errorf("%w: 50s is not greater than or equal to 1m0s", cfgerror.ErrNotInRange), + "run_interval", + ), + cfgerror.NewValidationError( + fmt.Errorf("%w: 0 is not greater than or equal to 1", cfgerror.ErrNotInRange), + "repositories_in_batch", + ), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.cleanup.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestYamux_Validate(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + yamux Yamux + expectedErr error + }{ + { + name: "valid", + yamux: Yamux{ + MaximumStreamWindowSizeBytes: 1024 * 1024, + AcceptBacklog: 5, + }, + }, + { + name: "invalid", + yamux: Yamux{ + MaximumStreamWindowSizeBytes: 1024, + AcceptBacklog: 0, + }, + expectedErr: cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: 1024 is not greater than or equal to 262144", cfgerror.ErrNotInRange), + "maximum_stream_window_size_bytes", + ), + cfgerror.NewValidationError( + fmt.Errorf("%w: 0 is not greater than or equal to 1", cfgerror.ErrNotInRange), + "accept_backlog", + ), + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.yamux.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} + +func TestConfig_ValidateV2(t *testing.T) { + t.Parallel() + + t.Run("valid", func(t *testing.T) { + cfg := Config{ + Replication: Replication{ + BatchSize: 1, + ParallelStorageProcessingWorkers: 1, + }, + ListenAddr: "localhost", + VirtualStorages: []*VirtualStorage{{ + Name: "name", + Nodes: []*Node{{ + Storage: "storage", + Address: "localhost", + }}, + }}, + Yamux: Yamux{ + MaximumStreamWindowSizeBytes: 300000, + AcceptBacklog: 1, + }, + } + require.NoError(t, cfg.ValidateV2()) + }) + + t.Run("invalid", func(t *testing.T) { + tmpDir := testhelper.TempDir(t) + tmpFile := filepath.Join(tmpDir, "file") + require.NoError(t, os.WriteFile(tmpFile, nil, perm.PublicFile)) + cfg := Config{ + BackgroundVerification: BackgroundVerification{ + VerificationInterval: duration.Duration(-1), + }, + Reconciliation: Reconciliation{ + SchedulingInterval: duration.Duration(-1), + }, + Replication: Replication{ + BatchSize: 0, + ParallelStorageProcessingWorkers: 1, + }, + Prometheus: prometheus.Config{ + ScrapeTimeout: duration.Duration(-1), + GRPCLatencyBuckets: []float64{1}, + }, + PrometheusExcludeDatabaseFromDefaultMetrics: false, + TLS: config.TLS{ + CertPath: "/doesnt/exist", + KeyPath: tmpFile, + }, + Failover: Failover{ + Enabled: true, + ElectionStrategy: ElectionStrategy("invalid"), + }, + GracefulStopTimeout: duration.Duration(-1), + RepositoriesCleanup: RepositoriesCleanup{ + CheckInterval: duration.Duration(time.Hour), + RunInterval: duration.Duration(1), + RepositoriesInBatch: 1, + }, + Yamux: Yamux{ + MaximumStreamWindowSizeBytes: 0, + AcceptBacklog: 1, + }, + } + err := cfg.ValidateV2() + + negativeDurationErr := fmt.Errorf("%w: -1ns is not greater than or equal to 0s", cfgerror.ErrNotInRange) + require.Equal(t, cfgerror.ValidationErrors{ + cfgerror.NewValidationError(errors.New(`none of "socket_path", "listen_addr" or "tls_listen_addr" is set`)), + cfgerror.NewValidationError(negativeDurationErr, "background_verification", "verification_interval"), + cfgerror.NewValidationError(negativeDurationErr, "reconciliation", "scheduling_interval"), + cfgerror.NewValidationError(fmt.Errorf("%w: 0 is not greater than or equal to 1", cfgerror.ErrNotInRange), "replication", "batch_size"), + cfgerror.NewValidationError(negativeDurationErr, "prometheus", "scrape_timeout"), + cfgerror.NewValidationError(fmt.Errorf(`%w: "/doesnt/exist"`, cfgerror.ErrDoesntExist), "tls", "certificate_path"), + cfgerror.NewValidationError(fmt.Errorf(`%w: "invalid"`, cfgerror.ErrUnsupportedValue), "failover", "election_strategy"), + cfgerror.NewValidationError(negativeDurationErr, "graceful_stop_timeout"), + cfgerror.NewValidationError(fmt.Errorf("%w: 1ns is not greater than or equal to 1m0s", cfgerror.ErrNotInRange), "repositories_cleanup", "run_interval"), + cfgerror.NewValidationError(fmt.Errorf("%w: 0 is not greater than or equal to 262144", cfgerror.ErrNotInRange), "yamux", "maximum_stream_window_size_bytes"), + cfgerror.NewValidationError(cfgerror.ErrNotSet, "virtual_storage"), + }, err) + }) +} diff --git a/internal/praefect/config/node.go b/internal/praefect/config/node.go index 47bd576ca..c10b31bd0 100644 --- a/internal/praefect/config/node.go +++ b/internal/praefect/config/node.go @@ -3,6 +3,8 @@ package config import ( "encoding/json" "fmt" + + "gitlab.com/gitlab-org/gitaly/v15/internal/errors/cfgerror" ) // Node describes an address that serves a storage @@ -24,3 +26,11 @@ func (n Node) MarshalJSON() ([]byte, error) { func (n Node) String() string { return fmt.Sprintf("storage_name: %s, address: %s", n.Storage, n.Address) } + +// Validate runs validation on all fields and compose all found errors. +func (n Node) Validate() error { + return cfgerror.New(). + Append(cfgerror.NotBlank(n.Storage), "storage"). + Append(cfgerror.NotBlank(n.Address), "address"). + AsError() +} diff --git a/internal/praefect/config/node_test.go b/internal/praefect/config/node_test.go index 56bac085c..c2d63e34d 100644 --- a/internal/praefect/config/node_test.go +++ b/internal/praefect/config/node_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v15/internal/errors/cfgerror" ) func TestNode_MarshalJSON(t *testing.T) { @@ -21,3 +22,36 @@ func TestNode_MarshalJSON(t *testing.T) { require.NoError(t, err) require.JSONEq(t, `{"storage":"storage","address":"address"}`, string(b)) } + +func TestNode_Validate(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + node Node + expectedErr error + }{ + { + name: "valid", + node: Node{Storage: "storage", Address: "address"}, + }, + { + name: "invalid", + node: Node{Storage: "", Address: " \n \t"}, + expectedErr: cfgerror.ValidationErrors{ + { + Key: []string{"storage"}, + Cause: cfgerror.ErrBlankOrEmpty, + }, + { + Key: []string{"address"}, + Cause: cfgerror.ErrBlankOrEmpty, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := tc.node.Validate() + require.Equal(t, tc.expectedErr, err) + }) + } +} diff --git a/internal/praefect/config/testhelper_test.go b/internal/praefect/config/testhelper_test.go new file mode 100644 index 000000000..94a8084b7 --- /dev/null +++ b/internal/praefect/config/testhelper_test.go @@ -0,0 +1,11 @@ +package config + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v15/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} |