Welcome to mirror list, hosted at ThFree Co, Russian Federation.

logs.rb « elasticsearch « gitlab « lib - gitlab.com/gitlab-org/gitlab-foss.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: f976f6ce30503320e0dccad1f7400b40db639084 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# frozen_string_literal: true

module Gitlab
  module Elasticsearch
    class Logs
      InvalidCursor = Class.new(RuntimeError)

      # How many log lines to fetch in a query
      LOGS_LIMIT = 500

      def initialize(client)
        @client = client
      end

      def pod_logs(namespace, pod_name, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
        query = { bool: { must: [] } }.tap do |q|
          filter_pod_name(q, pod_name)
          filter_namespace(q, namespace)
          filter_container_name(q, container_name)
          filter_search(q, search)
          filter_times(q, start_time, end_time)
        end

        body = build_body(query, cursor)
        response = @client.search body: body

        format_response(response)
      end

      private

      def build_body(query, cursor = nil)
        body = {
          query: query,
          # reverse order so we can query N-most recent records
          sort: [
            { "@timestamp": { order: :desc } },
            { "offset": { order: :desc } }
          ],
          # only return these fields in the response
          _source: ["@timestamp", "message"],
          # fixed limit for now, we should support paginated queries
          size: ::Gitlab::Elasticsearch::Logs::LOGS_LIMIT
        }

        unless cursor.nil?
          body[:search_after] = decode_cursor(cursor)
        end

        body
      end

      def filter_pod_name(query, pod_name)
        query[:bool][:must] << {
          match_phrase: {
            "kubernetes.pod.name" => {
              query: pod_name
            }
          }
        }
      end

      def filter_namespace(query, namespace)
        query[:bool][:must] << {
          match_phrase: {
            "kubernetes.namespace" => {
              query: namespace
            }
          }
        }
      end

      def filter_container_name(query, container_name)
        # A pod can contain multiple containers.
        # By default we return logs from every container
        return if container_name.nil?

        query[:bool][:must] << {
          match_phrase: {
            "kubernetes.container.name" => {
              query: container_name
            }
          }
        }
      end

      def filter_search(query, search)
        return if search.nil?

        query[:bool][:must] << {
          simple_query_string: {
            query: search,
            fields: [:message],
            default_operator: :and
          }
        }
      end

      def filter_times(query, start_time, end_time)
        return unless start_time || end_time

        time_range = { range: { :@timestamp => {} } }.tap do |tr|
          tr[:range][:@timestamp][:gte] = start_time if start_time
          tr[:range][:@timestamp][:lt] = end_time if end_time
        end

        query[:bool][:filter] = [time_range]
      end

      def format_response(response)
        results = response.fetch("hits", {}).fetch("hits", [])
        last_result = results.last
        results = results.map do |hit|
          {
            timestamp: hit["_source"]["@timestamp"],
            message: hit["_source"]["message"]
          }
        end

        # we queried for the N-most recent records but we want them ordered oldest to newest
        {
          logs: results.reverse,
          cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
        }
      end

      # we want to hide the implementation details of the search_after parameter from the frontend
      # behind a single easily transmitted value
      def encode_cursor(obj)
        obj.join(',')
      end

      def decode_cursor(obj)
        cursor = obj.split(',').map(&:to_i)

        unless valid_cursor(cursor)
          raise InvalidCursor, "invalid cursor format"
        end

        cursor
      end

      def valid_cursor(cursor)
        cursor.instance_of?(Array) &&
        cursor.length == 2 &&
        cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
      end
    end
  end
end