class Empathy::EM::Thread
Acts like a ::Thread using Fibers and EventMachine
{::Thread} methods not implemented by Empathy
* .exclusive - not implemented * #critical - not implemented * #set_trace_func - not implemented * #safe_level - not implemented * #priority - not implemented
Attributes
@return [Fiber] The underlying fiber.
Public Class Methods
Like ::Thread::current. Get the currently running EM::Thread
, eg to access thread local variables @return [Thread] representing the current Fiber
# File lib/empathy/em/thread.rb, line 90 def self.current @@em_threads[Fiber.current] || ProxyThread.new(Fiber.current) end
Like ::Thread.exit @return [Thread]
# File lib/empathy/em/thread.rb, line 125 def self.exit current.exit end
Like ::Thread.kill @return [Thread]
# File lib/empathy/em/thread.rb, line 131 def self.kill(thread) thread.exit end
Like ::Thread::list. Return an array of all EM::Threads that are alive.
@return [Array<Thread>]
# File lib/empathy/em/thread.rb, line 76 def self.list @@em_threads.values.select { |s| s.alive? } end
Like ::Thread.main
@return [Thread]
# File lib/empathy/em/thread.rb, line 83 def self.main @@main end
@private
# File lib/empathy/em/thread.rb, line 136 def self.new(*args,&block) instance = super(*args,&block) ::Kernel.raise ThreadError, "super not called for subclass of Thread" unless instance.instance_variable_defined?("@fiber") instance end
# File lib/empathy/em/thread.rb, line 149 def initialize(*args,&block) initialize_fiber(*args,&block) end
Like ::Thread::pass. The fiber is paused and resumed on the next_tick of EM’s event loop
@return [nil]
# File lib/empathy/em/thread.rb, line 116 def self.pass em_thread = current ::EM.next_tick{ em_thread.__send__(:wake_resume) } em_thread.__send__(:yield_sleep) nil end
Like ::Thread.start
@return [Thread]
# File lib/empathy/em/thread.rb, line 145 def self.start(*args,&block) ::Kernel.raise ArgumentError, "no block" unless block_given? c = if self != Thread Class.new(self) do def initialize(*args,&block) initialize_fiber(*args,&block) end end else self end c.new(*args,&block) end
Like ::Thread::stop. Sleep forever (until woken) @return [void]
# File lib/empathy/em/thread.rb, line 108 def self.stop Kernel.sleep() end
Alias for Fiber::yield Equivalent to a thread being blocked on IO
WARNING: Be very careful about using yield with the other thread like methods Specifically it is important to ensure user calls to resume
don’t conflict with the resumes that are setup via EM.timer or EM.next_tick as a result of #::sleep or #::pass
# File lib/empathy/em/thread.rb, line 101 def self.yield(*args) Fiber.yield(*args) end
Public Instance Methods
Access to “fiber local” variables, akin to “thread local” variables. @param [Symbol] name @return [Object,nil]
# File lib/empathy/em/thread.rb, line 274 def [](name) ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless name and name.respond_to?(:to_sym) @locals[name.to_sym] end
Access to “fiber local” variables, akin to “thread local” variables.
# File lib/empathy/em/thread.rb, line 280 def []=(name, value) ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless name and name.respond_to?(:to_sym) @locals[name.to_sym] = value end
Like ::Thread#alive? or Fiber#alive? @return [true,false]
# File lib/empathy/em/thread.rb, line 184 def alive? fiber.alive? end
Do something when the fiber completes. @return [void]
# File lib/empathy/em/thread.rb, line 307 def ensure_hook(key,&block) if block_given? then @ensure_hooks[key] = block else @ensure_hooks.delete(key) end end
Like ::Thread#exit. Signals thread to wakeup and die
@return [nil, Thread]
# File lib/empathy/em/thread.rb, line 225 def exit case @status when :sleep wake_resume(:exit) when :run throw :exit end end
Like ::Thread#inspect @return [String]
# File lib/empathy/em/thread.rb, line 301 def inspect "#<Empathy::EM::Thread:0x%s %s %s" % [object_id, @fiber == Fiber.current ? "run" : "yielded", status || "dead" ] end
Like ::Thread#join. @param [Numeric] limit seconds to wait for thread to expire @return [nil,Thread] nil if timeout expires, otherwise this Thread
# File lib/empathy/em/thread.rb, line 169 def join(limit = nil) @mutex.synchronize { @join_cond.wait(@mutex,limit) } if alive? ::Kernel.raise @exception if @exception if alive? then nil else self end end
Like ::Thread#key? Is there a “fiber local” variable defined called name
@param [Symbol] name @return [true,false]
# File lib/empathy/em/thread.rb, line 288 def key?(name) ::Kernel.raise TypeError, "name #{name} must convert to_sym" unless name and name.respond_to?(:to_sym) @locals.has_key?(name.to_sym) end
Like ::Thread#keys The set of “em_thread local” variable keys @return [Array<Symbol>]
# File lib/empathy/em/thread.rb, line 295 def keys() @locals.keys end
Like ::Thread#raise, raise an exception on a sleeping Thread
@overload raise()
@raise RuntimeError
@overload raise(string)
@param [String] string @raise RuntimeError
@overload raise(exception,string=nil,array=caller())
@param [Class,String,Object) exception @param [String] string exception message @param [Array<String>] array caller information @raise Exception
@return [void]
# File lib/empathy/em/thread.rb, line 257 def raise(*args) args << RuntimeError if args.empty? if fiber == Fiber.current ::Kernel.raise(*args) elsif status wake_resume(:raise,*args) else #dead em_thread, do nothing end end
Like Fiber#resume. Refer to warnings on #::yield
# File lib/empathy/em/thread.rb, line 176 def resume(*args) #TODO should only allow if @status is :run, which really means # blocked by a call to Yield fiber.resume(*args) end
Like ::Thread#status @return [String] @return [false] @return [nil]
# File lib/empathy/em/thread.rb, line 199 def status case @status when :run #TODO - if not the current fiber # we can only be in this state due to a yield on the # underlying fiber, which means we are actually in sleep # or we're a ProxyThread that is dead and not yet # cleaned up "run" when :sleep "sleep" when :dead, :killed false when :exception nil end end
Like ::Thread#stop? @return [false] if called on the current fiber @return [true] otherwise
# File lib/empathy/em/thread.rb, line 191 def stop? Fiber.current != fiber end
Like ::Thread#value. Implicitly calls join
.
# File lib/empathy/em/thread.rb, line 218 def value join and @value end
Private Instance Methods
# File lib/empathy/em/thread.rb, line 359 def init(fiber) @fiber = fiber # Add us to the list of living em_threads. @@main ||= self @@main = self unless @@main.status @@em_threads[@fiber] = self # Initialize our "fiber local" storage. @locals = {} # Record the status @status = nil # Hooks to run when the em_thread dies (eg by Mutex to release locks) @ensure_hooks = {} # Condition variable and mutex for joining. @mutex = Mutex.new() @join_cond = ConditionVariable.new() end
# File lib/empathy/em/thread.rb, line 348 def initialize_fiber(*args,&block) ::Kernel.raise ThreadError, "already initialized" if @fiber # Create our fiber. fiber = Fiber.new{ fiber_body(*args,&block) } init(fiber) # Finally start the em_thread. fiber.resume() end
# File lib/empathy/em/thread.rb, line 404 def run_ensure_hooks() #TODO - better not throw exceptions in an ensure hook @ensure_hooks.each { |key,hook| hook.call } end
# File lib/empathy/em/thread.rb, line 396 def wake_resume(event = :wake,*args) fiber.resume(event,*args) if @status == :sleep #TODO if fiber is still alive? and status = :run # then it has been yielded from non Empathy code. # if it is not alive, and is a proxy em_thread then # we can signal the condition variable from here end
# File lib/empathy/em/thread.rb, line 381 def yield_sleep(timer=nil) @status = :sleep event,*args = Fiber.yield timer.cancel if timer case event when :exit @status = :killed throw :exit when :wake @status = :run when :raise ::Kernel.raise(*args) end end