class Fluent::PrestoQueryInput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_presto_query.rb, line 20
# Loads the gems this plugin relies on, then hands the configuration to the
# superclass implementation.
#
# The gems are required here, at configure time, rather than when this file
# is loaded.
#
# @param conf the configuration object, forwarded unchanged to +super+
def configure(conf)
  %w[presto-client parse-cron].each { |library| require library }
  super
end
emit_presto_query()
click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 50
# Executes the configured SQL (@sql) and emits every resulting record to the
# router under @tag, stamped with Fluent::Engine.now.
#
# Any StandardError raised while querying or emitting is passed to log.error
# and not re-raised, so a failed query does not propagate out of this method.
def emit_presto_query
  log.info "sql [#{@sql}]"
  records = exec_query(@sql)
  # A nil result means there is nothing to emit; calling #each on it would
  # raise NoMethodError and be reported as a misleading error.
  return if records.nil?

  records.each do |record|
    router.emit @tag, Fluent::Engine.now, record
  end
rescue => e
  log.error e
end
run()
click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 41
# Scheduler loop: repeatedly asks @cron_parser for the next run time, sleeps
# until then, and starts a new thread running emit_presto_query.
#
# Never returns on its own; it loops until the thread running it is stopped.
def run
  loop do
    # Sample the clock once so the logged time and the sleep duration are
    # derived from the same instant.
    now = Time.now
    next_time = @cron_parser.next(now)
    # Clamp at zero: Kernel#sleep raises ArgumentError for a negative
    # duration, which would terminate this loop.
    secs = [next_time - now, 0].max
    log.info "next query at #{next_time}. Sleep #{secs}seconds."
    sleep secs
    Thread.new(&method(:emit_presto_query))
  end
end
shutdown()
click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 37
# Stops the scheduler thread stored in @thread.
#
# Does nothing when @thread is not set (for example when shutdown is called
# without a preceding start); Thread.kill(nil) would raise TypeError.
def shutdown
  Thread.kill(@thread) if @thread
end
start()
click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 26
# Prepares the plugin's runtime state and launches the scheduler.
#
# Builds the cron parser from @cron, creates the Presto client from
# @host/@port/@catalog/@user/@schema, and starts a thread executing #run.
# The thread is kept in @thread.
def start
  @cron_parser = CronParser.new(@cron)

  presto_options = {
    server: "#{@host}:#{@port}",
    catalog: @catalog,
    user: @user,
    schema: @schema
  }
  @client = Presto::Client.new(**presto_options)

  @thread = Thread.new { run }
end
Private Instance Methods
exec_query(query)
click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 63
# Runs +query+ through @client and returns the rows as an array of hashes
# keyed by column name.
#
# @param query [String, nil] SQL text to execute
# @return [Array<Hash>] one hash per row; an empty array when +query+ is nil
#   or empty, so callers can iterate the result unconditionally
def exec_query(query)
  return [] if query.nil? || query.empty?

  @client.query(query) do |q|
    column_names = q.columns.map(&:name)
    rows = []
    # Pair each column name with the value at the same position in the row.
    q.each_row { |row| rows << column_names.zip(row).to_h }
    # Returns from exec_query itself, from inside the block.
    return rows
  end
end