class Fluent::Plugin::SQLQueryInput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 13
#
# Runs the superclass initializer and then loads the libraries this plugin
# uses: mysql2, net/ssh/gateway and bigdecimal. The libraries are required
# when an instance is created rather than when the file is loaded.
def initialize
  super
  %w[mysql2 net/ssh/gateway bigdecimal].each { |library| require library }
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 50
#
# Passes the configuration straight to the superclass; no additional
# configuration handling is done in this method.
#
# conf - the configuration object handed in by the caller.
def configure(conf)
  super(conf)
end
get_connection()
click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 227
#
# Opens (or reuses) the SSH gateway to @ssh_gateway, forwards the local port
# @ssh_local_port to port 3306 on @host through it, and connects a Mysql2
# client to the local end of that tunnel (127.0.0.1).
#
# Returns the Mysql2::Client on success, or nil when the tunnel or the client
# could not be set up; in that case the error is logged instead of raised.
#
# Only StandardError is rescued so that interrupts, SystemExit and similar
# process-level exceptions are not swallowed here.
def get_connection
  $log.info "Opening ssh tunnel to #{@ssh_gateway}\n"
  @ssh_gate ||= Net::SSH::Gateway.new(@ssh_gateway, @ssh_username, :verbose => :debug, :forward_agent => true)
  # NOTE(review): the remote port is hard-coded to 3306 and @db_port is only
  # used when `open` returns a falsy value -- presumably @db_port was meant
  # to be the remote port. Confirm before changing.
  @ssh_port = @ssh_gate.open(@host, 3306, @ssh_local_port) || @db_port
  $log.info "connecting to #{@host}\n"
  Mysql2::Client.new({
    :host => '127.0.0.1',
    :port => @ssh_port,
    :username => @db_username,
    :password => @db_password,
    :encoding => @encoding,
    :reconnect => true,
    :read_timeout => 600,
    :connect_timeout => 30
  })
rescue StandardError => e
  $log.error "fluent-plugin-sqlquery-ssh: Main Connect ERROR!\n"
  $log.error "MSG: #{e.message}\n TRACE:#{Array(e.backtrace).join("\n")} PORT: #{@ssh_port}\n"
  nil
end
get_exec_result()
click to toggle source
get the query results
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 155
#
# Runs @query through #query and returns the rows as an Array, with a fixed
# set of columns converted to numbers:
#
#   avg          -> Float rounded to two decimals
#   daily_total  -> Float
#   total_transactions, cust_of_customer, shop_count, items,
#   register_count -> Integer
#
# A column is only converted when its value is truthy. When @current_shard is
# set it is stored in each row under 'customer_shard'. Rows are modified in
# place.
#
# Returns an empty Array when #query yields nil (for example because no
# connection is open) instead of failing on nil.each.
def get_exec_result
  rows = query(@query)
  return [] if rows.nil?

  # TODO: to be replaced by the cast array.
  integer_columns = %w[total_transactions cust_of_customer shop_count items register_count]

  rows.map do |row|
    row['avg'] = row['avg'].to_f.round(2) if row['avg']
    row['daily_total'] = row['daily_total'].to_f if row['daily_total']
    integer_columns.each do |column|
      row[column] = row[column].to_i if row[column]
    end
    row['customer_shard'] = @current_shard if @current_shard
    row
  end
end
get_mysql_hostname()
click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 253
#
# Asks the server for its `hostname` variable via #query and returns the
# 'Value' column of the first row.
#
# Returns nil when #query yields nil or when the result has no rows, rather
# than failing on nil or returning the result set itself.
def get_mysql_hostname
  rows = query("SHOW VARIABLES LIKE 'hostname'")
  return nil if rows.nil?

  first_row = rows.first
  first_row && first_row.fetch('Value')
end
get_shards()
click to toggle source
get the shards. put the info somewhere?
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 110
#
# Reads the shard mapping. Uses @conn (opening one with #get_connection when
# unset), switches to the @shard_db_name database, raises the session's
# group_concat_max_len and runs @shard_map_query.
#
# Each result row contributes one single-entry Hash mapping the row's
# 'database_host' to the Array obtained by splitting 'database_name' on
# commas. The hashes are plain hashes with no default value.
#
# The connection is closed afterwards and @conn is cleared, so a later call
# opens a fresh connection instead of reusing a closed client.
#
# Returns the Array of shard hashes. On any StandardError the failure is
# logged and the process is terminated with exit!.
def get_shards
  @conn ||= get_connection
  @conn.select_db(@shard_db_name) # switch to the shard-mapping database
  $log.info "Getting Shard Mapping: [#{@shard_map_query}]"
  @conn.query("SET SESSION group_concat_max_len= 1844674407370")

  shard_rows = @conn.query(@shard_map_query, :cast => false, :cache_rows => false)
  shard_mapping = shard_rows.map do |row|
    customer_dbs = row['database_name'].split(',')
    $log.info "adding #{customer_dbs}\n"
    { row['database_host'] => customer_dbs }
  end

  @conn.close
  @conn = nil
  shard_mapping
rescue StandardError => e
  $log.error "Can't get shard info\n"
  $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
  exit!
end
number?()
click to toggle source
Returns true if the column is either of type integer, float or decimal.
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 189
#
# True when `type` is :integer, :float or :decimal.
#
# NOTE(review): `type` is not defined in any of the methods shown alongside
# this one; presumably it is provided elsewhere -- confirm.
def number?
  [:integer, :float, :decimal].include?(type)
end
process_shard_dbs(remotehost, db_names)
click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 135
#
# Runs the configured query against every customer database of one shard and
# emits each resulting row as an event.
#
# remotehost - shard host name; stored in @current_shard and used in the tag.
# db_names   - database names to process on that shard.
#
# For each database the connection in @mysql (opened through #get_connection
# when unset) is switched to that database, #get_exec_result is called and
# every row is emitted with the tag "<@tag>_<db>_<remotehost>".
#
# Processing stops early, returning nil, as soon as @stop is set.
#
# Raises RuntimeError when no connection could be obtained, so the failure is
# reported with a clear message rather than as a NoMethodError on nil.
def process_shard_dbs(remotehost, db_names)
  db_names.each do |db|
    $log.info "customer: #{db} shard: #{remotehost}"
    return if @stop

    @mysql ||= get_connection # open conn to shard
    if @mysql.nil?
      raise "fluent-plugin-sqlquery-ssh: no MySQL connection available for shard #{remotehost}"
    end

    @current_shard = remotehost
    @mysql.select_db(db) # switch to the customer db
    tag = "#{@tag}_#{db}_#{remotehost}"
    result = get_exec_result
    result.each { |data| router.emit(tag, Fluent::Engine.now, data) }
    $log.info "completed #{db} rows: #{result.size}\n"
  end
end
query(query)
click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 173
#
# Executes a SQL statement on the connection held in @mysql, with value
# casting and row caching turned off.
#
# query - the SQL text to execute.
#
# Returns the client's result, or nil when @mysql is unset or the statement
# fails with a StandardError (the error is logged). Exceptions outside
# StandardError, such as Interrupt, are not rescued.
def query(query)
  return nil if @mysql.nil?

  @mysql.query(query, :cast => false, :cache_rows => false)
rescue StandardError => e
  $log.error "fluent-plugin-sqlquery-ssh: Query ERROR!\n"
  $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
  nil
end
run()
click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 72
#
# Worker loop. Until @stop is set it:
#
#   1. loads the shard map once through #get_shards (cached in @shard_map),
#   2. for every shard host drops the previous connection and tunnel, derives
#      the reporting host name from the shard host and stores it in @host,
#      then hands the shard's databases to #process_shard_dbs,
#   3. drops the last connection and tunnel, and sleeps for @interval.
#
# A StandardError raised while walking the shards is logged and the loop
# carries on after the sleep. A StandardError raised outside that section
# ends the loop; it is logged rather than silently discarded. Exceptions
# outside StandardError (Interrupt, SystemExit, ...) are not rescued.
def run
  $log.info "Begin run."
  loop do
    break if @stop

    @shard_map ||= get_shards
    begin
      @shard_map.each do |shard|
        break if @stop

        shard.each do |remotehost, db_names|
          break if @stop

          reset_shard_connection
          hostname = remotehost.gsub(/mylocaldb([0-9]*)/, "customers\\1-reporting.db.prd.us-east-silo.ls")
          @host = hostname
          $log.info "start shard: #{hostname}\n"
          process_shard_dbs(remotehost, db_names)
          $log.info "finished shard: #{hostname}\n"
        end
      end
      reset_shard_connection
      $log.info "completed run all shards. sleeping for #{@interval}\n"
    rescue StandardError => e
      $log.error "error on shard: #{@current_shard}\n"
      $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
    end
    sleep @interval
  end
rescue StandardError => e
  $log.error "fluent-plugin-sqlquery-ssh: run loop terminated: #{e.message}\n"
end

# Internal helper for #run: closes the MySQL client held in @mysql (instead of
# merely dropping the reference), clears @mysql, and closes the SSH forward on
# @ssh_local_port when a gateway exists.
def reset_shard_connection
  stale_client = @mysql
  @mysql = nil # cleared first so a failing close cannot leave a stale client behind
  stale_client.close if stale_client.respond_to?(:close)
  @ssh_gate.close(@ssh_local_port) if @ssh_gate
end
private :reset_shard_connection
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 61
#
# Stops the plugin: sets @stop so the worker loop ends, closes the SSH
# forward on @ssh_local_port, drops the MySQL client reference, waits for the
# worker thread to finish and finally calls the superclass shutdown.
#
# A missing gateway (shutdown before any connection was opened) or an error
# while closing the forward is tolerated and logged, so the thread is still
# joined and the superclass is still called.
def shutdown
  @stop = "ending run"
  begin
    @ssh_gate.close(@ssh_local_port) if @ssh_gate
  rescue StandardError => e
    $log.error "fluent-plugin-sqlquery-ssh: error closing ssh tunnel on shutdown: #{e.message}\n"
  end
  @mysql = nil
  if @thread
    @thread.join
    @thread = nil
  end
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 54
#
# Starts the plugin: calls the superclass start, resets @ssh_port and spawns
# the worker thread that executes #run (kept in @thread).
#
# @ssh_port is reset before the thread is created; resetting it afterwards
# could overwrite a port the worker had already stored.
def start
  super
  @ssh_port = nil
  @thread = Thread.new(&method(:run))
end
text?()
click to toggle source
Returns true if the column is either of type string or text.
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 184
#
# True when `type` is :string or :text.
#
# NOTE(review): `type` is not defined in any of the methods shown alongside
# this one; presumably it is provided elsewhere -- confirm.
def text?
  [:string, :text].include?(type)
end
value_to_boolean(value)
click to toggle source
convert something to a boolean
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 194
#
# Converts a value to a boolean.
#
# value - the value to convert.
#
# Returns nil for an empty String; otherwise true or false depending on
# whether TRUE_VALUES includes the value.
def value_to_boolean(value)
  return nil if value.is_a?(String) && value.empty?

  TRUE_VALUES.include?(value)
end
value_to_decimal(value)
click to toggle source
convert something to a BigDecimal
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 214
#
# The to_d conversions used below (on String, Integer, Float, ...) are defined
# by bigdecimal/util; requiring plain 'bigdecimal' does not provide them.
require 'bigdecimal/util'

# Converts a value to a BigDecimal.
#
# value - the value to convert.
#
# Returns the value itself when it is exactly a BigDecimal, the result of
# value.to_d when the value responds to to_d, and otherwise the result of
# converting its String form with to_d.
def value_to_decimal(value)
  # Using .class is faster than .is_a? and
  # subclasses of BigDecimal will be handled
  # in the else clause
  if value.class == BigDecimal
    value
  elsif value.respond_to?(:to_d)
    value.to_d
  else
    value.to_s.to_d
  end
end
value_to_integer(value)
click to toggle source
Converts values to integers. Also handles the case where an integer column is used to store boolean values.
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 204
#
# Converts a value to an Integer, mapping booleans to 1 and 0 so that integer
# columns holding boolean values are handled.
#
# value - the value to convert.
#
# Returns 1 for true, 0 for false, otherwise value.to_i; nil when that
# conversion raises a StandardError.
def value_to_integer(value)
  case value
  when true then 1
  when false then 0
  else
    begin
      value.to_i
    rescue StandardError
      nil
    end
  end
end