class Fluent::Plugin::SQLQueryInput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 13
# Builds the plugin instance: runs the superclass initializer first, then
# loads the libraries this plugin uses (MySQL client, SSH gateway, BigDecimal).
def initialize
  super
  # Loaded when an instance is created, in the same order as before.
  ['mysql2', 'net/ssh/gateway', 'bigdecimal'].each do |library|
    require library
  end
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 50
# Applies the plugin configuration by handing +conf+ straight to the
# superclass implementation; nothing else is done here.
def configure(conf)
  super(conf)
end
get_connection() click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 227
# Opens (or reuses) the SSH gateway, forwards a local port to MySQL on @host
# and returns a Mysql2::Client connected to 127.0.0.1 through that tunnel.
#
# Returns the client, or nil when the tunnel or the database connection could
# not be established (the failure is logged).
def get_connection
  $log.info "Opening ssh tunnel to #{@ssh_gateway}\n"
  @ssh_gate ||= Net::SSH::Gateway.new(@ssh_gateway, @ssh_username, :verbose => :debug, :forward_agent => true)
  # Falls back to @db_port when the gateway does not hand back a local port.
  @ssh_port = @ssh_gate.open(@host, 3306, @ssh_local_port) || @db_port
  $log.info "connecting to #{@host}\n"
  Mysql2::Client.new({
    :host => '127.0.0.1',
    :port => @ssh_port,
    :username => @db_username,
    :password => @db_password,
    :encoding => @encoding,
    :reconnect => true,
    :read_timeout => 600,
    :connect_timeout => 30
  })
rescue StandardError => e
  # StandardError (not Exception) so that interrupts and exit requests are
  # not swallowed and turned into a nil connection.
  $log.error "fluent-plugin-sqlquery-ssh: Main Connect ERROR!\n"
  $log.error "MSG: #{e.message}\n TRACE:#{Array(e.backtrace).join("\n")} PORT: #{@ssh_port}\n"
  nil
end
get_exec_result() click to toggle source

Gets the query results.

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 155
# Runs @query through #query and returns the rows as an Array of Hashes.
#
# Known numeric columns are converted with to_f / to_i when present and
# truthy ('avg' is additionally rounded to two decimals). When
# @current_shard is set it is stored in each row under 'customer_shard'.
#
# Returns an empty Array when #query yields nothing iterable (it returns nil
# without a connection), instead of failing with NoMethodError.
def get_exec_result
  rows = query(@query)
  return [] unless rows.respond_to?(:each)

  # to be replaced by the cast array
  float_columns = ['daily_total']
  integer_columns = ['total_transactions', 'cust_of_customer', 'shop_count', 'items', 'register_count']

  result = []
  rows.each do |row|
    row['avg'] = row['avg'].to_f.round(2) if row['avg']
    float_columns.each { |column| row[column] = row[column].to_f if row[column] }
    integer_columns.each { |column| row[column] = row[column].to_i if row[column] }
    row['customer_shard'] = @current_shard if @current_shard
    result.push(row)
  end
  result
end
get_mysql_hostname() click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 253
# Asks the server for its 'hostname' variable via #query.
#
# Returns the 'Value' column of the first row, or nil when the query produced
# nothing iterable or no rows (previously the result set itself leaked out
# in the no-row case and a nil result raised NoMethodError).
def get_mysql_hostname
  rows = query("SHOW VARIABLES LIKE 'hostname'")
  return nil unless rows.respond_to?(:each)

  rows.each { |row| return row.fetch('Value') }
  nil
end
get_shards() click to toggle source

Gets the shards. (Open question: should the info be stored somewhere?)

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 110
# Loads the shard mapping by running @shard_map_query against @shard_db_name.
#
# Each result row is expected to carry 'database_host' and a comma-separated
# 'database_name' list. Returns an Array with one single-entry Hash per row:
# { database_host => [database names] }.
#
# The connection is closed afterwards and @conn is cleared so a closed client
# is never reused. On any StandardError the failure is logged and the process
# is terminated with exit!.
def get_shards
  begin
    @conn ||= get_connection
    @conn.select_db(@shard_db_name) # switch to the shard-mapping database
    $log.info "Getting Shard Mapping: [#{@shard_map_query}]"
    @conn.query("SET SESSION group_concat_max_len=        1844674407370")
    shard_mapping = []
    shardlist = @conn.query(@shard_map_query, :cast => false, :cache_rows => false)
    shardlist.each do |row|
      customer_dbs = row['database_name'].split(',')
      cust_host = row['database_host']
      # Plain literal: Hash.new(cust_host) made the host the default value
      # returned for every missing key.
      shard_mapping.push({ cust_host => customer_dbs })
      $log.info "adding #{customer_dbs}\n"
    end
    @conn.close
    @conn = nil
    return shard_mapping
  rescue StandardError => e
    $log.error "Can't get shard info\n"
    $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
    exit!
  end
end
number?() click to toggle source

Returns true if the column is either of type integer, float or decimal.

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 189
# True when +type+ is :integer, :float or :decimal.
# NOTE(review): +type+ is not defined in the visible source — presumably a
# column-type accessor; confirm where it comes from.
def number?
  [:integer, :float, :decimal].any? { |numeric_kind| type == numeric_kind }
end
process_shard_dbs(remotehost, db_names) click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 135
# Runs the configured query against every database in +db_names+ on one shard
# and emits each resulting row as an event.
#
# remotehost - shard host name; recorded in @current_shard and used in the tag.
# db_names   - database names to select in turn.
#
# Events are tagged "<@tag>_<db>_<remotehost>". Returns nil as soon as @stop
# is set; otherwise returns +db_names+.
def process_shard_dbs(remotehost, db_names)
  db_names.each do |customer_db|
    $log.info "customer: #{customer_db} shard: #{remotehost}"
    return if @stop

    # Open the shard connection lazily and reuse it for the following databases.
    @mysql ||= get_connection
    @current_shard = remotehost
    @mysql.select_db(customer_db)

    event_tag = "#{@tag}_#{customer_db}_#{remotehost}"
    rows = get_exec_result
    rows.each do |row|
      # Emit a copy of the row rather than the row object itself.
      router.emit(event_tag, Fluent::Engine.now, {}.merge(row))
    end
    $log.info "completed #{customer_db} rows: #{rows.size}\n"
  end
end
query(query) click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 173
# Executes +query+ on the current @mysql connection with casting and row
# caching disabled.
#
# Returns the result set, or nil when there is no connection or the query
# fails (the failure is logged). Previously the failure path returned
# whatever the logger call returned.
def query(query)
  return nil if @mysql.nil?

  @mysql.query(query, :cast => false, :cache_rows => false)
rescue StandardError => e
  # StandardError (not Exception) so interrupts and exit requests propagate.
  $log.error "fluent-plugin-sqlquery-ssh: Query ERROR!\n"
  $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
  nil
end
run() click to toggle source
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 72
# Worker loop: until @stop is set, walks every shard in the (memoised) shard
# map, points @host at the shard's reporting host, processes its databases,
# then sleeps for @interval.
#
# Errors while processing shards are logged and the loop continues after the
# sleep. An error that escapes the loop itself (for example while loading the
# shard map) is now logged before the method returns, instead of being
# silently ignored.
def run
  $log.info "Begin run."
  loop do
    break if @stop
    @shard_map ||= get_shards
    begin
      @shard_map.each do |shard|
        break if @stop
        shard.each do |remotehost, db_names|
          break if @stop
          # Drop the previous shard's connection and tunnel before switching hosts.
          close_shard_connection
          hostname = remotehost.gsub(/mylocaldb([0-9]*)/, "customers\\1-reporting.db.prd.us-east-silo.ls")
          @host = hostname
          $log.info "start shard: #{hostname}\n"
          process_shard_dbs(remotehost, db_names)
          $log.info "finished shard: #{hostname}\n"
        end
      end
      close_shard_connection
      $log.info "completed run all shards. sleeping for #{@interval}\n"
    rescue StandardError => e
      $log.error "error on shard: #{@current_shard}\n"
      $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
    end
    sleep @interval
  end
rescue StandardError => e
  $log.error "fluent-plugin-sqlquery-ssh: run loop terminated\n"
  $log.error "#{e.message}\n#{Array(e.backtrace).join("\n")}\n"
end

# Closes the current MySQL client (if any) and the forwarded SSH port.
# @mysql is cleared first so no other code picks up a client being closed.
def close_shard_connection
  client = @mysql
  @mysql = nil
  begin
    client.close if client
  ensure
    @ssh_gate.close(@ssh_local_port)
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 61
# Stops the plugin: flags the worker loop to stop, closes the forwarded SSH
# port, drops the MySQL handle, waits for the worker thread to finish and
# finally calls the superclass shutdown.
def shutdown
  @stop = "ending run"
  # The gateway is only created inside get_connection, so it may not exist yet
  # if shutdown arrives before the first connection was attempted.
  @ssh_gate.close(@ssh_local_port) if @ssh_gate
  @mysql = nil
  if @thread
    @thread.join
    @thread = nil
  end
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 54
# Starts the plugin: calls the superclass start, resets @ssh_port and spawns
# the worker thread that executes #run.
def start
  super
  # Reset before the thread exists: get_connection (reached from #run) assigns
  # @ssh_port, and resetting it afterwards could wipe the value the worker
  # just stored.
  @ssh_port = nil
  @thread = Thread.new(&method(:run))
end
text?() click to toggle source

Returns true if the column is either of type string or text.

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 184
# True when +type+ is :string or :text.
# NOTE(review): +type+ is not defined in the visible source — presumably a
# column-type accessor; confirm where it comes from.
def text?
  [:string, :text].any? { |textual_kind| type == textual_kind }
end
value_to_boolean(value) click to toggle source

Converts a value to a boolean.

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 194
# Maps +value+ to a boolean.
#
# Returns nil for an empty String; otherwise true or false depending on
# whether TRUE_VALUES contains +value+.
def value_to_boolean(value)
  return nil if value.is_a?(String) && value.empty?

  TRUE_VALUES.include?(value)
end
value_to_decimal(value) click to toggle source

Converts a value to a BigDecimal.

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 214
# Converts +value+ to a BigDecimal.
#
# A value whose class is exactly BigDecimal is returned unchanged; anything
# responding to to_d is converted with it; everything else goes through its
# string form.
def value_to_decimal(value)
  # to_d on String/Integer/Float comes from bigdecimal/util; the plain
  # 'bigdecimal' require in the initializer does not provide it, so the
  # string fallback below would raise NoMethodError without this.
  require 'bigdecimal/util'

  # Using .class is faster than .is_a? and subclasses of BigDecimal are
  # handled by the to_d branch.
  return value if value.class == BigDecimal
  return value.to_d if value.respond_to?(:to_d)

  value.to_s.to_d
end
value_to_integer(value) click to toggle source

Used to convert values to integers. Handles the case where an integer column is used to store boolean values.

# File lib/fluent/plugin/in_sqlquery_ssh.rb, line 204
# Converts +value+ to an Integer.
#
# true becomes 1 and false becomes 0 (integer columns holding booleans).
# Any other value is converted with to_i; if that raises a StandardError
# the result is nil.
def value_to_integer(value)
  return 1 if true.equal?(value)
  return 0 if false.equal?(value)

  begin
    value.to_i
  rescue StandardError
    nil
  end
end