class Fluent::Plugin::PgJsonOutput

Constants

DEFAULT_BUFFER_TYPE

Public Class Methods

new()
Calls superclass method
# File lib/fluent/plugin/out_pgjson.rb, line 46
# Sets up the plugin instance.
#
# Runs the superclass initializer first, then records that no PostgreSQL
# connection is open yet; init_connection opens one when @conn is nil.
def initialize
  super()
  @conn = nil
end

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/out_pgjson.rb, line 51
# Applies the plugin configuration.
#
# Converts compatibility parameters for the :buffer section, delegates to the
# superclass, requires 'tag' to be one of the chunk keys, and finally swaps
# the @encoder symbol for the module that will serialize records.
#
# @param conf the configuration element handed over by the framework
# @raise [Fluent::ConfigError] when @chunk_key_tag is not set
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag

  # Any value other than :yajl or :json leaves @encoder as nil.
  @encoder =
    case @encoder
    when :yajl then Yajl
    when :json then JSON
    end
end
format(tag, time, record)
# File lib/fluent/plugin/out_pgjson.rb, line 81
# Serializes one event for the buffer.
#
# The event time is rendered with @time_format and packed together with the
# record as a two-element msgpack array; the tag is not stored here.
#
# @param tag    event tag (unused)
# @param time   event time, anything Time.at accepts
# @param record the event record
# @return the msgpack encoding of [formatted_time, record]
def format(tag, time, record)
  timestamp = Time.at(time).strftime(@time_format)
  [timestamp, record].to_msgpack
end
formatted_to_msgpack_binary()
# File lib/fluent/plugin/out_pgjson.rb, line 73
# Always answers true.
#
# NOTE(review): presumably tells the Fluentd output base class that #format
# emits msgpack binary (write reads chunks with msgpack_each) - confirm
# against the Fluentd plugin API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?()
# File lib/fluent/plugin/out_pgjson.rb, line 77
# Always answers true.
#
# NOTE(review): presumably declares the plugin usable when Fluentd runs with
# several workers - confirm against the Fluentd plugin API.
#
# @return [true]
def multi_workers_ready?
  true
end
shutdown()
Calls superclass method
# File lib/fluent/plugin/out_pgjson.rb, line 65
# Closes the PostgreSQL connection, if one is open, then lets the superclass
# carry on with its own shutdown.
#
# A nil @conn or an already finished connection is left alone.
def shutdown
  nothing_to_close = @conn.nil? || @conn.finished?
  @conn.close unless nothing_to_close

  super
end
write(chunk)
# File lib/fluent/plugin/out_pgjson.rb, line 85
# Bulk-loads one buffer chunk into PostgreSQL with a single COPY ... FROM STDIN.
#
# Every event of the chunk becomes one row made of the chunk's tag, the event
# time and the encoded record, separated by the \x01 delimiter declared in
# the COPY statement.
#
# Whenever anything fails - starting the COPY, streaming rows, finishing the
# COPY, or a non-OK command result - the connection is closed and @conn is
# reset to nil, so the next call reconnects through init_connection instead
# of reusing a connection in an unknown state.
#
# @param chunk buffer chunk; must respond to metadata.tag and msgpack_each
# @raise [RuntimeError] carrying the result's error message when the COPY
#   does not end with PG::PGRES_COMMAND_OK
# @raise whatever starting the COPY, iterating the chunk or sending rows raised
def write(chunk)
  init_connection
  begin
    @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'")
    begin
      tag = chunk.metadata.tag
      chunk.msgpack_each do |time, record|
        @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n"
      end
    rescue => err
      errmsg = "%s while copy data: %s" % [ err.class.name, err.message ]
      begin
        # Ask the server to abort the COPY and drain its result.
        @conn.put_copy_end( errmsg )
        @conn.get_result
      rescue => abort_err
        # The abort itself can fail on a broken connection; the error that
        # interrupted the copy is the one callers need to see.
        log.warn "failed to abort COPY: #{abort_err.class.name}: #{abort_err.message}"
      end
      raise err
    end
    @conn.put_copy_end
    res = @conn.get_result
    raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK
  rescue
    # Drop the connection on every failure path so retries start fresh.
    conn, @conn = @conn, nil
    conn.close unless conn.nil? || conn.finished?
    raise
  end
end

Private Instance Methods

init_connection()
# File lib/fluent/plugin/out_pgjson.rb, line 113
# Opens the PostgreSQL connection on first use.
#
# Does nothing when @conn is already set. Otherwise logs the target at debug
# level and connects with the configured database, host, port, sslmode, user
# and password.
#
# @return the new connection, or nil when one was already present
# @raise [RuntimeError] "failed to initialize connection: <reason>" when the
#   connection cannot be established
def init_connection
  return unless @conn.nil?

  log.debug "connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..."

  begin
    @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password)
  rescue => err
    # The assignment never happened when the constructor raised, so @conn is
    # still nil and there is no connection to close here.
    raise "failed to initialize connection: #{err}"
  end
end
record_value(record)
# File lib/fluent/plugin/out_pgjson.rb, line 129
# Renders a record as the text placed in the record column of a COPY row.
#
# With @msgpack set, the record is packed with msgpack, escaped through the
# connection's escape_bytea, and prefixed with a single backslash. Otherwise
# it is dumped with @encoder and every backslash in the output is doubled.
#
# @param record the event record
# @return [String] the encoded column value
def record_value(record)
  return "\\#{@conn.escape_bytea(record.to_msgpack)}" if @msgpack

  @encoder.dump(record).gsub(/\\/) { '\\\\' }
end