class RxRuby::BehaviorSubject

Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.

Attributes

gate[R]
observers[R]
unsubscribed[R]

Public Class Methods

new(value) click to toggle source
# File lib/rx_ruby/subjects/behavior_subject.rb, line 18
def initialize(value)
  @value = value
  @observers = []
  @gate = Mutex.new
  @unsubscribed = false
  @stopped = false
  @error = nil
end

Public Instance Methods

has_observers?() click to toggle source

Indicates whether the subject has observers subscribed to it.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 28
def has_observers?
  observers && observers.length > 0
end
on_completed() click to toggle source

Notifies all subscribed observers about the end of the sequence.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 42
def on_completed
  os = nil
  @gate.synchronize do 
    self.check_unsubscribed

    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
    end 
  end

  os.each {|o| observer.on_completed } if os
end
on_error(error) click to toggle source

Notifies all subscribed observers with the error.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 58
def on_error(error)
  raise 'error cannot be nil' unless error

  os = nil
  @gate.synchronize do
    self.check_unsubscribed

    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
      @error = error
    end         
  end

  os.each {|o| observer.on_error error } if os
end
on_next(value) click to toggle source

Notifies all subscribed observers with the value.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 77
def on_next(value) 
  os = nil
  @gate.synchronize do
    self.check_unsubscribed
    @value = value
    os = @observers.clone unless @stopped
  end

  os.each {|o| o.on_next value } if os
end
subscribe(observer) click to toggle source

Subscribes an observer to the subject.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 89
def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  err = nil
  gate.synchronize do
    self.check_unsubscribed

    unless @stopped
      observers.push(observer)
      observer.on_next(@value)
      return InnerSubscription.new(self, observer)
    end

    err = @error
  end

  if err
    observer.on_next err
  else
    observer.on_completed
  end

  Subscription.empty
end
unsubscribe() click to toggle source

Unsubscribe all observers and release resources.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 115
def unsubscribe
  gate.synchronize do
    @unsubscribed = true
    @observers = nil
    @error = nil
    @value = nil
  end
end
value() click to toggle source

Gets the current value or throws an exception.

# File lib/rx_ruby/subjects/behavior_subject.rb, line 33
def value
  gate.synchronize do 
    self.check_unsubscribed
    raise @error if @error
    @value
  end
end

Private Instance Methods

check_unsubscribed() click to toggle source
# File lib/rx_ruby/subjects/behavior_subject.rb, line 144
def check_unsubscribed
  raise 'Subject unsubscribed' if unsubscribed
end