class Fluent::Plugin::ProxysqlQueryLogInput::Watcher
Public Class Methods
new(path, interval, pos_storage, router, tag, log)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 5 def initialize(path, interval, pos_storage, router, tag, log) super(path, interval) @parser = ProxysqlQueryLog::Parser.new @pos_storage = pos_storage @router = router @tag = tag @log = log @attached = false read end
Public Instance Methods
attach(loop)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 73 def attach(loop) @attached = true super end
attached?()
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 82 def attached? @attached end
convert_time(t)
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 69 def convert_time(t) Time.at(t/1000/1000).utc.strftime('%Y-%m-%d %H:%M:%S') end
detach()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 78 def detach @attached = false super end
hostname()
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 86 def hostname @hostname ||= Socket.gethostname end
on_change(previous, current)
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 22 def on_change(previous, current) if current.nlink == 0 @log.debug("stop watch: #{@path} (deleted)") @pos_storage.delete(@path) detach else read end end
read()
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 32 def read @io = File.open(@path) seek(@path) while true @pos = @io.pos raw_total_bytes = @io.read(8) return unless raw_total_bytes query = @parser.parse(@io) @router.emit(@tag, query.start_time/1000/1000, record(query)) @pos_storage.put(@path, @io.pos) end ensure @io.close end
record(query)
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 50 def record(query) { 'thread_id' => query.thread_id, 'username' => query.username, 'schema_name' => query.schema_name, 'client' => query.client, 'HID' => query.hid, 'server' => query.server, 'start_time' => convert_time(query.start_time), 'end_time' => convert_time(query.end_time), 'duration' => query.end_time - query.start_time, 'digest' => query.digest, 'query' => query.query, 'hostname' => hostname, 'filename' => @path, 'pos' => @pos } end
seek(path)
click to toggle source
# File lib/fluent/plugin/in_proxysql_query_log/watcher.rb, line 17 def seek(path) cursor = @pos_storage.get(path) @io.seek(cursor, IO::SEEK_SET) if cursor end