module Activerecord::Oracle::Queue::Watcher::ClassMethods
Public Instance Methods
watch()
click to toggle source
# File lib/activerecord/oracle/queue/watcher.rb, line 30 def watch Rails.logger = Logger.new(STDOUT) connection = fetch_connection cursor = fetch_cursor(connection) cursor.bind_param(":p", nil, String, 4000) Rails.logger.info(log_message("Starting watching queue #{self::QueueName}")) while true cursor.exec() # retrieve message begin Rails.logger.info(log_message("Retrieve message with #{cursor[":p"]}")) json = ActiveSupport::JSON.decode(cursor[":p"]) instance = self.new Rails.logger.info(log_message("Perform with #{cursor[":p"]}")) instance.perform(json) rescue JSON::ParserError => exception Rails.logger.error(log_message(exception)) end connection.commit # remove from AQ. dequeue isn't complete until this happens Rails.logger.info(log_message("Perform is done... Dequeueing...")) end end
Private Instance Methods
db_config()
click to toggle source
# File lib/activerecord/oracle/queue/watcher.rb, line 79 def db_config ActiveRecord::Base.connection_config end
fetch_connection()
click to toggle source
# File lib/activerecord/oracle/queue/watcher.rb, line 71 def fetch_connection OCI8.new( db_config[:username], db_config[:password], db_config[:database], ) end
fetch_cursor(connection)
click to toggle source
# File lib/activerecord/oracle/queue/watcher.rb, line 65 def fetch_cursor(connection) connection.parse( "BEGIN #{self::QueueName}_queue.dequeue_message(:p); END;" ) end
log_message(message)
click to toggle source
# File lib/activerecord/oracle/queue/watcher.rb, line 61 def log_message(message) "#{self}: #{message}" end