Welcome to mirror list, hosted at ThFree Co, Russian Federation.

structured_logger.rb « sidekiq_logging « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: ca9e3b8428cc32cf902da1d2500d85015a51c0da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# frozen_string_literal: true

module Gitlab
  module SidekiqLogging
    class StructuredLogger
      # Job fields rewritten as ISO 8601 strings when the incoming job is parsed.
      START_TIMESTAMP_FIELDS = %w[created_at enqueued_at].freeze
      # Payload fields rewritten as ISO 8601 strings when the "done"/"fail" entry is built.
      DONE_TIMESTAMP_FIELDS = %w[started_at retried_at failed_at completed_at].freeze
      # Budget for the summed JSON size of the job arguments that get logged.
      MAXIMUM_JOB_ARGUMENTS_LENGTH = 10.kilobytes

      # Wraps the execution of a job with structured log entries: a "start"
      # entry before the job runs, then a "done" entry (info) on success or a
      # "fail" entry (warn) when the job raises.
      #
      # job   - the job hash.
      # queue - the queue name; not used by this method.
      #
      # Yields to run the job. Any StandardError is re-raised after logging.
      def call(job, queue)
        started_time = get_time
        base_payload = parse_job(job)

        Sidekiq.logger.info log_job_start(base_payload)

        yield

        Sidekiq.logger.info log_job_done(job, started_time, base_payload)
      rescue => job_exception
        # `base_payload` is still nil when the setup above raised. Building the
        # "fail" entry from nil would raise NoMethodError here and replace the
        # real error, so only log once the payload exists.
        Sidekiq.logger.warn log_job_done(job, started_time, base_payload, job_exception) if base_payload

        raise
      end

      private

      # Builds the "<class> JID-<jid>" prefix shared by the log messages.
      #
      # payload - hash read for its 'class' and 'jid' entries.
      def base_message(payload)
        worker_class, jid = payload.values_at('class', 'jid')

        "#{worker_class} JID-#{jid}"
      end

      # Copies the instrumentation fields present on the job into the payload.
      #
      # job            - hash the fields are read from (left untouched).
      # output_payload - hash that is mutated in place and returned.
      def add_instrumentation_keys!(job, output_payload)
        instrumentation = job.slice(*::Gitlab::InstrumentationHelper::KEYS)

        output_payload.merge!(instrumentation)
      end

      # Turns the parsed job payload into the "start" log entry.
      #
      # Mutates `payload` in place (message, job_status and, when a value is
      # available, scheduling_latency_s) and returns it.
      def log_job_start(payload)
        payload.merge!(
          'message' => "#{base_message(payload)}: start",
          'job_status' => 'start'
        )

        latency = ::Gitlab::InstrumentationHelper.queue_duration_for_job(payload)
        payload['scheduling_latency_s'] = latency if latency

        payload
      end

      # Builds the log entry written once the job has finished.
      #
      # job           - job hash the instrumentation fields are copied from.
      # started_time  - value returned by `get_time` before the job ran.
      # payload       - base payload; it is duplicated, not mutated.
      # job_exception - the error raised by the job, or nil on success.
      #
      # Returns a new hash describing a "done" or "fail" outcome.
      def log_job_done(job, started_time, payload, job_exception = nil)
        entry = payload.dup
        add_instrumentation_keys!(job, entry)
        add_time_keys!(elapsed(started_time), entry)

        prefix = base_message(entry)
        duration = entry['duration']

        outcome =
          if job_exception
            {
              'message' => "#{prefix}: fail: #{duration} sec",
              'job_status' => 'fail',
              'error_message' => job_exception.message,
              'error_class' => job_exception.class.name
            }
          else
            {
              'message' => "#{prefix}: done: #{duration} sec",
              'job_status' => 'done'
            }
          end

        entry.merge!(outcome)
        convert_to_iso8601(entry, DONE_TIMESTAMP_FIELDS)

        entry
      end

      # Writes the timing fields of a finished job into `payload`.
      #
      # time    - hash with :duration and :cputime, as returned by `elapsed`.
      # payload - hash mutated in place.
      def add_time_keys!(time, payload)
        payload['duration'] = time[:duration].round(6)

        # A :cputime of 0 means the platform has no Process::CLOCK_THREAD_CPUTIME_ID,
        # in which case `cpu_s` is left out. Supported platforms are listed at
        # https://www.rubydoc.info/stdlib/core/2.1.6/Process:clock_gettime
        cpu_seconds = time[:cputime]
        payload['cpu_s'] = cpu_seconds.round(6) if cpu_seconds.positive?

        payload['completed_at'] = Time.now.utc
      end

      # Builds the base log payload from a job hash without mutating the job.
      #
      # Adds the current process id, keeps (size-limited) arguments only when the
      # SIDEKIQ_LOG_ARGUMENTS environment variable is set, and rewrites the start
      # timestamps as ISO 8601 strings.
      def parse_job(job)
        parsed = job.dup
        parsed['pid'] = ::Process.pid

        if ENV['SIDEKIQ_LOG_ARGUMENTS']
          parsed['args'] = limited_job_args(parsed['args']) if parsed['args']
        else
          parsed.delete('args')
        end

        convert_to_iso8601(parsed, START_TIMESTAMP_FIELDS)

        parsed
      end

      # Replaces, in place, each listed field of `payload` that holds a truthy
      # value with its `format_time` representation. Absent or nil fields are
      # left alone.
      def convert_to_iso8601(payload, keys)
        keys.each do |key|
          value = payload[key]
          next unless value

          payload[key] = format_time(value)
        end
      end

      # Measures how much time has passed since `t0`.
      #
      # t0 - hash previously returned by `get_time`.
      #
      # Returns a hash with the :duration and :cputime differences.
      def elapsed(t0)
        finished = get_time
        duration = finished[:now] - t0[:now]
        cputime = finished[:thread_cputime] - t0[:thread_cputime]

        { duration: duration, cputime: cputime }
      end

      # Samples the clocks used for timing a job.
      #
      # Returns a hash with :now (from `current_time`) and :thread_cputime, which
      # is 0 when Process::CLOCK_THREAD_CPUTIME_ID is not defined.
      def get_time
        now = current_time
        thread_cputime =
          if defined?(Process::CLOCK_THREAD_CPUTIME_ID)
            Process.clock_gettime(Process::CLOCK_THREAD_CPUTIME_ID)
          else
            0
          end

        { now: now, thread_cputime: thread_cputime }
      end

      # Reads the monotonic clock that job durations are computed from.
      def current_time
        clock = Gitlab::Metrics::System

        clock.monotonic_time
      end

      # Formats a timestamp as a UTC ISO 8601 string with microsecond precision.
      #
      # timestamp - a String (returned unchanged) or any value accepted by Time.at.
      def format_time(timestamp)
        case timestamp
        when String then timestamp
        else Time.at(timestamp).utc.iso8601(6)
        end
      end

      # Truncates the job arguments so the logged entry stays within
      # MAXIMUM_JOB_ARGUMENTS_LENGTH.
      #
      # Arguments are kept, in order, while the running total of their JSON size
      # fits the budget. When at least one argument had to be dropped, a '...'
      # marker is appended.
      #
      # args - the job arguments.
      #
      # Returns a new Array, or nil when `args` is not an Array.
      def limited_job_args(args)
        return unless args.is_a?(Array)

        total_size = 0
        limited_args = args.take_while do |arg|
          # The budget is expressed in bytes, so measure bytes: `length` counts
          # characters and under-reports multi-byte (non-ASCII) arguments.
          total_size += arg.to_json.bytesize

          total_size <= MAXIMUM_JOB_ARGUMENTS_LENGTH
        end

        limited_args.push('...') if total_size > MAXIMUM_JOB_ARGUMENTS_LENGTH

        limited_args
      end
    end
  end
end