class Evrone::Common::AMQP::Supervisor::Threaded
Constants
- POOL_INTERVAL
- Task
Public Class Methods
build(tasks)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 38
#
# Builds a supervisor pre-populated with tasks.
#
# For every key/value pair in +tasks+, the value is converted with +to_i+
# and that many tasks are registered for the key. Each task calls
# +:subscribe+ on the key and gets a sequential id starting at 0.
#
# @param tasks [#each_pair] mapping of object => number of tasks to add
# @return the newly created supervisor
def build(tasks)
  new.tap do |supervisor|
    tasks.each_pair do |consumer, count|
      count.to_i.times { |index| supervisor.add consumer, :subscribe, index }
    end
  end
end
new()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 62
#
# Creates an empty supervisor.
#
# Calls the class-level +resume+ before setting up the task list, so
# constructing a supervisor also resets the class-wide shutdown flag.
def initialize
  self.class.resume
  @tasks = []
end
resume()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 48
#
# Clears the shutdown flag held in the @@shutdown class variable.
#
# @return [false]
def resume
  @@shutdown = false
end
shutdown()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 56
#
# Raises the shutdown flag held in the @@shutdown class variable.
#
# @return [true]
def shutdown
  @@shutdown = true
end
shutdown?()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 52
#
# Reports the current value of the @@shutdown class variable.
#
# @return the stored flag (set to true by +shutdown+, false by +resume+)
def shutdown?
  @@shutdown
end
Public Instance Methods
add(object, method, id)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 67
#
# Registers a new task with the supervisor.
#
# A Task is built from the three arguments, frozen, and appended to the
# internal task list.
#
# @param object the receiver the task will call
# @param method the method name to invoke on +object+
# @param id     identifier for this task
# @return [Array] the internal task list after the append
def add(object, method, id)
  entry = Task.new(object, method, id).freeze
  @tasks << entry
end
run()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 87
#
# Starts a thread for every registered task and then supervises them.
#
# Tasks are taken from the front of the list one at a time:
# * while shutting down, the task's thread error (if any) is logged and
#   the task is dropped from the list;
# * a task that is still alive goes back to the end of the list;
# * any other task is handed to +process_fail+.
#
# The loop ends once the list is empty. Between tasks it sleeps for
# POOL_INTERVAL unless a shutdown has been requested.
def run
  start_all_threads

  loop do
    current = @tasks.shift
    break unless current

    if shutdown?
      log_thread_error current
    elsif current.alive?
      @tasks.push current
    else
      process_fail current
    end

    sleep POOL_INTERVAL unless shutdown?
  end
end
run_async()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 83
#
# Runs the supervisor loop (+run+) in a new thread.
#
# The thread has +abort_on_exception+ enabled, so an exception escaping
# +run+ is not confined to that thread.
#
# @return [Thread] the thread executing +run+
def run_async
  runner = Thread.new { run }
  runner.abort_on_exception = true
  runner
end
shutdown()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 79
#
# Requests a shutdown by delegating to the class-level +shutdown+.
def shutdown
  supervisor_class = self.class
  supervisor_class.shutdown
end
shutdown?()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 75
#
# Tells whether a shutdown has been requested, by delegating to the
# class-level +shutdown?+.
def shutdown?
  supervisor_class = self.class
  supervisor_class.shutdown?
end
size()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 71
#
# Number of tasks currently held in the supervisor's task list.
#
# @return [Integer]
def size
  @tasks.length
end
Private Instance Methods
check_attempt(task)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 164
#
# Tells whether +task+ may still be respawned.
#
# Both the task's attempt counter and the configured +spawn_attempts+
# limit are coerced with +to_i+ before comparison.
#
# @param task the task whose +attempt+ counter is checked
# @return [Boolean] true while attempt <= configured spawn_attempts
def check_attempt(task)
  attempts_made = task.attempt.to_i
  attempts_allowed = Common::AMQP.config.spawn_attempts.to_i
  attempts_made <= attempts_allowed
end
create_thread(task, attempt)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 128
#
# Spawns a thread for +task+ and returns a frozen copy describing it.
#
# The copy is made with +dup+, so the task passed in is left untouched.
# The new thread stores the task id in
# Thread.current[:evrone_amqp_consumer_id] and then sends the task's
# method to the task's object. +abort_on_exception+ is disabled on the
# thread, so a failure stays inside it.
#
# @param task    the task to start
# @param attempt attempt number to record; replaced by 0 when
#                +reset_attempt?+ is true for +task+
# @return the frozen copy carrying +thread+, +attempt+ and +start_at+
def create_thread(task, attempt)
  effective_attempt = reset_attempt?(task) ? 0 : attempt

  spawned = task.dup
  worker = Thread.new(spawned) do |t|
    Thread.current[:evrone_amqp_consumer_id] = t.id
    t.object.send t.method
  end

  spawned.thread = worker
  worker.abort_on_exception = false
  spawned.attempt = effective_attempt
  spawned.start_at = Time.now
  spawned.freeze

  debug "spawn #{spawned.inspect}"
  spawned
end
log_thread_error(task)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 143
#
# Reports the exception (if any) that terminated the task's thread.
#
# +Thread#value+ waits for the thread to finish and re-raises whatever
# exception ended it. That exception is written to STDERR with its
# backtrace and passed to +run_on_error_callback+.
#
# Exception is rescued deliberately: the thread may have died with any
# exception class, and this method only reports it.
#
# @param task the task whose thread is inspected
# @return [Exception, nil] the captured exception, or nil when the task
#   has no thread or the thread finished without raising
def log_thread_error(task)
  worker = task.thread
  return unless worker

  begin
    worker.value
    nil
  rescue Exception => failure
    STDERR.puts "#{failure.inspect} in #{task.inspect}"
    STDERR.puts failure.backtrace.join("\n")
    run_on_error_callback(failure)
    failure
  end
end
process_fail(task)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 109
#
# Handles a task whose thread is no longer alive.
#
# The thread's error is logged first. If +check_attempt+ allows it, the
# task is restarted with its attempt counter incremented and put back on
# the task list.
#
# @param task the task to handle
# @raise [SpawnAttemptsLimitReached] when +check_attempt+ rejects the task
def process_fail(task)
  log_thread_error task
  raise SpawnAttemptsLimitReached unless check_attempt(task)

  @tasks.push create_thread(task, task.attempt + 1)
end
reset_attempt?(task)
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 157
#
# Tells whether the attempt counter of +task+ should start over.
#
# A task that has never been started (no +start_at+) always resets. For
# any other task the counter resets once more than +interval+ seconds
# have passed since +start_at+.
#
# @param task     object responding to +start_at+ (a Time or nil)
# @param interval [Numeric] seconds since +start_at+ after which the
#   counter resets; defaults to 60, the value that used to be hard-coded
# @return [Boolean]
def reset_attempt?(task, interval = 60)
  started_at = task.start_at
  return true unless started_at

  (started_at + interval) < Time.now
end
start_all_threads()
click to toggle source
# File lib/evrone/common/amqp/supervisor/threaded.rb, line 118
#
# Starts a thread for every task currently on the list.
#
# The list is drained first, each task being replaced by the started copy
# returned from +create_thread+ (attempt 0). The started copies are then
# appended back onto the same list in their original order.
def start_all_threads
  launched = []

  while (pending = @tasks.shift)
    launched << create_thread(pending, 0)
  end

  @tasks.concat launched
end