class Queuel::Base::Poller
Attributes
args[RW]
continue_looping[RW]
inst_block[RW]
master[RW]
mutex[RW]
options[RW]
pool_tasks[RW]
queue[RW]
tries[RW]
workers[RW]
Public Class Methods
new(queue, param_block, options = {}, workers = 1)
click to toggle source
# File lib/queuel/base/poller.rb, line 10 def initialize(queue, param_block, options = {}, workers = 1) self.workers = workers self.queue = queue self.options = options || {} self.inst_block = param_block self.tries = 0 self.continue_looping = true self.pool_tasks = 0 self.mutex = Mutex.new end
Public Instance Methods
poll()
click to toggle source
# File lib/queuel/base/poller.rb, line 21 def poll register_trappers logger.debug("Beginning Poll...") self.master = master_thread log_action(:joining, :master) { master.join } rescue SignalException => e logger.warn "Caught (#{e}), shutting Poller down" log_action(:killing, :poller, :warn) { shutdown } end
Private Instance Methods
break_if_nil?()
click to toggle source
# File lib/queuel/base/poller.rb, line 135 def break_if_nil? !!options[:break_if_nil] end
Also aliased as: quit_on_empty?
built_options()
click to toggle source
# File lib/queuel/base/poller.rb, line 127 def built_options raise NotImplementedError end
continue_looping?()
click to toggle source
# File lib/queuel/base/poller.rb, line 131 def continue_looping? !!continue_looping end
decrement_pool_task_count()
click to toggle source
# File lib/queuel/base/poller.rb, line 181 def decrement_pool_task_count self.mutex.synchronize{ self.pool_tasks -= 1 } end
increment_pool_task_count()
click to toggle source
# File lib/queuel/base/poller.rb, line 175 def increment_pool_task_count self.mutex.synchronize{ self.pool_tasks += 1 } end
increment_sleep_time()
click to toggle source
# File lib/queuel/base/poller.rb, line 152 def increment_sleep_time 0.1 end
log_action(verb, subject, level = :debug) { || ... }
click to toggle source
# File lib/queuel/base/poller.rb, line 50 def log_action(verb, subject, level = :debug) verb = verb.to_s.upcase.gsub(/\_/, " ") subject = subject.to_s.upcase.gsub(/\_/, " ") logger.public_send level, "#{verb} #{subject} START" yield logger.public_send level, "#{verb} #{subject} COMPLETE" end
master_looper()
click to toggle source
# File lib/queuel/base/poller.rb, line 119 def master_looper loop do break unless continue_looping? process_off_peek sleep sleep_time end end
master_thread()
click to toggle source
# File lib/queuel/base/poller.rb, line 68 def master_thread Thread.new do master_looper end end
peek()
click to toggle source
# File lib/queuel/base/poller.rb, line 78 def peek queue.peek peek_options end
peek_options()
click to toggle source
# File lib/queuel/base/poller.rb, line 74 def peek_options {} end
pool()
click to toggle source
# File lib/queuel/base/poller.rb, line 64 def pool @pool ||= Thread.pool workers end
pool_full?()
click to toggle source
# File lib/queuel/base/poller.rb, line 168 def pool_full? return false unless queue.max_pool_tasks self.mutex.synchronize{ self.pool_tasks > queue.max_pool_tasks } end
pop_new_message()
click to toggle source
# File lib/queuel/base/poller.rb, line 144 def pop_new_message queue.pop built_options end
process_message()
click to toggle source
# File lib/queuel/base/poller.rb, line 109 def process_message register_trappers message = pop_new_message message.delete if self.inst_block.call message rescue => e logger.warn "Received #{e} when processing #{message}" logger.warn "Backtrace:" logger.warn e.backtrace end
process_off_peek()
click to toggle source
# File lib/queuel/base/poller.rb, line 86 def process_off_peek mem_queue_size = queue_size if mem_queue_size > 0 reset_tries mem_queue_size.times do process_on_thread end GC.start else tried quit_looping! if quit_on_empty? end end
process_on_thread()
click to toggle source
# File lib/queuel/base/poller.rb, line 100 def process_on_thread return if pool_full? increment_pool_task_count pool.process do process_message decrement_pool_task_count end end
queue_size()
click to toggle source
# File lib/queuel/base/poller.rb, line 82 def queue_size # TODO optionize the peek options Array(peek).size end
quit_looping!()
click to toggle source
# File lib/queuel/base/poller.rb, line 140 def quit_looping! self.continue_looping = false end
register_trappers()
click to toggle source
# File lib/queuel/base/poller.rb, line 45 def register_trappers trap(:SIGINT) { Thread.new{shutdown}.join } trap(:INT) { Thread.new{shutdown}.join } end
reset_tries()
click to toggle source
# File lib/queuel/base/poller.rb, line 160 def reset_tries self.tries = 0 end
shutdown()
click to toggle source
# File lib/queuel/base/poller.rb, line 58 def shutdown log_action(:shutting_down, :thread_pool) { pool.shutdown } log_action(:killing, :master_thread) { master.kill } log_action(:killing, :loop) { quit_looping! } end
sleep_time()
click to toggle source
# File lib/queuel/base/poller.rb, line 156 def sleep_time tries < 30 && !pool_full? ? ((start_sleep_time + increment_sleep_time) * tries) : 3 end
start_sleep_time()
click to toggle source
# File lib/queuel/base/poller.rb, line 148 def start_sleep_time 0 end
tried()
click to toggle source
# File lib/queuel/base/poller.rb, line 164 def tried self.tries += 1 end