class Empathy::EM::Thread

Acts like a ::Thread using Fibers and EventMachine

{::Thread} methods not implemented by Empathy

*  .exclusive - not implemented
*  #critical - not implemented
*  #set_trace_func - not implemented
*  #safe_level - not implemented
*  #priority - not implemented

Attributes

fiber[R]

@return [Fiber] The underlying fiber.

Public Class Methods

current() click to toggle source

Like ::Thread::current. Gets the currently running EM::Thread, e.g. to access thread-local variables. @return [Thread] representing the current Fiber

# File lib/empathy/em/thread.rb, line 90
# Looks up the thread object registered for the fiber that is executing right
# now. When the registry has no entry for that fiber, a new ProxyThread is
# built around it and returned instead.
#
# @return [Thread] the object representing the currently running fiber
def self.current
  running = Fiber.current
  @@em_threads[running] || ProxyThread.new(running)
end
exit() click to toggle source

Like ::Thread.exit @return [Thread]

# File lib/empathy/em/thread.rb, line 125
# Terminates the calling thread: fetches the object returned by ::current and
# invokes its instance-level #exit.
#
# @return [Object] whatever the instance-level #exit returns
def self.exit
  running = current
  running.exit
end
kill(thread) click to toggle source

Like ::Thread.kill @return [Thread]

# File lib/empathy/em/thread.rb, line 131
# Class-level counterpart of #exit: asks the given thread to terminate by
# calling its #exit method.
#
# @param thread [Thread] the thread to terminate
# @return [Object] whatever thread.exit returns
def self.kill(thread)
  thread.exit
end
list() click to toggle source

Like ::Thread::list. Return an array of all EM::Threads that are alive.

@return [Array<Thread>]

# File lib/empathy/em/thread.rb, line 76
# Collects every registered thread that still reports itself as alive.
#
# @return [Array<Thread>] the registered threads whose #alive? is true
def self.list
  @@em_threads.values.select(&:alive?)
end
main() click to toggle source

Like ::Thread.main

@return [Thread]

# File lib/empathy/em/thread.rb, line 83
# Returns the thread currently recorded in the @@main class variable.
#
# @return [Thread] the thread stored as main
def self.main
  @@main
end
new(*args,&block) click to toggle source

@private

Calls superclass method
# File lib/empathy/em/thread.rb, line 136
# @private
#
# Builds the instance through the superclass constructor and then verifies
# that the @fiber instance variable was defined on it. The check presumably
# catches subclasses whose #initialize never calls super.
#
# @raise [ThreadError] if @fiber is not defined on the new instance
# @return [Thread] the newly constructed instance
def self.new(*args, &block)
  thread = super(*args, &block)
  return thread if thread.instance_variable_defined?("@fiber")

  ::Kernel.raise ThreadError, "super not called for subclass of Thread"
end
new(*args,&block) click to toggle source
# File lib/empathy/em/thread.rb, line 149
# Sets up a new thread by handing the arguments and the block straight to
# initialize_fiber.
#
# @param args [Array] arguments forwarded to initialize_fiber
def initialize(*args,&block)
  initialize_fiber(*args,&block)
end
pass() click to toggle source

Like ::Thread::pass. The fiber is paused and resumed on the next_tick of EM’s event loop

@return [nil]

# File lib/empathy/em/thread.rb, line 116
# Gives up control until the next tick of the EventMachine loop.
#
# The wake-up is registered with EM.next_tick before the thread is put to
# sleep, so the callback is already queued when yield_sleep suspends it.
#
# @return [nil]
def self.pass
  running = current
  ::EM.next_tick do
    running.__send__(:wake_resume)
  end
  running.__send__(:yield_sleep)
  nil
end
start(*args,&block) click to toggle source

Like ::Thread.start

@return [Thread]

# File lib/empathy/em/thread.rb, line 145
# Creates and starts a thread running the given block.
#
# When the receiver is not Thread itself, an anonymous subclass of the
# receiver is created whose #initialize goes directly to initialize_fiber, so
# the receiver's own #initialize is not run.
#
# @param args [Array] arguments forwarded to the new thread
# @raise [ArgumentError] if no block is given
# @return [Thread] the new thread
def self.start(*args, &block)
  ::Kernel.raise ArgumentError, "no block" unless block_given?

  klass = self
  unless self == Thread
    klass = Class.new(self) do
      def initialize(*args, &block)
        initialize_fiber(*args, &block)
      end
    end
  end
  klass.new(*args, &block)
end
stop() click to toggle source

Like ::Thread::stop. Sleep forever (until woken) @return [void]

# File lib/empathy/em/thread.rb, line 108
# Stops the calling thread by calling Kernel.sleep with no arguments.
#
# NOTE(review): the constant is written as Kernel, not ::Kernel as in the rest
# of this class, so it presumably resolves to a fiber-aware Kernel module in
# the enclosing namespace - confirm before changing it.
#
# @return [void]
def self.stop
  Kernel.sleep()
end
yield(*args) click to toggle source

Alias for Fiber::yield. Equivalent to a thread being blocked on IO.

WARNING: Be very careful about using yield with the other thread-like methods. Specifically, it is important to ensure user calls to resume don’t conflict with the resumes that are set up via EM.timer or EM.next_tick as a result of #::sleep or #::pass.

# File lib/empathy/em/thread.rb, line 101
# Suspends the running fiber by forwarding all arguments to Fiber.yield.
#
# @param args [Array] values handed to Fiber.yield
# @return [Object] whatever Fiber.yield returns once the fiber is resumed
def self.yield(*args)
  Fiber.yield(*args)
end

Public Instance Methods

[](name) click to toggle source

Access to “fiber local” variables, akin to “thread local” variables. @param [Symbol] name @return [Object,nil]

# File lib/empathy/em/thread.rb, line 274
# Reads a "fiber local" variable.
#
# @param name [#to_sym] variable name; converted with to_sym before lookup
# @raise [TypeError] if name is nil/false or does not respond to to_sym
# @return [Object, nil] the stored value, or nil when nothing is stored
def [](name)
  convertible = name && name.respond_to?(:to_sym)
  ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless convertible
  @locals[name.to_sym]
end
[]=(name, value) click to toggle source

Access to “fiber local” variables, akin to “thread local” variables.

# File lib/empathy/em/thread.rb, line 280
# Writes a "fiber local" variable.
#
# @param name [#to_sym] variable name; converted with to_sym before storing
# @param value [Object] value to store
# @raise [TypeError] if name is nil/false or does not respond to to_sym
# @return [Object] the stored value
def []=(name, value)
  unless name && name.respond_to?(:to_sym)
    ::Kernel.raise TypeError, "name #{name} must convert to_sym"
  end
  @locals[name.to_sym] = value
end
alive?() click to toggle source

Like ::Thread#alive? or Fiber#alive? @return [true,false]

# File lib/empathy/em/thread.rb, line 184
# Reports whether the underlying fiber is still alive, by delegating to the
# fiber's own alive? method.
#
# @return [true, false]
def alive?
  fiber.alive?
end
ensure_hook(key,&block) click to toggle source

Do something when the fiber completes. @return [void]

# File lib/empathy/em/thread.rb, line 307
# Registers or removes a hook in @ensure_hooks.
#
# With a block, the block is stored under key, replacing any earlier hook
# stored under the same key. Without a block, the hook stored under key is
# removed.
#
# @param key [Object] identifier for the hook
# @return [Proc, nil] the stored block, or the removed hook (nil if none)
def ensure_hook(key, &block)
  return @ensure_hooks.delete(key) unless block_given?

  @ensure_hooks[key] = block
end
exit() click to toggle source

Like ::Thread#exit. Signals thread to wakeup and die

@return [nil, Thread]

# File lib/empathy/em/thread.rb, line 225
# Asks the thread to terminate.
#
# A sleeping thread is resumed with the :exit event. A running thread throws
# :exit from the current call stack. Any other status is left untouched.
#
# @return [Object, nil] the result of wake_resume for a sleeping thread,
#   nil when the status is neither :sleep nor :run
def exit
  if @status == :sleep
    wake_resume(:exit)
  elsif @status == :run
    throw :exit
  end
end
Also aliased as: kill, terminate
inspect() click to toggle source

Like ::Thread#inspect @return [String]

# File lib/empathy/em/thread.rb, line 301
# Builds a short human-readable description of the thread, in the form
# "#<Empathy::EM::Thread:0x<hex object_id> <run|yielded> <status>>".
#
# The second field is "run" when the underlying fiber is the current fiber
# and "yielded" otherwise; the third is #status, or "dead" when #status is
# false or nil.
#
# @return [String]
def inspect
  # %x renders the id in hexadecimal to match the "0x" prefix, and the
  # trailing ">" closes the "#<" opened at the start of the string.
  "#<Empathy::EM::Thread:0x%x %s %s>" % [object_id, @fiber == Fiber.current ? "run" : "yielded", status || "dead"]
end
join(limit = nil) click to toggle source

Like ::Thread#join. @param [Numeric] limit seconds to wait for thread to expire @return [nil,Thread] nil if timeout expires, otherwise this Thread

# File lib/empathy/em/thread.rb, line 169
# Waits for the thread to finish.
#
# While the thread is alive, the caller waits on @join_cond (holding @mutex)
# for at most limit seconds. Afterwards an exception recorded in @exception
# is re-raised in the caller.
#
# @param limit [Numeric, nil] seconds to wait; nil passes no time limit
# @raise [Exception] the exception stored in @exception, if any
# @return [Thread, nil] self once the thread is no longer alive, nil if it
#   is still alive after waiting
def join(limit = nil)
  if alive?
    @mutex.synchronize { @join_cond.wait(@mutex, limit) }
  end
  ::Kernel.raise @exception if @exception
  alive? ? nil : self
end
key?(name) click to toggle source

Like ::Thread#key?. Is there a “fiber local” variable defined called name? @param [Symbol] name @return [true,false]

# File lib/empathy/em/thread.rb, line 288
# Tells whether a "fiber local" variable is stored under the given name.
#
# @param name [#to_sym] variable name; converted with to_sym before lookup
# @raise [TypeError] if name is nil/false or does not respond to to_sym
# @return [true, false]
def key?(name)
  convertible = name && name.respond_to?(:to_sym)
  ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless convertible
  @locals.key?(name.to_sym)
end
keys() click to toggle source

Like ::Thread#keys The set of “em_thread local” variable keys @return [Array<Symbol>]

# File lib/empathy/em/thread.rb, line 295
# Lists the names of all "fiber local" variables currently stored.
#
# @return [Array<Symbol>] the keys of the local-variable store
def keys
  @locals.keys
end
kill()
Alias for: exit
raise(*args) click to toggle source

Like ::Thread#raise, raise an exception on a sleeping Thread @overload raise()

@raise RuntimeError

@overload raise(string)

@param [String] string
@raise RuntimeError

@overload raise(exception,string=nil,array=caller())

@param [Class,String,Object] exception
@param [String] string exception message
@param [Array<String>] array caller information
@raise Exception

@return [void]

# File lib/empathy/em/thread.rb, line 257
# Raises an exception in this thread.
#
# With no arguments RuntimeError is used. If this thread's fiber is the one
# currently running, the exception is raised immediately via ::Kernel.raise.
# Otherwise, as long as #status is truthy, the arguments are delivered through
# wake_resume with the :raise event. A thread without a truthy status (dead)
# is left alone.
#
# @param args [Array] arguments in the forms accepted by ::Kernel.raise
# @return [Object, nil] the result of wake_resume, or nil for a dead thread
def raise(*args)
  args.push(RuntimeError) if args.empty?
  return ::Kernel.raise(*args) if fiber == Fiber.current

  # A falsy status means a dead em_thread: nothing to do.
  wake_resume(:raise, *args) if status
end
resume(*args) click to toggle source

Like Fiber#resume. Refer to warnings on #::yield

# File lib/empathy/em/thread.rb, line 176
# Resumes the underlying fiber directly, forwarding all arguments to
# Fiber#resume. No check of @status is made before resuming.
#
# @param args [Array] values handed to the fiber's resume
# @return [Object] whatever the fiber's resume returns
def resume(*args)
  #TODO  should only allow if @status is :run, which really means
  # blocked by a call to Yield
  fiber.resume(*args)
end
run()
Alias for: wakeup
status() click to toggle source

Like ::Thread#status @return [String] @return [false] @return [nil]

# File lib/empathy/em/thread.rb, line 199
# Translates the internal @status symbol into the value ::Thread#status would
# report.
#
# TODO: a thread in :run that is not the current fiber can only be in that
# state because the underlying fiber was yielded, which means it is really
# sleeping - or it is a ProxyThread that is dead and not yet cleaned up.
#
# @return [String] "run" or "sleep" for a live thread
# @return [false] when the thread is :dead or :killed
# @return [nil] when the thread ended with :exception, or for any other state
def status
  case @status
  when :run then "run"
  when :sleep then "sleep"
  when :dead, :killed then false
  when :exception then nil
  end
end
stop?() click to toggle source

Like ::Thread#stop? @return [false] if called on the current fiber @return [true] otherwise

# File lib/empathy/em/thread.rb, line 191
# Reports whether this thread is stopped, which here simply means that its
# fiber is not the one currently executing.
#
# @return [false] when called for the current fiber
# @return [true] otherwise
def stop?
  !(Fiber.current == fiber)
end
terminate()
Alias for: exit
value() click to toggle source

Like ::Thread#value. Implicitly calls join.

# File lib/empathy/em/thread.rb, line 218
# Joins the thread and then returns the contents of @value.
#
# @return [Object, nil] @value when #join returns a truthy result, otherwise
#   the falsy result of #join
def value
  join && @value
end
wakeup() click to toggle source

Like ::Thread#wakeup Wakes a sleeping Thread @return [Thread]

# File lib/empathy/em/thread.rb, line 239
# Wakes the thread by calling wake_resume with its default event.
#
# @raise [ThreadError] if #status is false or nil (the thread is dead)
# @return [Thread] self
def wakeup
  ::Kernel.raise ThreadError, "dead em_thread" if !status

  wake_resume
  self
end
Also aliased as: run

Private Instance Methods

init(fiber) click to toggle source
# File lib/empathy/em/thread.rb, line 359
# Sets up the per-thread state for the given fiber and registers this thread
# in the class-level bookkeeping (@@main and @@em_threads).
#
# @param fiber [Fiber] the fiber this thread wraps
def init(fiber)
  @fiber = fiber
  # Become the main thread if there is none yet, or if the recorded main
  # thread no longer reports a truthy status.
  # Note: @status has not been assigned at this point, so when @@main was just
  # set to self the status check below reads an unset @status.
  @@main ||= self
  @@main = self unless @@main.status

  # Add us to the registry of em_threads, keyed by fiber.
  @@em_threads[@fiber] = self

  # Initialize our "fiber local" storage.
  @locals = {}

  # Record the status; it starts out as nil.
  @status = nil

  # Hooks to run when the em_thread dies (eg by Mutex to release locks)
  @ensure_hooks = {}

  # Condition variable and mutex for joining.
  @mutex =  Mutex.new()
  @join_cond = ConditionVariable.new()
end
initialize_fiber(*args,&block) click to toggle source
# File lib/empathy/em/thread.rb, line 348
# Creates the fiber that will run fiber_body with the given arguments and
# block, registers it through init, and then starts it.
#
# @param args [Array] arguments forwarded to fiber_body
# @raise [ThreadError] if @fiber is already set
# @return [Object] the value returned by the fiber's first resume
def initialize_fiber(*args, &block)
  ::Kernel.raise ThreadError, "already initialized" if @fiber

  new_fiber = Fiber.new do
    fiber_body(*args, &block)
  end

  # State must be in place before the body starts running.
  init(new_fiber)

  # Finally start the em_thread.
  new_fiber.resume
end
run_ensure_hooks() click to toggle source
# File lib/empathy/em/thread.rb, line 404
# Calls every hook stored in @ensure_hooks, in the hash's iteration order.
#
# TODO: an exception raised inside a hook is not handled here, so hooks had
# better not raise.
#
# @return [Hash] the @ensure_hooks hash
def run_ensure_hooks
  @ensure_hooks.each_value { |hook| hook.call }
end
wake_resume(event = :wake,*args) click to toggle source
# File lib/empathy/em/thread.rb, line 396
# Resumes the underlying fiber with an event, but only while the thread is
# recorded as sleeping; in any other state the call does nothing.
#
# TODO: if the fiber is still alive and the status is :run, it has been
# yielded from non-Empathy code. If it is not alive and this is a proxy
# em_thread, the condition variable could be signalled from here.
#
# @param event [Symbol] event passed to the fiber (defaults to :wake)
# @param args [Array] extra values passed to the fiber after the event
# @return [Object, nil] the result of the fiber's resume, or nil when the
#   thread was not sleeping
def wake_resume(event = :wake, *args)
  return unless @status == :sleep

  fiber.resume(event, *args)
end
yield_sleep(timer=nil) click to toggle source
# File lib/empathy/em/thread.rb, line 381
# Marks the thread as sleeping and suspends the current fiber until it is
# resumed with an event.
#
# After the fiber is resumed, the optional timer is cancelled and the event
# is handled: :exit marks the thread :killed and throws :exit, :wake marks
# it :run, and :raise raises with the remaining resume arguments. Any other
# event is ignored and the status stays :sleep.
#
# @param timer [#cancel, nil] timer to cancel once the fiber is resumed
# @return [Symbol, nil] :run after a :wake event, nil for an unknown event
def yield_sleep(timer = nil)
  @status = :sleep
  signal = Fiber.yield
  event, *args = signal

  # The resume has arrived, so a pending timer must not fire as well.
  timer.cancel if timer

  if :exit == event
    @status = :killed
    throw :exit
  elsif :wake == event
    @status = :run
  elsif :raise == event
    ::Kernel.raise(*args)
  end
end