diff options
author | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-07-14 12:48:48 +0300 |
---|---|---|
committer | Quang-Minh Nguyen <qmnguyen@gitlab.com> | 2023-07-14 12:48:48 +0300 |
commit | 4c314694734330f910ce5d1409c0d462a02628ff (patch) | |
tree | c1634683a5c16667ea56cabd66fa6926688b72bd | |
parent | f2c232b9898c255cff523240d7f53b77ae698e74 (diff) | |
parent | dade902f7fa9b42e5b1d42f66ee0d88bde67369f (diff) |
Merge branch 'qmnguyen0711/improve-spawn-token-observability' into 'master'
Add more metrics and logs to spawn token
See merge request https://gitlab.com/gitlab-org/gitaly/-/merge_requests/6039
Merged-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
Approved-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Approved-by: Will Chandler <wchandler@gitlab.com>
Reviewed-by: Patrick Steinhardt <psteinhardt@gitlab.com>
Reviewed-by: Quang-Minh Nguyen <qmnguyen@gitlab.com>
-rw-r--r-- | internal/command/command.go | 36 | ||||
-rw-r--r-- | internal/command/command_test.go | 56 | ||||
-rw-r--r-- | internal/command/option.go | 9 | ||||
-rw-r--r-- | internal/command/spawntoken.go | 122 | ||||
-rw-r--r-- | internal/command/spawntoken_test.go | 183 |
5 files changed, 290 insertions, 116 deletions
diff --git a/internal/command/command.go b/internal/command/command.go index bbea588c8..6ccc25adc 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -69,13 +69,6 @@ var ( }, []string{"grpc_service", "grpc_method", "cmd", "subcmd", "ctxswitchtype", "git_version"}, ) - spawnTokenAcquiringSeconds = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_command_spawn_token_acquiring_seconds_total", - Help: "Sum of time spent waiting for a spawn token", - }, - []string{"grpc_service", "grpc_method", "cmd", "git_version"}, - ) // exportedEnvVars contains a list of environment variables // that are always exported to child processes on spawn @@ -117,8 +110,21 @@ var ( // envInjector is responsible for injecting environment variables required for tracing into // the child process. envInjector = labkittracing.NewEnvInjector() + + // globalSpawnTokenManager is responsible for limiting the total number of commands that can spawn at a time in a + // Gitaly process. + globalSpawnTokenManager *SpawnTokenManager ) +func init() { + var err error + globalSpawnTokenManager, err = NewSpawnTokenManagerFromEnv() + if err != nil { + panic(err) + } + prometheus.MustRegister(globalSpawnTokenManager) +} + const ( // maxStderrBytes is at most how many bytes will be written to stderr maxStderrBytes = 10000 // 10kb @@ -188,17 +194,15 @@ func New(ctx context.Context, nameAndArgs []string, opts ...Option) (*Command, e opt(&cfg) } - spawnStartTime := time.Now() - putToken, err := getSpawnToken(ctx) + spawnTokenManager := cfg.spawnTokenManager + if spawnTokenManager == nil { + spawnTokenManager = globalSpawnTokenManager + } + putToken, err := spawnTokenManager.GetSpawnToken(ctx) if err != nil { return nil, err } - service, method := methodFromContext(ctx) cmdName := path.Base(nameAndArgs[0]) - spawnTokenAcquiringSeconds. - WithLabelValues(service, method, cmdName, cfg.gitVersion). - Add(getSpawnTokenAcquiringSeconds(spawnStartTime)) - defer putToken() logPid := -1 @@ -530,10 +534,6 @@ func (c *Command) Pid() int { return c.cmd.Process.Pid } -var getSpawnTokenAcquiringSeconds = func(t time.Time) float64 { - return time.Since(t).Seconds() -} - type stdinSentinel struct{} func (stdinSentinel) Read([]byte) (int, error) { diff --git a/internal/command/command_test.go b/internal/command/command_test.go index dd23c85d6..b037b22e8 100644 --- a/internal/command/command_test.go +++ b/internal/command/command_test.go @@ -14,8 +14,6 @@ import ( "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" - grpcmwtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" @@ -169,24 +167,19 @@ func TestNew_rejectContextWithoutDone(t *testing.T) { } func TestNew_spawnTimeout(t *testing.T) { - ctx := testhelper.Context(t) - - defer func(ch chan struct{}, t time.Duration) { - spawnTokens = ch - spawnConfig.Timeout = t - }(spawnTokens, spawnConfig.Timeout) + t.Parallel() - // This unbuffered channel will behave like a full/blocked buffered channel. - spawnTokens = make(chan struct{}) - // Speed up the test by lowering the timeout + ctx := testhelper.Context(t) spawnTimeout := 200 * time.Millisecond - spawnConfig.Timeout = spawnTimeout + spawnTokenManager := NewSpawnTokenManager(SpawnConfig{ + Timeout: spawnTimeout, + MaxParallel: 0, + }) tick := time.After(spawnTimeout / 2) - errCh := make(chan error) go func() { - _, err := New(ctx, []string{"true"}) + _, err := New(ctx, []string{"true"}, WithSpawnTokenManager(spawnTokenManager)) errCh <- err }() @@ -458,41 +451,6 @@ func TestCommand_logMessage(t *testing.T) { assert.Equal(t, "/sys/fs/cgroup/1", logEntry.Data["command.cgroup_path"]) } -func TestNew_commandSpawnTokenMetrics(t *testing.T) { - defer func(old func(time.Time) float64) { - getSpawnTokenAcquiringSeconds = old - }(getSpawnTokenAcquiringSeconds) - - getSpawnTokenAcquiringSeconds = func(t time.Time) float64 { - return 1 - } - - spawnTokenAcquiringSeconds.Reset() - - ctx := testhelper.Context(t) - - tags := grpcmwtags.NewTags() - tags.Set("grpc.request.fullMethod", "/test.Service/TestRPC") - ctx = grpcmwtags.SetInContext(ctx, tags) - - cmd, err := New(ctx, []string{"echo", "goodbye, cruel world."}) - - require.NoError(t, err) - require.NoError(t, cmd.Wait()) - - expectedMetrics := `# HELP gitaly_command_spawn_token_acquiring_seconds_total Sum of time spent waiting for a spawn token -# TYPE gitaly_command_spawn_token_acquiring_seconds_total counter -gitaly_command_spawn_token_acquiring_seconds_total{cmd="echo",git_version="",grpc_method="TestRPC",grpc_service="test.Service"} 1 -` - require.NoError( - t, - testutil.CollectAndCompare( - spawnTokenAcquiringSeconds, - bytes.NewBufferString(expectedMetrics), - ), - ) -} - func TestCommand_withFinalizer(t *testing.T) { t.Parallel() diff --git a/internal/command/option.go b/internal/command/option.go index e5c0fdeda..881f71fb7 100644 --- a/internal/command/option.go +++ b/internal/command/option.go @@ -22,6 +22,7 @@ type config struct { cgroupsManager cgroups.Manager cgroupsAddCommandOpts []cgroups.AddCommandOption + spawnTokenManager *SpawnTokenManager } // Option is an option that can be passed to `New()` for controlling how the command is being @@ -96,6 +97,14 @@ func WithCgroup(cgroupsManager cgroups.Manager, opts ...cgroups.AddCommandOption } } +// WithSpawnTokenManager assigns a spawn token manager for the command. If this option is not set, the command uses +// the process-global spawn token manager. +func WithSpawnTokenManager(spawnTokenManager *SpawnTokenManager) Option { + return func(cfg *config) { + cfg.spawnTokenManager = spawnTokenManager + } +} + // WithFinalizer sets up the finalizer to be run when the command is being wrapped up. It will be // called after `Wait()` has returned. func WithFinalizer(finalizer func(context.Context, *Command)) Option { diff --git a/internal/command/spawntoken.go b/internal/command/spawntoken.go index ff7ef7f5e..a48052daa 100644 --- a/internal/command/spawntoken.go +++ b/internal/command/spawntoken.go @@ -7,7 +7,6 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/tracing" @@ -15,24 +14,11 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) -var ( - spawnTokens chan struct{} - spawnConfig SpawnConfig - - spawnTimeoutCount = promauto.NewCounter( - prometheus.CounterOpts{ - Name: "gitaly_spawn_timeouts_total", - Help: "Number of process spawn timeouts", - }, - ) -) - // SpawnConfig holds configuration for command spawning timeouts and parallelism. type SpawnConfig struct { // This default value (10 seconds) is very high. Spawning should take // milliseconds or less. If we hit 10 seconds, something is wrong, and - // failing the request will create breathing room. Can be modified at - // runtime with the GITALY_COMMAND_SPAWN_TIMEOUT environment variable. + // failing the request will create breathing room. Timeout time.Duration `split_words:"true" default:"10s"` // MaxSpawnParallel limits the number of goroutines that can spawn a @@ -46,34 +32,104 @@ type SpawnConfig struct { MaxParallel int `split_words:"true" default:"10"` } -func init() { - envconfig.MustProcess("gitaly_command_spawn", &spawnConfig) - spawnTokens = make(chan struct{}, spawnConfig.MaxParallel) +// SpawnTokenManager limits the number of goroutines that can spawn a process at a time. +type SpawnTokenManager struct { + spawnTokens chan struct{} + spawnConfig SpawnConfig + spawnTimeoutCount prometheus.Counter + spawnTokenWaitingLength prometheus.Gauge + spawnWaitingTimeHistogram prometheus.Histogram + spawnForkingTimeHistogram prometheus.Histogram +} + +// Describe is used to describe Prometheus metrics. +func (m *SpawnTokenManager) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(m, descs) +} + +// Collect is used to collect Prometheus metrics. +func (m *SpawnTokenManager) Collect(metrics chan<- prometheus.Metric) { + m.spawnTimeoutCount.Collect(metrics) + m.spawnTokenWaitingLength.Collect(metrics) + m.spawnWaitingTimeHistogram.Collect(metrics) + m.spawnForkingTimeHistogram.Collect(metrics) +} + +// NewSpawnTokenManager creates a SpawnTokenManager object from the input config +func NewSpawnTokenManager(config SpawnConfig) *SpawnTokenManager { + return &SpawnTokenManager{ + spawnTokens: make(chan struct{}, config.MaxParallel), + spawnConfig: config, + spawnTimeoutCount: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gitaly_spawn_timeouts_total", + Help: "Number of process spawn timeouts", + }, + ), + spawnTokenWaitingLength: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "gitaly_spawn_token_waiting_length", + Help: "The current length of the queue waiting for spawn tokens", + }, + ), + spawnWaitingTimeHistogram: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitaly_spawn_waiting_time_seconds", + Help: "Histogram of time waiting for spawn tokens", + Buckets: []float64{0.001, 0.005, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0, 10.0}, + }, + ), + spawnForkingTimeHistogram: prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "gitaly_spawn_forking_time_seconds", + Help: "Histogram of actual forking time after spawn tokens are acquired", + Buckets: []float64{0.001, 0.005, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0, 10.0}, + }, + ), + } } -func getSpawnToken(ctx context.Context) (putToken func(), err error) { +// NewSpawnTokenManagerFromEnv creates a SpawnTokenManager object with the config parsed from environment variables +// - GITALY_COMMAND_SPAWN_TIMEOUT for spawn token `Timeout` config +// - GITALY_COMMAND_SPAWN_MAX_PARALLEL for spawn token `MaxParallel` config +func NewSpawnTokenManagerFromEnv() (*SpawnTokenManager, error) { + var spawnConfig SpawnConfig + err := envconfig.Process("gitaly_command_spawn", &spawnConfig) + if err != nil { + return nil, err + } + return NewSpawnTokenManager(spawnConfig), nil +} + +// GetSpawnToken blocks until the caller either acquires a token or timeout. The caller is expected to call returned +// function to put the token back to the queue. +func (m *SpawnTokenManager) GetSpawnToken(ctx context.Context) (putToken func(), err error) { // Go has a global lock (syscall.ForkLock) for spawning new processes. // This select statement is a safety valve to prevent lots of Gitaly // requests from piling up behind the ForkLock if forking for some reason // slows down. This has happened in real life, see // https://gitlab.com/gitlab-org/gitaly/issues/823. - start := time.Now() + startWaiting := time.Now() + m.spawnTokenWaitingLength.Inc() + defer m.spawnTokenWaitingLength.Dec() span, ctx := tracing.StartSpanIfHasParent(ctx, "command.getSpawnToken", nil) defer span.Finish() select { - case spawnTokens <- struct{}{}: - recordTime(ctx, start, "") + case m.spawnTokens <- struct{}{}: + m.recordQueuingTime(ctx, startWaiting, "") + startForking := time.Now() return func() { - <-spawnTokens + <-m.spawnTokens + m.recordForkTime(ctx, startForking) }, nil - case <-time.After(spawnConfig.Timeout): - recordTime(ctx, start, "spawn token timeout") - spawnTimeoutCount.Inc() + case <-time.After(m.spawnConfig.Timeout): + m.recordQueuingTime(ctx, startWaiting, "spawn token timeout") + m.spawnTimeoutCount.Inc() - msg := fmt.Sprintf("process spawn timed out after %v", spawnConfig.Timeout) + msg := fmt.Sprintf("process spawn timed out after %v", m.spawnConfig.Timeout) return nil, structerr.NewResourceExhausted(msg).WithDetail(&gitalypb.LimitError{ ErrorMessage: msg, RetryAfter: durationpb.New(0), @@ -83,8 +139,9 @@ func getSpawnToken(ctx context.Context) (putToken func(), err error) { } } -func recordTime(ctx context.Context, start time.Time, msg string) { +func (m *SpawnTokenManager) recordQueuingTime(ctx context.Context, start time.Time, msg string) { delta := time.Since(start) + m.spawnWaitingTimeHistogram.Observe(delta.Seconds()) if customFields := log.CustomFieldsFromContext(ctx); customFields != nil { customFields.RecordSum("command.spawn_token_wait_ms", int(delta.Milliseconds())) @@ -93,3 +150,12 @@ func recordTime(ctx context.Context, start time.Time, msg string) { } } } + +func (m *SpawnTokenManager) recordForkTime(ctx context.Context, start time.Time) { + delta := time.Since(start) + m.spawnForkingTimeHistogram.Observe(delta.Seconds()) + + if customFields := log.CustomFieldsFromContext(ctx); customFields != nil { + customFields.RecordSum("command.spawn_token_fork_ms", int(delta.Milliseconds())) + } +} diff --git a/internal/command/spawntoken_test.go b/internal/command/spawntoken_test.go index dc025b2ea..a3a7f7093 100644 --- a/internal/command/spawntoken_test.go +++ b/internal/command/spawntoken_test.go @@ -1,9 +1,13 @@ package command import ( + "fmt" + "strings" "testing" "time" + "github.com/kelseyhightower/envconfig" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -13,38 +17,152 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) +// TestNewSpawnTokenManagerFromEnv mocks ENV variables, thus cannot run in parallel. +func TestNewSpawnTokenManagerFromEnv(t *testing.T) { + for _, tc := range []struct { + desc string + envs map[string]string + expectedErr error + expectedConfig SpawnConfig + }{ + { + desc: "spawn token ENVs are not set", + expectedConfig: SpawnConfig{ + Timeout: 10 * time.Second, + MaxParallel: 10, + }, + }, + { + desc: "spawn token ENVs are set correctly", + envs: map[string]string{ + "GITALY_COMMAND_SPAWN_MAX_PARALLEL": "100", + "GITALY_COMMAND_SPAWN_TIMEOUT": "99s", + }, + expectedConfig: SpawnConfig{ + Timeout: 99 * time.Second, + MaxParallel: 100, + }, + }, + { + desc: "spawn token ENVs are set incorrectly", + envs: map[string]string{ + "GITALY_COMMAND_SPAWN_MAX_PARALLEL": "100", + "GITALY_COMMAND_SPAWN_TIMEOUT": "hello", + }, + expectedErr: &envconfig.ParseError{ + KeyName: "GITALY_COMMAND_SPAWN_TIMEOUT", + FieldName: "Timeout", + TypeName: "time.Duration", + Value: "hello", + Err: fmt.Errorf(`time: invalid duration "hello"`), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + for key, value := range tc.envs { + t.Setenv(key, value) + } + manager, err := NewSpawnTokenManagerFromEnv() + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + require.Nil(t, manager) + } else { + require.NoError(t, err) + require.NotNil(t, manager) + require.Equal(t, manager.spawnConfig, tc.expectedConfig) + } + }) + } +} + func TestGetSpawnToken_CommandStats(t *testing.T) { t.Parallel() - ctx := testhelper.Context(t) - ctx = log.InitContextCustomFields(ctx) + ctx := log.InitContextCustomFields(testhelper.Context(t)) + manager := NewSpawnTokenManager(SpawnConfig{ + Timeout: 10 * time.Second, + MaxParallel: 1, + }) + + putFirstToken, err := manager.GetSpawnToken(ctx) + require.NoError(t, err) + + waitSecondAcquire := make(chan struct{}) + waitSecondPut := make(chan struct{}) + go func() { + putSecondToken, err := manager.GetSpawnToken(ctx) + require.NoError(t, err) + close(waitSecondAcquire) + putSecondToken() + <-waitSecondPut + }() + + // The second goroutine is queued waiting for the first goroutine to finish + expected := strings.NewReader(` +# HELP gitaly_spawn_token_waiting_length The current length of the queue waiting for spawn tokens +# TYPE gitaly_spawn_token_waiting_length gauge +gitaly_spawn_token_waiting_length 1 + +`) + require.NoError(t, testutil.CollectAndCompare(manager, expected, + "gitaly_spawn_token_waiting_length", + )) + + putFirstToken() + // Wait for the goroutine to acquire the token + <-waitSecondAcquire + + // As the second goroutine finishes waiting, the queue length goes back to 0 + expected = strings.NewReader(` +# HELP gitaly_spawn_token_waiting_length The current length of the queue waiting for spawn tokens +# TYPE gitaly_spawn_token_waiting_length gauge +gitaly_spawn_token_waiting_length 0 - putToken, err := getSpawnToken(ctx) - require.Nil(t, err) - putToken() +`) + require.NoError(t, testutil.CollectAndCompare(manager, expected, + "gitaly_spawn_token_waiting_length", + )) + + // Wait until for the second gorutine to put back the token + close(waitSecondPut) + expected = strings.NewReader(` +# HELP gitaly_spawn_forking_time_seconds Histogram of time waiting for spawn tokens +# TYPE gitaly_spawn_forking_time_seconds histogram +gitaly_spawn_forking_time_seconds_count 2 +# HELP gitaly_spawn_timeouts_total Number of process spawn timeouts +# TYPE gitaly_spawn_timeouts_total counter +gitaly_spawn_timeouts_total 0 +# HELP gitaly_spawn_waiting_time_seconds Histogram of time waiting for spawn tokens +# TYPE gitaly_spawn_waiting_time_seconds histogram +gitaly_spawn_waiting_time_seconds_count 2 +# HELP gitaly_spawn_token_waiting_length The current length of the queue waiting for spawn tokens +# TYPE gitaly_spawn_token_waiting_length gauge +gitaly_spawn_token_waiting_length 0 + +`) + require.NoError(t, testutil.CollectAndCompare(manager, expected, + "gitaly_spawn_timeouts_total", + "gitaly_spawn_forking_time_seconds_count", + "gitaly_spawn_waiting_time_seconds_count", + "gitaly_spawn_token_waiting_length", + )) customFields := log.CustomFieldsFromContext(ctx) require.NotNil(t, customFields) - require.Contains(t, customFields.Fields(), "command.spawn_token_wait_ms") + logrusFields := customFields.Fields() + + require.Contains(t, logrusFields, "command.spawn_token_wait_ms") + require.Contains(t, logrusFields, "command.spawn_token_fork_ms") } -// This test modifies a global config, hence should never run in parallel func TestGetSpawnToken_CommandStats_timeout(t *testing.T) { - priorTimeout := spawnConfig.Timeout - priorSpawnTokens := spawnTokens - - spawnConfig.Timeout = 1 * time.Millisecond - spawnTokens = make(chan struct{}, 1) - spawnTokens <- struct{}{} - defer func() { - spawnConfig.Timeout = priorTimeout - spawnTokens = priorSpawnTokens - }() - - ctx := testhelper.Context(t) - ctx = log.InitContextCustomFields(ctx) + t.Parallel() - _, err := getSpawnToken(ctx) + ctx := log.InitContextCustomFields(testhelper.Context(t)) + manager := NewSpawnTokenManager(SpawnConfig{ + Timeout: 1 * time.Millisecond, + }) + _, err := manager.GetSpawnToken(ctx) var structErr structerr.Error require.ErrorAs(t, err, &structErr) @@ -63,5 +181,28 @@ func TestGetSpawnToken_CommandStats_timeout(t *testing.T) { logrusFields := customFields.Fields() require.GreaterOrEqual(t, logrusFields["command.spawn_token_wait_ms"], 0) + require.NotContains(t, logrusFields, "command.spawn_token_fork_ms") require.Equal(t, logrusFields["command.spawn_token_error"], "spawn token timeout") + + expected := strings.NewReader(` +# HELP gitaly_spawn_forking_time_seconds Histogram of time waiting for spawn tokens +# TYPE gitaly_spawn_forking_time_seconds histogram +gitaly_spawn_forking_time_seconds_count 1 +# HELP gitaly_spawn_timeouts_total Number of process spawn timeouts +# TYPE gitaly_spawn_timeouts_total counter +gitaly_spawn_timeouts_total 1 +# HELP gitaly_spawn_waiting_time_seconds Histogram of time waiting for spawn tokens +# TYPE gitaly_spawn_waiting_time_seconds histogram +gitaly_spawn_waiting_time_seconds_count 1 +# HELP gitaly_spawn_token_waiting_length The current length of the queue waiting for spawn tokens +# TYPE gitaly_spawn_token_waiting_length gauge +gitaly_spawn_token_waiting_length 0 + +`) + require.NoError(t, testutil.CollectAndCompare(manager, expected, + "gitaly_spawn_timeouts_total", + "gitaly_spawn_forking_time_seconds_count", + "gitaly_spawn_waiting_time_seconds_count", + "gitaly_spawn_token_waiting_length", + )) } |