class RxRuby::ScheduledObserver
Public Class Methods
new(scheduler, observer)
click to toggle source
Calls superclass method
RxRuby::ObserverBase::new
# File lib/rx_ruby/core/scheduled_observer.rb, line 11 def initialize(scheduler, observer) @scheduler = scheduler @observer = observer @gate = Monitor.new @queue = [] @subscriber = SerialSubscription.new @acquired = false @faulted = false config = ObserverConfiguration.new config.on_next(&method(:on_next_core)) config.on_error(&method(:on_error_core)) config.on_completed(&method(:on_completed_core)) super(config) end
Public Instance Methods
ensure_active(n=0)
click to toggle source
# File lib/rx_ruby/core/scheduled_observer.rb, line 40 def ensure_active(n=0) owner = false @gate.synchronize do if !@faulted && @queue.length > 0 owner = !@acquired @acquired = true end end @subscriber.subscription = @scheduler.schedule_recursive_with_state(nil, method(:run)) if owner end
on_completed_core()
click to toggle source
# File lib/rx_ruby/core/scheduled_observer.rb, line 36 def on_completed_core @gate.synchronize { @queue.push(lambda { @observer.on_completed }) } end
on_error_core(error)
click to toggle source
# File lib/rx_ruby/core/scheduled_observer.rb, line 32 def on_error_core(error) @gate.synchronize { @queue.push(lambda { @observer.on_error error }) } end
on_next_core(value)
click to toggle source
# File lib/rx_ruby/core/scheduled_observer.rb, line 28 def on_next_core(value) @gate.synchronize { @queue.push(lambda { @observer.on_next value }) } end
run(state, recurse)
click to toggle source
# File lib/rx_ruby/core/scheduled_observer.rb, line 53 def run(state, recurse) work = nil @gate.synchronize do if @queue.length > 0 work = @queue.shift else @acquired = false return end end begin work.call rescue => e @queue = [] @faulted = true raise e end recurse.call state end
unsubscribe()
click to toggle source
Calls superclass method
RxRuby::ObserverBase#unsubscribe
# File lib/rx_ruby/core/scheduled_observer.rb, line 76 def unsubscribe super @subscriber.unsubscribe end