class Queuel::Base::Poller

Attributes

args[RW]
continue_looping[RW]
inst_block[RW]
master[RW]
mutex[RW]
options[RW]
pool_tasks[RW]
queue[RW]
tries[RW]
workers[RW]

Public Class Methods

new(queue, param_block, options = {}, workers = 1) click to toggle source
# File lib/queuel/base/poller.rb, line 10
# Builds a poller around +queue+.
#
# queue       - object the poller peeks at and pops from; stored as given.
# param_block - callable stored as +inst_block+ and invoked with each
#               popped message.
# options     - option hash; a +nil+ value is replaced by an empty hash.
# workers     - value later passed to the thread pool (defaults to 1).
def initialize(queue, param_block, options = {}, workers = 1)
  # Collaborators handed in by the caller.
  self.queue = queue
  self.inst_block = param_block
  self.options = options || {}
  self.workers = workers

  # Loop and pool bookkeeping, always starting from a clean slate.
  self.continue_looping = true
  self.tries = 0
  self.pool_tasks = 0
  self.mutex = Mutex.new
end

Public Instance Methods

poll() click to toggle source
# File lib/queuel/base/poller.rb, line 21
# Runs the poller and blocks until the master thread finishes.
#
# Installs the signal traps, starts the master thread, stores it in
# +master+ and joins it. A SignalException raised along the way is logged
# at warn level and answered with #shutdown.
def poll
  register_trappers
  logger.debug "Beginning Poll..."
  self.master = master_thread
  log_action(:joining, :master) do
    master.join
  end
rescue SignalException => signal
  logger.warn("Caught (#{signal}), shutting Poller down")
  log_action(:killing, :poller, :warn) do
    shutdown
  end
end

Private Instance Methods

break_if_nil?() click to toggle source
# File lib/queuel/base/poller.rb, line 135
# Reports whether the +:break_if_nil+ option is set, as a strict
# true/false value.
def break_if_nil?
  options[:break_if_nil] ? true : false
end
Also aliased as: quit_on_empty?
built_options() click to toggle source
# File lib/queuel/base/poller.rb, line 127
# Options passed to +queue.pop+ by #pop_new_message.
#
# There is no implementation at this level: the method always raises.
# Note that NotImplementedError descends from ScriptError, not
# StandardError, so a bare +rescue => e+ does not catch it.
def built_options
  raise(NotImplementedError)
end
continue_looping?() click to toggle source
# File lib/queuel/base/poller.rb, line 131
# Reports whether the +continue_looping+ flag is currently truthy, as a
# strict true/false value.
def continue_looping?
  continue_looping ? true : false
end
decrement_pool_task_count() click to toggle source
# File lib/queuel/base/poller.rb, line 181
# Lowers +pool_tasks+ by one while holding +mutex+.
# Returns the new count.
def decrement_pool_task_count
  mutex.synchronize do
    self.pool_tasks = pool_tasks - 1
  end
end
increment_pool_task_count() click to toggle source
# File lib/queuel/base/poller.rb, line 175
# Raises +pool_tasks+ by one while holding +mutex+.
# Returns the new count.
def increment_pool_task_count
  mutex.synchronize do
    self.pool_tasks = pool_tasks + 1
  end
end
increment_sleep_time() click to toggle source
# File lib/queuel/base/poller.rb, line 152
# Amount added to #start_sleep_time before #sleep_time multiplies the sum
# by the number of tries; the result is what the master loop passes to
# +sleep+.
def increment_sleep_time
  0.1
end
log_action(verb, subject, level = :debug) { || ... } click to toggle source
# File lib/queuel/base/poller.rb, line 50
# Wraps the given block in a pair of log lines.
#
# verb, subject - converted to upper-case strings with underscores turned
#                 into spaces, then joined as "VERB SUBJECT".
# level         - logger method used for both lines (defaults to :debug).
#
# Logs "... START", yields, then logs "... COMPLETE". If the block raises,
# the COMPLETE line is not written.
def log_action(verb, subject, level = :debug)
  label = [verb, subject].map { |part| part.to_s.upcase.tr("_", " ") }.join(" ")
  logger.public_send(level, "#{label} START")
  yield
  logger.public_send(level, "#{label} COMPLETE")
end
master_looper() click to toggle source
# File lib/queuel/base/poller.rb, line 119
# Body of the master thread: while #continue_looping? holds, dispatch the
# currently visible messages and then sleep for #sleep_time.
def master_looper
  loop do
    return unless continue_looping?

    process_off_peek
    sleep(sleep_time)
  end
end
master_thread() click to toggle source
# File lib/queuel/base/poller.rb, line 68
# Starts and returns a new Thread that runs #master_looper.
def master_thread
  Thread.new { master_looper }
end
peek() click to toggle source
# File lib/queuel/base/poller.rb, line 78
# Asks the queue for a look at pending messages, passing #peek_options.
# Returns whatever +queue.peek+ returns.
def peek
  queue.peek(peek_options)
end
peek_options() click to toggle source
# File lib/queuel/base/poller.rb, line 74
# Options passed to +queue.peek+ by #peek; an empty hash at this level.
def peek_options
  {}
end
pool() click to toggle source
# File lib/queuel/base/poller.rb, line 64
# Memoized thread pool, built on first use from +Thread.pool+ with
# +workers+ as its argument.
# NOTE(review): +Thread.pool+ is not core Ruby; presumably supplied by a
# thread-pool library loaded elsewhere -- confirm.
def pool
  return @pool if @pool

  @pool = Thread.pool(workers)
end
pool_full?() click to toggle source
# File lib/queuel/base/poller.rb, line 168
# Reports whether the number of outstanding pool tasks has reached the
# queue's +max_pool_tasks+ limit.
#
# Returns false when the queue reports no limit (nil/false). Otherwise
# compares +pool_tasks+ against the limit while holding +mutex+.
#
# The comparison is inclusive: #process_on_thread consults this before
# adding a task, so testing with ">" would admit one task more than the
# configured maximum.
def pool_full?
  limit = queue.max_pool_tasks
  return false unless limit

  mutex.synchronize { pool_tasks >= limit }
end
pop_new_message() click to toggle source
# File lib/queuel/base/poller.rb, line 144
# Pops the next message off the queue, passing #built_options.
# Returns whatever +queue.pop+ returns.
def pop_new_message
  queue.pop(built_options)
end
process_message() click to toggle source
# File lib/queuel/base/poller.rb, line 109
# Handles a single message: pops it, passes it to +inst_block+ and deletes
# it when the block returns a truthy value.
#
# Any StandardError raised along the way is logged at warn level (message
# plus backtrace) and not re-raised.
def process_message
  register_trappers
  message = pop_new_message
  handled = inst_block.call(message)
  message.delete if handled
rescue => error
  logger.warn("Received #{error} when processing #{message}")
  logger.warn("Backtrace:")
  logger.warn(error.backtrace)
end
process_off_peek() click to toggle source
# File lib/queuel/base/poller.rb, line 86
# One pass of the master loop.
#
# With nothing visible on the queue, records a try and stops the loop when
# #quit_on_empty? says so. Otherwise clears the try counter, calls
# #process_on_thread once per visible message and then forces a garbage
# collection run.
def process_off_peek
  pending = queue_size
  if pending <= 0
    tried
    quit_looping! if quit_on_empty?
  else
    reset_tries
    pending.times { process_on_thread }
    GC.start
  end
end
process_on_thread() click to toggle source
# File lib/queuel/base/poller.rb, line 100
# Schedules one #process_message call on the thread pool, unless the pool
# is already full.
#
# The outstanding-task counter is raised before the job is handed to the
# pool and lowered again when the job ends. The decrement sits in an
# +ensure+ so it also runs when #process_message raises something its own
# +rescue => e+ does not catch; otherwise the counter would stay inflated
# and #pool_full? could end up reporting full indefinitely.
#
# Returns nil when the pool is full, otherwise whatever +pool.process+
# returns.
def process_on_thread
  return if pool_full?

  increment_pool_task_count
  pool.process do
    begin
      process_message
    ensure
      decrement_pool_task_count
    end
  end
end
queue_size() click to toggle source
# File lib/queuel/base/poller.rb, line 82
# Number of messages currently returned by #peek; the result is coerced
# with Array(), so nil counts as zero.
def queue_size # TODO optionize the peek options
  Array(peek).length
end
quit_looping!() click to toggle source
# File lib/queuel/base/poller.rb, line 140
# Clears the +continue_looping+ flag so that #continue_looping? reports
# false from now on.
def quit_looping!
  self.continue_looping = false
end
quit_on_empty?()
Alias for: break_if_nil?
register_trappers() click to toggle source
# File lib/queuel/base/poller.rb, line 45
# Installs the interrupt handler that shuts the poller down.
#
# ":SIGINT" and ":INT" name the same signal, so a single registration is
# enough; registering both only replaced the first handler with an
# identical one.
#
# The handler runs #shutdown on a fresh thread and waits for it.
# NOTE(review): presumably because work such as taking a Mutex is not
# permitted directly inside a trap handler -- confirm.
def register_trappers
  trap(:INT) { Thread.new { shutdown }.join }
end
reset_tries() click to toggle source
# File lib/queuel/base/poller.rb, line 160
# Sets the +tries+ counter back to zero; #sleep_time multiplies by this
# counter, so the next computed sleep starts over.
def reset_tries
  self.tries = 0
end
shutdown() click to toggle source
# File lib/queuel/base/poller.rb, line 58
# Stops the poller: shuts the thread pool down, kills the master thread
# and clears the looping flag, logging each step through #log_action.
#
# #poll registers the signal traps before it assigns +master+, so a
# shutdown can arrive while +master+ is still nil; the kill is skipped in
# that case instead of raising NoMethodError.
def shutdown
  log_action(:shutting_down, :thread_pool) { pool.shutdown }
  log_action(:killing, :master_thread) { master.kill if master }
  log_action(:killing, :loop) { quit_looping! }
end
sleep_time() click to toggle source
# File lib/queuel/base/poller.rb, line 156
# How long the master loop sleeps between passes.
#
# Returns 3 once 30 or more tries have accumulated or the pool is full;
# otherwise (start_sleep_time + increment_sleep_time) * tries, which is
# zero right after the try counter was reset.
def sleep_time
  return 3 if tries >= 30 || pool_full?

  (start_sleep_time + increment_sleep_time) * tries
end
start_sleep_time() click to toggle source
# File lib/queuel/base/poller.rb, line 148
# Base value that #sleep_time adds to #increment_sleep_time before
# multiplying by the number of tries.
def start_sleep_time
  0
end
tried() click to toggle source
# File lib/queuel/base/poller.rb, line 164
# Records one more try by adding one to +tries+.
# Returns the new count.
def tried
  self.tries = tries + 1
end