class Fluent::Plugin::TimescaleDB

Constants

TIME_FORMAT

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/out_timescaledb.rb, line 18
# Closes the database connection held in @conn, when there is one and it is
# not already finished, and then runs the superclass's close step.
#
# The superclass call is made unconditionally, so it still runs when @conn
# was never set or has already been finished.
# NOTE(review): assumes the superclass defines #close (it already provides
# #start, which this class calls via super) — confirm against the base class.
def close
  @conn.close if @conn && !@conn.finished?
  super
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_timescaledb.rb, line 33
# Serializes one event for buffering.
#
# The three arguments are packed, in this order, into a single array which
# is then encoded with +to_msgpack+.
#
# @param tag [Object] first element of the encoded array
# @param time [Object] second element of the encoded array
# @param record [Object] third element of the encoded array
# @return [Object] whatever +Array#to_msgpack+ returns for the triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_timescaledb.rb, line 37
# Always returns +true+.
#
# NOTE(review): presumably this tells the Fluentd output base class that
# #format produces msgpack-encoded data (#format does call +to_msgpack+),
# which would be what allows #write to iterate with +chunk.msgpack_each+ —
# confirm against the Fluentd output plugin API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_timescaledb.rb, line 12
# Runs the superclass's start step, then opens the database connection.
#
# The connection is created with +PG.connect+ from @db_conn_string and
# stored in @conn, where #write, #close and the reconnect helper read it.
# NOTE(review): @db_conn_string is not assigned in the visible code —
# presumably a plugin configuration parameter; confirm.
#
# @return [Object] the connection returned by +PG.connect+
def start
  super

  @conn = PG.connect(@db_conn_string)
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_timescaledb.rb, line 22
# Inserts every event in +chunk+ into the configured table using a single
# multi-row INSERT statement.
#
# Each event becomes one +(time, tag, record)+ tuple: the time is rendered
# by +format_time+ and the record is serialized with +to_json+ and cast to
# jsonb. All three values go through the connection's +escape_string+, and
# the table name (@db_table_name) through +escape_identifier+, before being
# interpolated into the statement.
#
# @param chunk [#msgpack_each] object yielding +tag, time, record+ per event
# @return [Object, nil] the result of +@conn.exec+, or +nil+ when the chunk
#   yielded no events and no statement was sent
def write(chunk)
  reconnect_if_connection_bad!

  rows = []
  chunk.msgpack_each do |tag, time, record|
    time_sql = @conn.escape_string(format_time(time))
    tag_sql = @conn.escape_string(tag)
    record_sql = @conn.escape_string(record.to_json)
    rows << "('#{time_sql}','#{tag_sql}','#{record_sql}'::jsonb)"
  end

  # "INSERT ... VALUES" followed by no tuples is a SQL syntax error, so a
  # chunk without events must not reach the server.
  return if rows.empty?

  @conn.exec("INSERT INTO #{@conn.escape_identifier(@db_table_name)} (time, tag, record) VALUES #{rows.join(',')}")
end

Private Instance Methods

format_time(time) click to toggle source
# File lib/fluent/plugin/out_timescaledb.rb, line 45
# Renders an event time as a UTC timestamp string.
#
# The value is converted with +to_f+, interpreted as seconds since the Unix
# epoch, shifted to UTC and formatted with the TIME_FORMAT pattern.
#
# @param time [#to_f] event time in epoch seconds
# @return [String] the timestamp formatted according to TIME_FORMAT
def format_time(time)
  epoch_seconds = time.to_f
  utc_time = Time.at(epoch_seconds).utc
  utc_time.strftime(TIME_FORMAT)
end
reconnect_if_connection_bad!() click to toggle source
# File lib/fluent/plugin/out_timescaledb.rb, line 49
# Resets the connection in @conn when it reports PG::CONNECTION_BAD;
# does nothing for any other status.
#
# @return [Object, nil] the result of +@conn.reset+ when a reset happened,
#   otherwise +nil+
def reconnect_if_connection_bad!
  @conn.reset if @conn.status == PG::CONNECTION_BAD
end