class Fluent::MySQLSlowQueryExInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 9 def initialize super require 'mysql-slowquery-parser' end
Public Instance Methods
apply_dbname_to_record(parsed_query)
click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 103 def apply_dbname_to_record(parsed_query) database_name = parsed_query[:db] || parsed_query[:schema] || @last_dbname_of[@path.to_sym] || @dbname_if_missing_dbname_in_log @last_dbname_of[@path.to_sym] = database_name parsed_query[:database] = database_name parsed_query.delete(:db) parsed_query.delete(:schema) parsed_query end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 14 def configure(conf) conf['format'] = 'none' super if conf['pos_file'] == @last_dbname_file raise Fluet::ConfigError, '' end end
get_last_dbname()
click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 39 def get_last_dbname return unless @last_dbname_file_handle @last_dbname_file_handle.pos = 0 last_db = @last_dbname_file_handle.read.chomp begin JSON.parse(last_db, symbolize_names: true) rescue JSON::ParserError {} end end
parser()
click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 60 def parser MySQLSlowQueryParser end
prepare_lines_to_parse(lines, slow_queries = [])
click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 88 def prepare_lines_to_parse(lines, slow_queries = []) @query_unit = [] unless @query_unit while !lines.empty? line = lines.shift @query_unit << line if line.end_with?(';', ";\n") && !line.start_with?('use ', 'SET timestamp=') slow_queries << @query_unit @query_unit = nil prepare_lines_to_parse(lines, slow_queries) break # For when refactoring. Just in case. end end slow_queries end
receive_lines(lines, tail_watcher)
click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 64 def receive_lines(lines, tail_watcher) es = Fluent::MultiEventStream.new prepare_lines_to_parse(lines).each do |query_unit| begin parsed_query_unit = parser.parse_slow_log(query_unit) rescue log.warn %Q{in_mysqlslowquery_ex: parse error: #{$!.message}, (#{query_unit.to_s})} next end parsed_query = apply_dbname_to_record(parsed_query_unit) es.add(Fluent::EventTime.now.to_i, parsed_query) save_last_dbname() end if !es.empty? begin router.emit_stream(@tag, es) rescue # ignore errors. Engine shows logs and backtraces. end end end
save_last_dbname()
click to toggle source
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 50 def save_last_dbname return unless @last_dbname_file_handle current = get_last_dbname() unless current == @last_dbname_of @last_dbname_file_handle.pos = 0 @last_dbname_file_handle.truncate(0) @last_dbname_file_handle.write(JSON.generate(current.merge(@last_dbname_of))) end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 33 def shutdown save_last_dbname() @last_dbname_file_handle.close if @last_dbname_file_handle super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysqlslowquery_ex.rb, line 22 def start @last_dbname_of = if @last_dbname_file @last_dbname_file_handle = File.open(@last_dbname_file, File::RDWR|File::CREAT, @file_perm) @last_dbname_file_handle.sync = true get_last_dbname() else {} end super end