class RxRuby::AsyncSubject
Represents the result of an asynchronous operation. Each notification is broadcasted to all subscribed observers.
Attributes
gate[R]
observers[R]
unsubscribed[R]
Public Class Methods
new()
click to toggle source
# File lib/rx_ruby/subjects/async_subject.rb, line 19 def initialize @observers = [] @gate = Mutex.new @unsubscribed = false @stopped = false @error = nil @value = nil @has_value = false end
Public Instance Methods
has_observers?()
click to toggle source
Indicates whether the subject has observers subscribed to it.
# File lib/rx_ruby/subjects/async_subject.rb, line 30 def has_observers? observers && observers.length > 0 end
on_completed()
click to toggle source
Notifies all subscribed observers about the end of the sequence.
# File lib/rx_ruby/subjects/async_subject.rb, line 35 def on_completed os = nil v = nil hv = false gate.synchronize do check_unsubscribed unless @stopped os = @observers.clone @observers = [] @stopped = true v = @value hv = @has_value end end if os if hv os.each do |o| o.on_next @value o.on_completed end else os.each {|o| o.on_completed } end end end
on_error(error)
click to toggle source
Notifies all subscribed observers with the error.
# File lib/rx_ruby/subjects/async_subject.rb, line 65 def on_error(error) raise 'error cannot be nil' unless error os = nil gate.synchronize do check_unsubscribed unless @stopped os = observers.clone @observers = [] @stopped = true @error = error end end os.each {|o| o.on_error error } if os end
on_next(value)
click to toggle source
Notifies all subscribed observers with the value.
# File lib/rx_ruby/subjects/async_subject.rb, line 84 def on_next(value) gate.synchronize do check_unsubscribed unless @stopped @value = value @has_value = true end end end
subscribe(observer)
click to toggle source
Subscribes an observer to the subject.
# File lib/rx_ruby/subjects/async_subject.rb, line 95 def subscribe(observer) raise 'observer cannot be nil' unless observer err = nil v = nil hv = false gate.synchronize do check_unsubscribed if !@stopped observers.push(observer) return InnerSubscription.new(self, observer) end err = @error v = @value hv = @has_value end if err observer.on_next err elsif hv observer.on_next v observer.on_completed else observer.on_completed end Subscription.empty end
unsubscribe()
click to toggle source
Unsubscribe all observers and release resources.
# File lib/rx_ruby/subjects/async_subject.rb, line 128 def unsubscribe gate.synchronize do @unsubscribed = true @observers = nil @error = nil @value = nil end end
Private Instance Methods
check_unsubscribed()
click to toggle source
# File lib/rx_ruby/subjects/async_subject.rb, line 157 def check_unsubscribed raise ArgumentError.new 'Subject unsubscribed' if unsubscribed end