class RedshiftConnection

Attributes

db_conf[R]

Public Class Methods

new(db_conf) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 366
def initialize(db_conf)
  @db_conf = db_conf
  @connection = nil
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 391
def close
  @connection.close rescue nil if @connection
  @connection = nil
end
connect_start() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 387
def connect_start
  @connection = create_redshift_connection
end
exec(sql, &block) click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 373
def exec(sql, &block)
  conn = @connection
  conn = create_redshift_connection if conn.nil?
  if block
    conn.exec(sql) {|result| block.call(result)}
  else
    conn.exec(sql)
  end
rescue PG::Error => e
  raise RedshiftError.new(e)
ensure
  conn.close if conn && @connection.nil?
end

Private Instance Methods

create_redshift_connection() click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 398
def create_redshift_connection
  conn = PG::Connection.connect_start(db_conf)
  raise RedshiftError.new("Unable to create a new connection.") unless conn
  raise RedshiftError.new("Connection failed: %s" % [ conn.error_message ]) if conn.status == PG::CONNECTION_BAD

  socket = conn.socket_io
  poll_status = PG::PGRES_POLLING_WRITING
  until poll_status == PG::PGRES_POLLING_OK || poll_status == PG::PGRES_POLLING_FAILED
    case poll_status
    when PG::PGRES_POLLING_READING
      io = IO.select([socket], nil, nil, db_conf[:connect_timeout])
      raise RedshiftError.new("Asynchronous connection timed out!(READING)") unless io
    when PG::PGRES_POLLING_WRITING
      io = IO.select(nil, [socket], nil, db_conf[:connect_timeout])
      raise RedshiftError.new("Asynchronous connection timed out!(WRITING)") unless io
    end
    poll_status = conn.connect_poll
  end

  unless conn.status == PG::CONNECTION_OK
    raise RedshiftError, ("Connect failed: %s" % [conn.error_message.to_s.lines.uniq.join(" ")])
  end

  conn
rescue => e
  conn.close rescue nil if conn
  raise RedshiftError.new(e) if e.kind_of?(PG::Error)
  raise e
end