class RxRuby::CheckedObserver
Public Class Methods
new(observer)
click to toggle source
# File lib/rx_ruby/core/checked_observer.rb, line 19 def initialize(observer) @observer = observer @state = :idle end
Public Instance Methods
on_completed()
click to toggle source
# File lib/rx_ruby/core/checked_observer.rb, line 42 def on_completed check_access begin @observer.on_completed ensure Mutex.new.synchronize { @state = :done } end end
on_error(error)
click to toggle source
# File lib/rx_ruby/core/checked_observer.rb, line 33 def on_error(error) check_access begin @observer.on_error error ensure Mutex.new.synchronize { @state = :done } end end
on_next(value)
click to toggle source
# File lib/rx_ruby/core/checked_observer.rb, line 24 def on_next(value) check_access begin @observer.on_next value ensure Mutex.new.synchronize { @state = :idle } end end
Private Instance Methods
check_access()
click to toggle source
# File lib/rx_ruby/core/checked_observer.rb, line 53 def check_access Mutex.new.synchronize do old = @state @state = :busy if @state == :idle case old when :busy raise 'Re-entrancy detected' when :done raise 'Observer terminated' end end end