class Workers::TaskGroup
Attributes
state[R]
tasks[R]
Public Class Methods
new(options = {})
click to toggle source
# File lib/workers/task_group.rb, line 8 def initialize(options = {}) @logger = Workers::LogProxy.new(options[:logger]) @pool = options[:pool] || Workers.pool @state = :initialized @tasks = [] @internal_lock = Mutex.new @external_lock = Mutex.new @finished_count = 0 @conditional = ConditionVariable.new nil end
Public Instance Methods
add(options = {}, &block)
click to toggle source
# File lib/workers/task_group.rb, line 21 def add(options = {}, &block) state!(:initialized) options[:on_finished] = method(:finished) options[:on_perform] ||= block @tasks << Workers::Task.new(options) nil end
failures()
click to toggle source
# File lib/workers/task_group.rb, line 61 def failures @tasks.select { |t| t.failed? } end
map(inputs, options = {}, &block)
click to toggle source
# File lib/workers/task_group.rb, line 65 def map(inputs, options = {}, &block) inputs.each do |input| add(:input => input, :max_tries => options[:max_tries]) do |i| block.call(i) end end run if (failure = failures[0]) a = failure.input.inspect c = failure.exception.class.to_s m = failure.exception.message b = failure.exception.backtrace.join("\n") raise Workers::FailedTaskError, "#{failures.count} task(s) failed (Only the first failure is shown).\nARGS=#{a}, EXCEPTION=#{c}: #{m}\n#{b}\n----------\n" end tasks.map { |t| t.result } end
run()
click to toggle source
# File lib/workers/task_group.rb, line 32 def run state!(:initialized) @state = :running @run_thread = Thread.current return [] if @tasks.empty? @internal_lock.synchronize do @tasks.each do |task| @pool.perform { task.run } end loop do @conditional.wait(@internal_lock) # The wait can return even if nothing called @conditional.signal, # so we need to check to see if the condition actually changed. # See https://github.com/chadrem/workers/issues/7 break if all_tasks_finished? end end @tasks.all? { |t| t.succeeded? } end
successes()
click to toggle source
# File lib/workers/task_group.rb, line 57 def successes @tasks.select { |t| t.succeeded? } end
synchronize(&block)
click to toggle source
Convenient mutex to be used by a users's task code that needs serializing. This should NEVER be used by TaskGroup
code (use the @internal_lock instead);
# File lib/workers/task_group.rb, line 88 def synchronize(&block) @external_lock.synchronize { block.call } nil end
Private Instance Methods
all_tasks_finished?()
click to toggle source
# File lib/workers/task_group.rb, line 113 def all_tasks_finished? @finished_count >= @tasks.count end
finished(task)
click to toggle source
# File lib/workers/task_group.rb, line 104 def finished(task) @internal_lock.synchronize do @finished_count += 1 @conditional.signal if all_tasks_finished? end nil end
state!(*args)
click to toggle source
# File lib/workers/task_group.rb, line 96 def state!(*args) unless args.include?(@state) raise Workers::InvalidStateError, "Invalid state (#{@state})." end nil end