class Fluent::Plugin::PgJsonOutput
Constants
- DEFAULT_BUFFER_TYPE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgjson.rb, line 46
# Builds the plugin instance through the superclass initializer and starts
# without a PostgreSQL connection; init_connection opens one when it finds
# @conn set to nil.
def initialize
  super
  @conn = nil
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgjson.rb, line 51
# Applies the configuration and resolves the configured encoder.
#
# conf - configuration object; passed to compat_parameters_convert and then
#        to the superclass `configure`.
#
# Raises Fluent::ConfigError when @chunk_key_tag is not set after `super`.
def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super
  raise Fluent::ConfigError, "'tag' in chunk_keys is required." unless @chunk_key_tag

  # Replace the encoder symbol with the module whose `dump` record_value
  # calls. Any other value leaves @encoder nil.
  @encoder =
    case @encoder
    when :yajl then Yajl
    when :json then JSON
    end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_pgjson.rb, line 81
# Serializes one event as a msgpack-encoded [formatted_time, record] pair.
#
# tag    - event tag; not used here (write takes the tag from chunk metadata).
# time   - event time, anything Time.at accepts.
# record - the event record.
#
# Returns the result of Array#to_msgpack on the pair.
def format(tag, time, record)
  timestamp = Time.at(time).strftime(@time_format)
  [timestamp, record].to_msgpack
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_pgjson.rb, line 73
# Always true. `format` emits msgpack and `write` reads chunks back with
# msgpack_each; presumably this flag is how Fluentd is told about that
# encoding (confirm against the Fluentd output plugin API).
def formatted_to_msgpack_binary
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/out_pgjson.rb, line 77
# Always true. Presumably declares to Fluentd that this plugin may run under
# multiple workers (confirm against the Fluentd plugin API).
def multi_workers_ready?
  true
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_pgjson.rb, line 65
# Closes the PostgreSQL connection, if there is one, and then runs the
# superclass shutdown.
#
# @conn is reset to nil once it has been dealt with: init_connection only
# reconnects when @conn is nil, so leaving a finished connection in place
# would hand a closed connection to any later write.
def shutdown
  if @conn
    @conn.close unless @conn.finished?
    @conn = nil
  end
  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_pgjson.rb, line 85
# Streams one buffer chunk into @table with a single COPY ... FROM STDIN.
#
# Every event becomes one COPY text row made of the chunk tag, the formatted
# time and the encoded record, separated by the \x01 byte.
#
# chunk - buffer chunk; must respond to `metadata.tag` and `msgpack_each`.
#
# Raises the error that interrupted the COPY, or a RuntimeError carrying the
# result's error message when the COPY result is not PGRES_COMMAND_OK. On any
# failure (including a failure to start the COPY) the connection is closed
# and @conn reset to nil, so the next call reconnects instead of reusing a
# connection left in an unknown state.
def write(chunk)
  init_connection
  copy_open = false
  begin
    @conn.exec("COPY #{@table} (#{@tag_col}, #{@time_col}, #{@record_col}) FROM STDIN WITH DELIMITER E'\\x01'")
    copy_open = true

    tag = chunk.metadata.tag
    chunk.msgpack_each do |time, record|
      @conn.put_copy_data "#{tag}\x01#{time}\x01#{record_value(record)}\n"
    end

    @conn.put_copy_end
    copy_open = false
    res = @conn.get_result
    raise res.result_error_message if res.result_status != PG::PGRES_COMMAND_OK
  rescue => err
    begin
      if copy_open
        # put_copy_end with a message ends the COPY as failed (see the pg
        # gem documentation); get_result then consumes the failure result.
        @conn.put_copy_end("%s while copy data: %s" % [err.class.name, err.message])
        @conn.get_result
      end
    rescue => abort_err
      # Best effort only: the connection is thrown away below and the
      # original error is the one reported to the caller.
      log.warn "failed to abort COPY cleanly: #{abort_err.class.name}: #{abort_err.message}"
    ensure
      # Forget the connection before closing it so @conn is nil no matter
      # what close does.
      broken, @conn = @conn, nil
      broken.close if broken
    end
    raise err
  end
end
Private Instance Methods
init_connection()
click to toggle source
# File lib/fluent/plugin/out_pgjson.rb, line 113
# Makes sure @conn holds a usable PostgreSQL connection, connecting with
# @database, @host, @port, @sslmode, @user and @password when needed.
#
# An existing connection is kept unless it reports finished?; a finished one
# is dropped and replaced.
#
# Raises RuntimeError ("failed to initialize connection: ...") when the
# connection cannot be established; @conn is nil in that case.
def init_connection
  return if @conn && !@conn.finished?

  log.debug "connecting to PostgreSQL server #{@host}:#{@port}, database #{@database}..."
  # Drop any finished connection first so a failed reconnect leaves nil behind.
  @conn = nil
  begin
    @conn = PG::Connection.new(dbname: @database, host: @host, port: @port, sslmode: @sslmode, user: @user, password: @password)
  rescue => e
    # PG::Connection.new raised, so nothing was assigned and there is nothing to close.
    raise "failed to initialize connection: #{e}"
  end
end
record_value(record)
click to toggle source
# File lib/fluent/plugin/out_pgjson.rb, line 129
# Encodes one record as the text placed in the record column of a COPY row.
#
# With @msgpack set, the record is msgpack-encoded, passed through the
# connection's escape_bytea and prefixed with one backslash. Otherwise it is
# dumped with @encoder and every backslash in the result is doubled
# (backslash is the escape character of the COPY text format).
#
# record - the event record.
#
# Returns the encoded String.
def record_value(record)
  return "\\#{@conn.escape_bytea(record.to_msgpack)}" if @msgpack

  encoded = @encoder.dump(record)
  # gsub! returns nil when nothing matched, so return the string itself.
  encoded.gsub!(/\\/) { '\\\\' }
  encoded
end