class RedshiftConnection
Attributes
db_conf[R]
Public Class Methods
new(db_conf)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 366 def initialize(db_conf) @db_conf = db_conf @connection = nil end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 391 def close @connection.close rescue nil if @connection @connection = nil end
connect_start()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 387 def connect_start @connection = create_redshift_connection end
exec(sql, &block)
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 373 def exec(sql, &block) conn = @connection conn = create_redshift_connection if conn.nil? if block conn.exec(sql) {|result| block.call(result)} else conn.exec(sql) end rescue PG::Error => e raise RedshiftError.new(e) ensure conn.close if conn && @connection.nil? end
Private Instance Methods
create_redshift_connection()
click to toggle source
# File lib/fluent/plugin/out_redshift_v2.rb, line 398 def create_redshift_connection conn = PG::Connection.connect_start(db_conf) raise RedshiftError.new("Unable to create a new connection.") unless conn raise RedshiftError.new("Connection failed: %s" % [ conn.error_message ]) if conn.status == PG::CONNECTION_BAD socket = conn.socket_io poll_status = PG::PGRES_POLLING_WRITING until poll_status == PG::PGRES_POLLING_OK || poll_status == PG::PGRES_POLLING_FAILED case poll_status when PG::PGRES_POLLING_READING io = IO.select([socket], nil, nil, db_conf[:connect_timeout]) raise RedshiftError.new("Asynchronous connection timed out!(READING)") unless io when PG::PGRES_POLLING_WRITING io = IO.select(nil, [socket], nil, db_conf[:connect_timeout]) raise RedshiftError.new("Asynchronous connection timed out!(WRITING)") unless io end poll_status = conn.connect_poll end unless conn.status == PG::CONNECTION_OK raise RedshiftError, ("Connect failed: %s" % [conn.error_message.to_s.lines.uniq.join(" ")]) end conn rescue => e conn.close rescue nil if conn raise RedshiftError.new(e) if e.kind_of?(PG::Error) raise e end