blob: 9426f727072ed533b43909cc97c55bda724f33f4 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# frozen_string_literal: true
module ClickHouse
  # Reads and records per-table sync cursor positions stored in the
  # ClickHouse `sync_cursors` table.
  #
  # Every update is written as a new row; reads pick the
  # `primary_key_value` belonging to the greatest `recorded_at`.
  class SyncCursor
    # Selects the primary key value attached to the most recent
    # `recorded_at` for a given table name.
    QUERY = <<~SQL
      SELECT argMax(primary_key_value, recorded_at) AS primary_key_value
      FROM sync_cursors
      WHERE table_name = {table_name:String}
      LIMIT 1
    SQL

    # Appends a new cursor row for a given table name.
    INSERT_CURSOR_QUERY = <<~SQL
      INSERT INTO sync_cursors
      (primary_key_value, table_name, recorded_at)
      VALUES ({primary_key_value:UInt64}, {table_name:String}, {recorded_at:DateTime64})
    SQL

    class << self
      # Looks up the current cursor for +identifier+.
      #
      # @param identifier [#to_s] name stored in the `table_name` column
      # @return [Object] the `primary_key_value` column of the first result row
      def cursor_for(identifier)
        lookup = ClickHouse::Client::Query.new(
          raw_query: QUERY,
          placeholders: { table_name: identifier.to_s }
        )

        # The query returns the default value (0) when no records are present.
        rows = ClickHouse::Client.select(lookup, :main)
        rows.first['primary_key_value']
      end

      # Records +value+ as the newest cursor position for +identifier+.
      #
      # @param identifier [#to_s] name stored in the `table_name` column
      # @param value [Object] bound to the `primary_key_value` (UInt64) placeholder
      # @return [Object] whatever `ClickHouse::Client.execute` returns
      def update_cursor_for(identifier, value)
        bindings = {
          primary_key_value: value,
          table_name: identifier.to_s,
          # Float epoch seconds of the current time, bound to the DateTime64 placeholder.
          recorded_at: Time.current.to_f
        }
        insert = ClickHouse::Client::Query.new(
          raw_query: INSERT_CURSOR_QUERY,
          placeholders: bindings
        )

        ClickHouse::Client.execute(insert, :main)
      end
    end
  end
end
|