class Fluent::PrestoQueryInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_presto_query.rb, line 20
# Loads the gems this plugin relies on and then hands the configuration to
# the superclass implementation.
#
# @param conf the configuration object; it is passed to super unchanged
# @return whatever the superclass configure returns
def configure(conf)
  # The requires live inside the method, so the gems are loaded when the
  # plugin is configured rather than when this file is loaded.
  %w[presto-client parse-cron].each { |library| require library }
  super
end
emit_presto_query() click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 50
# Runs the configured SQL (@sql) and emits each returned record through
# `router` under @tag, stamped with Fluent::Engine.now.
#
# Any StandardError raised while querying or emitting is logged via
# `log.error` and not re-raised, so a failing query never propagates to the
# caller.
def emit_presto_query
  log.info "sql [#{@sql}]"
  records = exec_query(@sql)
  # exec_query returns nil when the SQL is nil or empty. Treat that as
  # "nothing to emit" rather than calling nil.each and logging a misleading
  # NoMethodError on every scheduled run.
  return if records.nil?

  records.each do |record|
    router.emit @tag, Fluent::Engine.now, record
  end
rescue => e
  log.error e
end
run() click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 41
# Scheduler loop: repeatedly asks @cron_parser for the next scheduled time,
# sleeps until then, and starts emit_presto_query on a fresh thread so a
# slow query does not delay the following tick.
#
# The loop has no exit condition of its own; it runs until the thread it is
# executing on is stopped from outside.
def run
  loop do
    # Compute the next run time once, so the logged time and the sleep
    # duration always refer to the same instant.
    next_time = @cron_parser.next(Time.now)
    # Clamp at zero: if the scheduled instant passes between computing
    # next_time and reading the clock again, the difference is negative and
    # Kernel#sleep would raise ArgumentError, ending this loop.
    secs = [next_time - Time.now, 0].max
    log.info "next query at #{next_time}. Sleep #{secs}seconds."
    sleep secs
    Thread.new(&method(:emit_presto_query))
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 37
# Stops the scheduler thread stored in @thread.
#
# Threads that run spawns for individual queries are not tracked and are
# not stopped here.
def shutdown
  # @thread is nil if start never assigned it (for example because start
  # raised earlier); Thread.kill(nil) would raise TypeError.
  Thread.kill(@thread) if @thread
end
start() click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 26
# Builds the cron parser from @cron and the Presto client from the
# connection settings (@host, @port, @catalog, @user, @schema), then
# launches `run` on a new thread kept in @thread.
#
# @return [Thread] the thread executing `run`
def start
  @cron_parser = CronParser.new(@cron)

  server_address = "#{@host}:#{@port}"
  @client = Presto::Client.new(server: server_address, catalog: @catalog, user: @user, schema: @schema)

  @thread = Thread.new { run }
end

Private Instance Methods

exec_query(query) click to toggle source
# File lib/fluent/plugin/in_presto_query.rb, line 63
# Executes `query` through @client and collects the rows as hashes keyed by
# column name.
#
# @param query the SQL text to run
# @return [Array<Hash>, nil] one hash per row, in the order each_row yields
#   them; nil (without calling the client) when query is nil or empty
def exec_query(query)
  return if query.nil? || query.empty?

  @client.query(query) do |q|
    column_names = q.columns.map(&:name)

    rows = []
    q.each_row do |row|
      record = {}
      # Pair each column name with the value at the same position in the row.
      column_names.each_with_index { |name, index| record[name] = row[index] }
      rows << record
    end
    # Returns from exec_query itself, not just from the block.
    return rows
  end
end