class Fluent::Plugin::MysqlReplicatorInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_replicator.rb, line 24
def configure(conf)
  super
  @interval = Fluent::Config.time_value(@interval)

  if @tag.nil?
    raise Fluent::ConfigError, "mysql_replicator: missing 'tag' parameter. Please add following line into config like 'tag replicator.mydatabase.mytable.${event}.${primary_key}'"
  end

  log.info "adding mysql_replicator worker. :tag=>#{tag} :query=>#{@query} :prepared_query=>#{@prepared_query} :interval=>#{@interval}sec :enable_delete=>#{enable_delete}"
end
emit_record(tag, record) click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 133
def emit_record(tag, record)
  router.emit(tag, Fluent::Engine.now, record)
end
format_tag(tag, param) click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 125
def format_tag(tag, param)
  pattern = {'${event}' => param[:event].to_s, '${primary_key}' => @primary_key}
  tag.gsub(/(\${[a-z_]+})/) do
    log.warn "mysql_replicator: missing placeholder. :tag=>#{tag} :placeholder=>#{$1}" unless pattern.include?($1)
    pattern[$1]
  end
end
get_connection() click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 149
def get_connection
  begin
    return Mysql2::Client.new({
      :host => @host,
      :port => @port,
      :username => @username,
      :password => @password,
      :database => @database,
      :encoding => @encoding,
      :reconnect => true,
      :stream => true,
      :cache_rows => false
    })
  rescue Exception => e
    log.warn "mysql_replicator: #{e}"
    sleep @interval
    retry
  end
end
hash_delete_by_list(hash, deleted_keys) click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 121
def hash_delete_by_list (hash, deleted_keys)
  deleted_keys.each{|k| hash.delete(k)}
end
poll() click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 54
def poll
  table_hash = Hash.new
  ids = Array.new
  con = get_connection()
  prepared_con = get_connection()
  loop do
    rows_count = 0
    start_time = Time.now
    previous_ids = ids
    current_ids = Array.new
    if !@prepared_query.nil?
      @prepared_query.split(/;/).each do |query|
        prepared_con.query(query)
      end
    end
    rows, con = query(@query, con)
    rows.each do |row|
      current_ids << row[@primary_key]
      current_hash = Digest::SHA1.hexdigest(row.flatten.join)
      row.each {|k, v| row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
      row.select {|k, v| v.to_s.strip.match(/^SELECT(\s+)/i) }.each do |k, v|
        row[k] = [] unless row[k].is_a?(Array)
        nest_rows, prepared_con = query(v.gsub(/\$\{([^\}]+)\}/, row[$1].to_s), prepared_con)
        nest_rows.each do |nest_row|
          nest_row.each {|k, v| nest_row[k] = v.to_s if v.is_a?(Time) || v.is_a?(Date) || v.is_a?(BigDecimal)}
          row[k] << nest_row
        end
        prepared_con.close
      end
      if row[@primary_key].nil?
        log.error "mysql_replicator: missing primary_key. :tag=>#{tag} :primary_key=>#{primary_key}"
        break
      end
      if !table_hash.include?(row[@primary_key])
        tag = format_tag(@tag, {:event => :insert})
        emit_record(tag, row)
      elsif table_hash[row[@primary_key]] != current_hash
        tag = format_tag(@tag, {:event => :update})
        emit_record(tag, row)
      end
      table_hash[row[@primary_key]] = current_hash
      rows_count += 1
    end
    con.close
    ids = current_ids
    if @enable_delete
      if current_ids.empty?
        deleted_ids = Array.new
      elsif previous_ids.empty?
        deleted_ids = [*1...current_ids.max] - current_ids
      else
        deleted_ids = previous_ids - current_ids
      end
      if deleted_ids.count > 0
        hash_delete_by_list(table_hash, deleted_ids)
        deleted_ids.each do |id|
          tag = format_tag(@tag, {:event => :delete})
          emit_record(tag, {@primary_key => id})
        end
      end
    end
    elapsed_time = sprintf("%0.02f", Time.now - start_time)
    log.info "mysql_replicator: finished execution :tag=>#{tag} :rows_count=>#{rows_count} :elapsed_time=>#{elapsed_time} sec"
    sleep @interval
  end
end
query(query, con = nil) click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 137
def query(query, con = nil)
  begin
    con = con.nil? ? get_connection : con
    con = con.ping ? con : get_connection
    return con.query(query), con
  rescue Exception => e
    log.warn "mysql_replicator: #{e}"
    sleep @interval
    retry
  end
end
run() click to toggle source
# File lib/fluent/plugin/in_mysql_replicator.rb, line 44
def run
  begin
    poll
  rescue StandardError => e
    log.error "mysql_replicator: failed to execute query."
    log.error "error: #{e.message}"
    log.error e.backtrace.join("\n")
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_replicator.rb, line 40
def shutdown
 super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mysql_replicator.rb, line 35
def start
  super
  thread_create(:in_mysql_replicator_runner, &method(:run))
end