class Fluent::MySQLSlowQueryExInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 9
# Creates the plugin instance: runs the parent initializer first, then loads
# the 'mysql-slowquery-parser' library.
def initialize
  super
  # Required at instantiation time rather than at file load.
  # NOTE(review): presumably so the gem is only needed once the plugin is
  # actually used — confirm.
  require 'mysql-slowquery-parser'
end

Public Instance Methods

apply_dbname_to_record(parsed_query) click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 103
# Normalises the database name of a parsed slow-query record.
#
# The name is taken from the first available source, in this order:
#   1. the record's :db field
#   2. the record's :schema field
#   3. the name remembered for the current log path (@path)
#   4. @dbname_if_missing_dbname_in_log
#
# The chosen name is remembered for @path, stored in the record under
# :database, and the original :db / :schema keys are removed.
#
# parsed_query - Hash-like record; it is modified in place.
#
# Returns the same (mutated) parsed_query object.
def apply_dbname_to_record(parsed_query)
  path_key = @path.to_sym

  resolved_name = parsed_query[:db]
  resolved_name ||= parsed_query[:schema]
  resolved_name ||= @last_dbname_of[path_key]
  resolved_name ||= @dbname_if_missing_dbname_in_log

  # Remember the name so later entries without one can inherit it.
  @last_dbname_of[path_key] = resolved_name

  parsed_query[:database] = resolved_name
  [:db, :schema].each { |source_key| parsed_query.delete(source_key) }
  parsed_query
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 14
# Configures the plugin.
#
# Forces the line format to 'none' before delegating to the parent
# implementation, then rejects a configuration in which the position file
# and the last-database-name state file are the same path.
#
# conf - configuration object supporting [] and []= with String keys.
#
# Raises Fluent::ConfigError when 'pos_file' equals @last_dbname_file.
def configure(conf)
  conf['format'] = 'none'
  super

  # Only a configured state file can collide with pos_file; when neither is
  # set, nil == nil must not be treated as a conflict.
  if @last_dbname_file && conf['pos_file'] == @last_dbname_file
    raise Fluent::ConfigError,
          "pos_file and last_dbname_file must not be the same file: #{@last_dbname_file}"
  end
end
get_last_dbname() click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 39
# Reads the persisted "last database name" map from the state file handle.
#
# The handle is rewound and read in full; the content is parsed as JSON with
# symbolized keys.
#
# Returns nil when no state file handle is open; otherwise a Hash. Content
# that is not parseable JSON, or that parses to something other than a JSON
# object (e.g. `null` or an array), yields an empty Hash so that callers can
# safely index into and merge the result.
def get_last_dbname
  return unless @last_dbname_file_handle

  @last_dbname_file_handle.pos = 0
  raw = @last_dbname_file_handle.read.chomp

  parsed = begin
    JSON.parse(raw, symbolize_names: true)
  rescue JSON::ParserError
    {}
  end

  # Callers use the result as a Hash; discard any other JSON value.
  parsed.is_a?(Hash) ? parsed : {}
end
parser() click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 60
# Returns the object used to parse a slow-log query unit; receive_lines calls
# parse_slow_log on it.
# NOTE(review): MySQLSlowQueryParser is presumably provided by the
# 'mysql-slowquery-parser' library required in initialize — confirm.
def parser
  MySQLSlowQueryParser
end
prepare_lines_to_parse(lines, slow_queries = []) click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 88
# Groups raw log lines into slow-query units.
#
# Lines are consumed from the front of `lines` and accumulated in
# @query_unit. A unit is complete when a line ends with ';' (optionally
# followed by a newline) and is not a 'use ' or 'SET timestamp=' statement.
# Lines belonging to an unfinished unit stay in @query_unit and are continued
# on the next call.
#
# Implemented as a loop rather than one recursive call per completed query,
# so the stack depth does not grow with the number of queries in a batch.
#
# lines        - Array of String lines; it is emptied by this method.
# slow_queries - Array that completed units are appended to (default: []).
#
# Returns slow_queries, each element being an Array of the unit's lines.
def prepare_lines_to_parse(lines, slow_queries = [])
  @query_unit ||= []

  until lines.empty?
    line = lines.shift
    @query_unit << line

    terminates_query = line.end_with?(';', ";\n") &&
                       !line.start_with?('use ', 'SET timestamp=')
    next unless terminates_query

    slow_queries << @query_unit
    # Start a fresh accumulator for the next unit.
    @query_unit = []
  end

  slow_queries
end
receive_lines(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 64
# Turns newly received log lines into events and emits them under @tag.
#
# Lines are grouped into query units by prepare_lines_to_parse; each unit is
# parsed with parser.parse_slow_log, given a :database field via
# apply_dbname_to_record, and added to a multi-event stream stamped with the
# current time. The last-database-name state is saved after every record.
# Units that fail to parse are logged as warnings and skipped.
#
# lines        - Array of String lines (consumed by prepare_lines_to_parse).
# tail_watcher - not used by this method.
def receive_lines(lines, tail_watcher)
  stream = Fluent::MultiEventStream.new

  prepare_lines_to_parse(lines).each do |query_unit|
    parsed_unit =
      begin
        parser.parse_slow_log(query_unit)
      rescue => parse_error
        log.warn %Q{in_mysqlslowquery_ex: parse error: #{parse_error.message}, (#{query_unit.to_s})}
        next
      end

    record = apply_dbname_to_record(parsed_unit)
    stream.add(Fluent::EventTime.now.to_i, record)
    save_last_dbname
  end

  return if stream.empty?

  begin
    router.emit_stream(@tag, stream)
  rescue
    # Intentionally ignored: per the original author's note, the engine
    # already shows logs and backtraces for emit failures.
  end
end
save_last_dbname() click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 50
# Persists the in-memory last-database-name map to the state file.
#
# Does nothing when no state file handle is open, or when the file already
# holds exactly @last_dbname_of. Otherwise the file is rewritten with the
# stored map merged with @last_dbname_of (in-memory values win on conflict).
def save_last_dbname
  handle = @last_dbname_file_handle
  return unless handle

  persisted = get_last_dbname
  return if persisted == @last_dbname_of

  # Rewind and clear before writing so no stale bytes remain.
  handle.pos = 0
  handle.truncate(0)
  merged = persisted.merge(@last_dbname_of)
  handle.write(JSON.generate(merged))
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 33
# Shuts the plugin down: saves the last-database-name state, closes the state
# file handle (if one is open), then delegates to the parent implementation.
#
# The handle is closed even when saving the state raises, so a failed save
# does not leak the open file; the exception still propagates to the caller.
def shutdown
  begin
    save_last_dbname
  ensure
    @last_dbname_file_handle.close if @last_dbname_file_handle
  end
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 22
# Starts the plugin.
#
# When @last_dbname_file is set, the file is opened read/write (created if
# missing, using @file_perm as the creation mode) with synchronous writes, and
# @last_dbname_of is loaded from it via get_last_dbname. Otherwise
# @last_dbname_of starts as an empty Hash. The parent implementation is
# invoked afterwards.
def start
  if @last_dbname_file
    open_flags = File::RDWR | File::CREAT
    @last_dbname_file_handle = File.open(@last_dbname_file, open_flags, @file_perm)
    @last_dbname_file_handle.sync = true
    @last_dbname_of = get_last_dbname
  else
    @last_dbname_of = {}
  end

  super
end