class Celluloid::Task::Threaded

Tasks with a Thread backend

Public Class Methods

new(type, meta) click to toggle source

Run the given block within a task

Calls superclass method Celluloid::Task::new
# File lib/celluloid/task/threaded.rb, line 6
def initialize(type, meta)
  @resume_queue = Queue.new
  @exception_queue = Queue.new
  @yield_mutex  = Mutex.new
  @yield_cond   = ConditionVariable.new
  @thread = nil

  super
end

Public Instance Methods

backtrace() click to toggle source
# File lib/celluloid/task/threaded.rb, line 54
def backtrace
  @thread.backtrace
end
create() { || ... } click to toggle source
# File lib/celluloid/task/threaded.rb, line 16
def create
  # TODO: move this to ActorSystem#get_thread (ThreadHandle inside Group::Pool)
  thread = Internals::ThreadHandle.new(Thread.current[:celluloid_actor_system], :task) do
    begin
      ex = @resume_queue.pop
      raise ex if ex.is_a?(TaskTerminated)

      yield
    rescue ::Exception => ex
      @exception_queue << ex
    ensure
      @yield_mutex.synchronize do
        @yield_cond.signal
      end
    end
  end
  @thread = thread
end
deliver(value) click to toggle source
# File lib/celluloid/task/threaded.rb, line 42
def deliver(value)
  raise DeadTaskError, "cannot resume a dead task" unless @thread.alive?

  @yield_mutex.synchronize do
    @resume_queue.push(value)
    @yield_cond.wait(@yield_mutex)
    raise @exception_queue.pop until @exception_queue.empty?
  end
rescue ThreadError
  raise DeadTaskError, "cannot resume a dead task"
end
signal() click to toggle source
# File lib/celluloid/task/threaded.rb, line 35
def signal
  @yield_mutex.synchronize do
    @yield_cond.signal
  end
  @resume_queue.pop
end