1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# frozen_string_literal: true
module Gitlab
module SidekiqLogging
class StructuredLogger
START_TIMESTAMP_FIELDS = %w[created_at enqueued_at].freeze
DONE_TIMESTAMP_FIELDS = %w[started_at retried_at failed_at completed_at].freeze
MAXIMUM_JOB_ARGUMENTS_LENGTH = 10.kilobytes
def call(job, queue)
started_time = get_time
base_payload = parse_job(job)
Sidekiq.logger.info log_job_start(base_payload)
yield
Sidekiq.logger.info log_job_done(job, started_time, base_payload)
rescue => job_exception
Sidekiq.logger.warn log_job_done(job, started_time, base_payload, job_exception)
raise
end
private
def base_message(payload)
"#{payload['class']} JID-#{payload['jid']}"
end
def add_instrumentation_keys!(job, output_payload)
output_payload.merge!(job.slice(*::Gitlab::InstrumentationHelper::KEYS))
end
def log_job_start(payload)
payload['message'] = "#{base_message(payload)}: start"
payload['job_status'] = 'start'
scheduling_latency_s = ::Gitlab::InstrumentationHelper.queue_duration_for_job(payload)
payload['scheduling_latency_s'] = scheduling_latency_s if scheduling_latency_s
payload
end
def log_job_done(job, started_time, payload, job_exception = nil)
payload = payload.dup
add_instrumentation_keys!(job, payload)
elapsed_time = elapsed(started_time)
add_time_keys!(elapsed_time, payload)
message = base_message(payload)
if job_exception
payload['message'] = "#{message}: fail: #{payload['duration']} sec"
payload['job_status'] = 'fail'
payload['error_message'] = job_exception.message
payload['error_class'] = job_exception.class.name
else
payload['message'] = "#{message}: done: #{payload['duration']} sec"
payload['job_status'] = 'done'
end
convert_to_iso8601(payload, DONE_TIMESTAMP_FIELDS)
payload
end
def add_time_keys!(time, payload)
payload['duration'] = time[:duration].round(6)
# ignore `cpu_s` if the platform does not support Process::CLOCK_THREAD_CPUTIME_ID (time[:cputime] == 0)
# supported OS version can be found at: https://www.rubydoc.info/stdlib/core/2.1.6/Process:clock_gettime
payload['cpu_s'] = time[:cputime].round(6) if time[:cputime] > 0
payload['completed_at'] = Time.now.utc
end
def parse_job(job)
job = job.dup
# Add process id params
job['pid'] = ::Process.pid
job.delete('args') unless ENV['SIDEKIQ_LOG_ARGUMENTS']
job['args'] = limited_job_args(job['args']) if job['args']
convert_to_iso8601(job, START_TIMESTAMP_FIELDS)
job
end
def convert_to_iso8601(payload, keys)
keys.each do |key|
payload[key] = format_time(payload[key]) if payload[key]
end
end
def elapsed(t0)
t1 = get_time
{
duration: t1[:now] - t0[:now],
cputime: t1[:thread_cputime] - t0[:thread_cputime]
}
end
def get_time
{
now: current_time,
thread_cputime: defined?(Process::CLOCK_THREAD_CPUTIME_ID) ? Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID) : 0
}
end
def current_time
Gitlab::Metrics::System.monotonic_time
end
def format_time(timestamp)
return timestamp if timestamp.is_a?(String)
Time.at(timestamp).utc.iso8601(6)
end
def limited_job_args(args)
return unless args.is_a?(Array)
total_length = 0
limited_args = args.take_while do |arg|
total_length += arg.to_json.length
total_length <= MAXIMUM_JOB_ARGUMENTS_LENGTH
end
limited_args.push('...') if total_length > MAXIMUM_JOB_ARGUMENTS_LENGTH
limited_args
end
end
end
end
|