class Fluent::PgdistOutput

Constants

DB_ESCAPE_PATTERN

Attributes

handler[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgdist.rb, line 134
# Builds the plugin via the superclass constructor, then loads the `pg`
# gem so PG::Connection is available to #client.
def initialize
  super()
  require('pg')
end

Public Instance Methods

client() click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 32
# Opens a new PostgreSQL connection from the configured @host, @port,
# @username, @password and @database. Each call returns a fresh
# connection; #write closes the one it obtains.
def client
  connection_options = {
    host: @host,
    port: @port,
    user: @username,
    password: @password,
    dbname: @database
  }
  PG::Connection.new(connection_options)
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgdist.rb, line 40
# Validates the configuration and compiles the Ruby-snippet options into
# lambdas.
#
# insert_columns / insert_values fall back to columns / values; a missing
# column list raises Fluent::ConfigError. When file output is enabled
# (@file_moniker), a default sequence-file location and a pass-through
# record filter for the chosen @file_format are supplied.
#
# Returns self.
def configure(conf)
  super

  @insert_columns ||= @columns
  @insert_values ||= @values

  if @insert_columns.nil?
    raise Fluent::ConfigError, "columns must be specified, but missing"
  end

  # NOTE(review): these options are Ruby source passed to eval; they must
  # only ever come from trusted operator configuration.
  @table_moniker_lambda = eval("lambda#{@table_moniker}")
  @insert_filter_lambda = eval("lambda#{@insert_filter}")
  if @file_moniker
    @file_moniker_lambda = eval("lambda#{@file_moniker}")
    @sequence_moniker ||= '{|table|"/tmp/#{table}.seq"}'
    # A comma-separated `when` matches any of the listed formats; the
    # previous `"json" || ...` form evaluated to just "json".
    case @file_format
    when "json", "msgpack", "message_pack"
      @file_record_filter ||= '{|record|record}'
    when "ltsv"
      @file_record_filter ||= '{|fields,record|[fields,record]}'
    end
  end
  @file_record_filter_lambda = eval("lambda#{@file_record_filter}") if @file_record_filter
  @sequence_moniker_lambda = eval("lambda#{@sequence_moniker}") if @sequence_moniker
  self
end
db_escape(str) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 70
# Escapes one value for the tab-separated text written by
# #write_pg_result.
#
# nil becomes "\N". Every character matched by DB_ESCAPE_PATTERN is
# replaced with its backslash escape from the table below. A matched
# character with no table entry is dropped from the output.
def db_escape(str)
  return "\\N" if str.nil?

  escapes = {
    '\\' => '\\\\',
    "\a" => "\\a",
    "\b" => "\\b",
    "\n" => "\\n",
    "\r" => "\\r",
    "\t" => "\\t"
  }
  escaped = ''.dup
  remaining = str
  until (found = DB_ESCAPE_PATTERN.match(remaining)).nil?
    escaped << found.pre_match << escapes.fetch(found[0], '')
    remaining = found.post_match
  end
  escaped + remaining
end
delete_duplicative_records(records) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 96
# Removes, in place, every record whose @unique_column value already
# appeared earlier in +records+; the first occurrence is kept.
# Returns +records+ when something was removed, nil otherwise.
def delete_duplicative_records(records)
  seen_keys = {}
  records.select! do |record|
    key = record[@unique_column]
    if seen_keys.key?(key)
      false
    else
      seen_keys[key] = true
    end
  end
end
delete_existing_records(handler, table, records) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 100
# Drops from +records+ (in place) every record whose @unique_column value
# is already present in +table+, so the following INSERTs do not collide.
#
# handler - open PG connection (e.g. from #client)
# table   - table name, interpolated into the SQL as-is
# records - Array of record hashes; mutated
#
# Logs the number of matches through $log.info when any are found.
def delete_existing_records(handler, table, records)
  unique_values = records.map { |r| r[@unique_column] }
  return if unique_values.empty?

  conditions = (1..unique_values.size).map { |i| "#{@unique_column} = \$#{i}" }
  sql = "select #{@unique_column} from #{table} where #{conditions.join(' or ')}"
  # exec_params rather than a named prepared statement: the SQL text
  # changes with the batch size, and re-preparing an existing name on the
  # same connection raises.
  result = handler.exec_params(sql, unique_values)
  existing_values = result.column_values(0)
  return if existing_values.empty?

  $log.info "delete #{existing_values.size} duplicative records for #{table}"
  # PG returns column values as text while records may hold e.g. Integers,
  # so compare string forms. nil keys never match (`col = NULL` is never true).
  existing = existing_values.each_with_object({}) { |v, seen| seen[v] = true }
  records.reject! do |r|
    value = r[@unique_column]
    !value.nil? && existing.key?(value.to_s)
  end
end
file_path(table) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 113
# Output file path for +table+, produced by the configured file_moniker
# lambda.
def file_path(table)
  @file_moniker_lambda.(table)
end
filter_for_file_record(*args) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 117
# Applies the configured file_record_filter lambda to whatever arguments
# the caller supplies (one tuple, or fields plus row) and returns its
# result unchanged.
def filter_for_file_record(*args)
  @file_record_filter_lambda[*args]
end
filter_for_insert(record) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 122
# Converts a record into the bind parameters for the INSERT using the
# configured insert_filter lambda.
def filter_for_insert(record)
  @insert_filter_lambda.yield(record)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 130
# Serialises one event for the buffer as a MessagePack-encoded
# [tag, time, record] triple; #write reads these back via msgpack_each.
def format(tag, time, record)
  entry = [tag, time, record]
  entry.to_msgpack
end
insert_into_db(handler, table, records) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 139
# Inserts +records+ into +table+ one row at a time through a prepared
# INSERT built from @insert_columns / @insert_values.
#
# When @unique_column is set, duplicates within the batch and rows that
# already exist in the table are removed first (this mutates +records+).
# Each record goes through #filter_for_insert; the result is the bind
# parameter list. If a row fails with a StandardError, it is re-raised when
# @raise_exception is set. Otherwise it is logged at warn level and the
# remaining rows are still inserted.
def insert_into_db(handler, table, records)
  if @unique_column
    delete_duplicative_records(records)
    delete_existing_records(handler, table, records)
  end

  $log.info "insert #{records.size} records into #{table}"
  return if records.empty?

  sql = "INSERT INTO #{table}(#{@insert_columns}) VALUES(#{@insert_values})"
  $log.debug "execute sql #{sql.inspect}"
  statement = "write_#{table}"
  handler.prepare(statement, sql)
  records.each do |record|
    params = filter_for_insert(record)
    $log.debug "insert #{params.inspect}"
    begin
      handler.exec_prepared(statement, params)
    rescue StandardError => e
      # Only ordinary errors are tolerated; Interrupt/SignalException/
      # SystemExit (non-StandardError) must keep propagating.
      raise if @raise_exception
      $log.warn e.message
    end
  end
end
read_last_sequence(filepath) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 165
# Reads the last written sequence value from the sequence file at
# +filepath+. Returns it as a String when the (whitespace-stripped)
# content is an optionally negative integer, nil otherwise.
# The value is interpolated into SQL by #write_to_file, so anything that
# is not a plain integer is rejected.
def read_last_sequence(filepath)
  last_sequence = File.read(filepath).strip
  # Anchored: the old unanchored /-?[0-9]+/ accepted e.g. "abc5".
  last_sequence if /\A-?[0-9]+\z/.match?(last_sequence)
end
read_last_sequence_from_file(handler, table, filepath) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 171
# Recovers the last sequence value from the final line of an output file
# at +filepath+, parsing that line according to @file_format:
#   "json" - JSON object, value taken by @sequence_column
#   "ltsv" - tab-separated key:value pairs, value taken by @sequence_column
#   other  - tab-separated row; the column position is looked up from the
#            column list of +table+ (queried through +handler+)
# Returns the value as a String when it is an integer, otherwise nil
# (including for an empty file or an unknown sequence column).
def read_last_sequence_from_file(handler, table, filepath)
  # Array form runs `tail` without a shell, so paths with spaces or shell
  # metacharacters are passed through literally.
  last_line = IO.popen(["tail", "-n", "1", filepath.to_s], &:read).chomp
  return nil if last_line.empty?

  last_sequence =
    case @file_format
    when "json"
      JSON.parse(last_line)[@sequence_column]
    when "ltsv"
      Hash[last_line.split(/\t/).map { |pair| pair.split(/:/, 2) }][@sequence_column]
    else
      fields = handler.exec("select * from #{table} limit 0").fields
      sequence_index = fields.index(@sequence_column)
      sequence_index && last_line.split(/\t/)[sequence_index]
    end
  # JSON can yield Integers (which Regexp#=~ rejects), so normalise to a
  # String; anchor the pattern because the value is spliced into SQL.
  last_sequence = last_sequence.to_s
  last_sequence if /\A-?[0-9]+\z/.match?(last_sequence)
end
sequence_path(table) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 126
# Path of the file that stores the last written sequence for +table+,
# produced by the configured sequence_moniker lambda.
def sequence_path(table)
  @sequence_moniker_lambda[table]
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgdist.rb, line 191
# Shutdown hook; this plugin has nothing of its own to release, so it only
# delegates to the superclass.
def shutdown
  super()
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgdist.rb, line 195
# Startup hook; no plugin-specific startup work, so it only delegates to
# the superclass.
def start
  super()
end
table_name(record) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 199
# Destination table for +record+, computed by the configured table_moniker
# lambda (nil means the record is not written anywhere by #write).
def table_name(record)
  moniker = @table_moniker_lambda
  moniker.yield(record)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 203
# Flushes one buffer chunk.
#
# Records are grouped by #table_name; records whose table is nil are
# discarded. Each group is passed to #insert_into_db and, when
# @file_moniker is configured, #write_to_file runs for that table. A
# fresh connection is opened per chunk and is always closed, even if an
# insert or file write raises.
def write(chunk)
  handler = client
  begin
    records_by_table = Hash.new { |hash, key| hash[key] = [] }
    chunk.msgpack_each do |_tag, _time, data|
      table = table_name(data)
      records_by_table[table] << data unless table.nil?
    end
    records_by_table.each_pair do |table, records|
      insert_into_db(handler, table, records)
      write_to_file(handler, table) if @file_moniker
    end
  ensure
    handler.close
  end
end
write_pg_result(output_stream, fields, pg_result) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 220
# Writes every row of +pg_result+ to +output_stream+ in @file_format:
#   "json"                   - one JSON object per line
#   "ltsv"                   - one line of tab-separated field:value pairs
#   "msgpack"/"message_pack" - concatenated MessagePack objects
#   anything else            - tab-separated values escaped by #db_escape
# For json/ltsv/msgpack, each row is first passed through
# #filter_for_file_record. +fields+ is the column-name list used by ltsv.
def write_pg_result(output_stream, fields, pg_result)
  case @file_format
  when "json"
    pg_result.each do |tuple|
      output_stream.puts(filter_for_file_record(tuple).to_json)
    end
  when "ltsv"
    pg_result.each_row do |row|
      # Fresh locals: reassigning +fields+ here would feed the previous
      # row's filtered field list into the next filter call.
      row_fields, row_values = filter_for_file_record(fields, row)
      pairs = row_fields.each_with_index.map { |f, i| "#{f}:#{db_escape(row_values[i])}" }
      output_stream.puts(pairs.join("\t"))
    end
  when "msgpack", "message_pack"
    pg_result.each do |tuple|
      output_stream.write(filter_for_file_record(tuple).to_msgpack)
    end
  else
    pg_result.each_row do |row|
      output_stream.puts(row.map { |v| db_escape(v) }.join("\t"))
    end
  end
end
write_to_file(handler, table) click to toggle source
# File lib/fluent/plugin/out_pgdist.rb, line 244
# Appends rows of +table+ whose @sequence_column is greater than the last
# recorded sequence to file_path(table). Rows are fetched in batches of
# @file_write_limit, ordered by the sequence column. Afterwards the
# highest written sequence is stored in sequence_path(table), so the next
# call resumes after it.
#
# The output file is always closed. If a later batch raises, the sequence
# of the last fully written batch is still persisted before the error
# propagates.
def write_to_file(handler, table)
  sequence_file_path = sequence_path(table)
  # File.exists? was removed in Ruby 3.2; File.exist? is the supported name.
  last_sequence = File.exist?(sequence_file_path) ? read_last_sequence(sequence_file_path) : nil
  file = nil
  fields = nil
  wrote_rows = false
  begin
    loop do
      where_sql = last_sequence ? "where #{last_sequence} < #{@sequence_column}" : nil
      result = handler.exec("select * from #{table} #{where_sql} order by #{@sequence_column} limit #{@file_write_limit}")
      result_size = result.ntuples
      break if result_size.zero?

      fields ||= result.fields
      file ||= File.open(file_path(table), "a")
      write_pg_result(file, fields, result)
      last_sequence = result[result_size - 1][@sequence_column]
      wrote_rows = true
      break if result_size < @file_write_limit
    end
  ensure
    if file
      file.close
      File.write(sequence_file_path, last_sequence.to_s) if wrote_rows && last_sequence
    end
  end
end