class RxRuby::BehaviorSubject
Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
Attributes
gate[R]
observers[R]
unsubscribed[R]
Public Class Methods
new(value)
click to toggle source
# File lib/rx_ruby/subjects/behavior_subject.rb, line 18 def initialize(value) @value = value @observers = [] @gate = Mutex.new @unsubscribed = false @stopped = false @error = nil end
Public Instance Methods
has_observers?()
click to toggle source
Indicates whether the subject has observers subscribed to it.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 28 def has_observers? observers && observers.length > 0 end
on_completed()
click to toggle source
Notifies all subscribed observers about the end of the sequence.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 42 def on_completed os = nil @gate.synchronize do self.check_unsubscribed unless @stopped os = @observers.clone @observers = [] @stopped = true end end os.each {|o| observer.on_completed } if os end
on_error(error)
click to toggle source
Notifies all subscribed observers with the error.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 58 def on_error(error) raise 'error cannot be nil' unless error os = nil @gate.synchronize do self.check_unsubscribed unless @stopped os = @observers.clone @observers = [] @stopped = true @error = error end end os.each {|o| observer.on_error error } if os end
on_next(value)
click to toggle source
Notifies all subscribed observers with the value.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 77 def on_next(value) os = nil @gate.synchronize do self.check_unsubscribed @value = value os = @observers.clone unless @stopped end os.each {|o| o.on_next value } if os end
subscribe(observer)
click to toggle source
Subscribes an observer to the subject.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 89 def subscribe(observer) raise 'observer cannot be nil' unless observer err = nil gate.synchronize do self.check_unsubscribed unless @stopped observers.push(observer) observer.on_next(@value) return InnerSubscription.new(self, observer) end err = @error end if err observer.on_next err else observer.on_completed end Subscription.empty end
unsubscribe()
click to toggle source
Unsubscribe all observers and release resources.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 115 def unsubscribe gate.synchronize do @unsubscribed = true @observers = nil @error = nil @value = nil end end
value()
click to toggle source
Gets the current value or throws an exception.
# File lib/rx_ruby/subjects/behavior_subject.rb, line 33 def value gate.synchronize do self.check_unsubscribed raise @error if @error @value end end
Private Instance Methods
check_unsubscribed()
click to toggle source
# File lib/rx_ruby/subjects/behavior_subject.rb, line 144 def check_unsubscribed raise 'Subject unsubscribed' if unsubscribed end