module PWork::Async

Public Class Methods

add_task(options, &block) click to toggle source
# File lib/pwork/async.rb, line 49
def self.add_task(options, &block)
  task = PWork::Async::Task.new.tap do |e|
    e.block = block
    e.caller = options[:caller]
  end

  unless options[:wait] == false
    tasks << task
  end

  manager.add_task(task)

  task.id
end
async_forked(options = {}, &block) click to toggle source
# File lib/pwork/async.rb, line 22
def self.async_forked(options = {}, &block)
  if block_given?
    pid = fork do
      block.call
    end
    PWork::Async.tasks << pid unless options[:wait] == false
  else
    PWork::Async.tasks.each do |pid|
      Process.wait(pid)
    end
    reset
  end
end
async_test(options = {}, &block) click to toggle source
# File lib/pwork/async.rb, line 18
def self.async_test(options = {}, &block)
  block.call if block_given?
end
async_threaded(options = {}, caller, &block) click to toggle source
# File lib/pwork/async.rb, line 36
def self.async_threaded(options = {}, caller, &block)
  if block_given?
    options[:caller] = caller
    PWork::Async.add_task(options, &block)
  else
    PWork::Async.wait_for_tasks({ caller: caller, command: options })
  end
end
handle_errors() click to toggle source
# File lib/pwork/async.rb, line 84
def self.handle_errors
  error_messages = []
  tasks.select { |t| t.state == :error }.each do |t|
    error_messages << "Error: #{t.error.message}, #{t.error.backtrace}"
  end
  raise PWork::Async::Exceptions::TaskError.new(
    "1 or more async errors occurred. #{error_messages.join(' | ')}"
  ) if error_messages.length > 0
  true
end
manager() click to toggle source
# File lib/pwork/async.rb, line 45
def self.manager
  @manager ||= PWork::Async::Manager.new
end
mode() click to toggle source
# File lib/pwork/async.rb, line 95
def self.mode
  ENV.fetch('PWORK_ASYNC_MODE', 'thread').to_s.downcase
end
reset() click to toggle source
# File lib/pwork/async.rb, line 99
def self.reset
  Thread.current[:pwork_async_tasks] = []
end
tasks() click to toggle source
# File lib/pwork/async.rb, line 64
def self.tasks
  Thread.current[:pwork_async_tasks] ||= []
end
wait_for_tasks(options) click to toggle source
# File lib/pwork/async.rb, line 68
def self.wait_for_tasks(options)
  case options[:command]
    when :wait
      task_list = tasks
    when :wait_local
      task_list = tasks.select { |t| t.caller == options[:caller] }
  end

  task_list.each { |t| t.thread.join }

  handle_errors

  ensure
    Thread.current[:pwork_async_tasks] -= task_list
end

Public Instance Methods

async(options = {}, &block) click to toggle source
# File lib/pwork/async.rb, line 7
def async(options = {}, &block)
  case PWork::Async.mode
    when 'fork'
      PWork::Async.async_forked(options, &block)
    when 'test'
      PWork::Async.async_test(options, &block)
    else
      PWork::Async.async_threaded(options, self, &block)
  end
end