class RxRuby::AsyncSubject

Represents the result of an asynchronous operation. Each notification is broadcasted to all subscribed observers.

Attributes

gate[R]
observers[R]
unsubscribed[R]

Public Class Methods

new() click to toggle source
# File lib/rx_ruby/subjects/async_subject.rb, line 19
def initialize
  @observers = []
  @gate = Mutex.new
  @unsubscribed = false
  @stopped = false
  @error = nil
  @value = nil
  @has_value = false
end

Public Instance Methods

has_observers?() click to toggle source

Indicates whether the subject has observers subscribed to it.

# File lib/rx_ruby/subjects/async_subject.rb, line 30
def has_observers?
  observers && observers.length > 0
end
on_completed() click to toggle source

Notifies all subscribed observers about the end of the sequence.

# File lib/rx_ruby/subjects/async_subject.rb, line 35
def on_completed
  os = nil
  v = nil
  hv = false

  gate.synchronize do
    check_unsubscribed

    unless @stopped
      os = @observers.clone
      @observers = []
      @stopped = true
      v = @value
      hv = @has_value
    end
  end

  if os
    if hv
      os.each do |o|
        o.on_next @value
        o.on_completed
      end
    else
      os.each {|o| o.on_completed }
    end
  end
end
on_error(error) click to toggle source

Notifies all subscribed observers with the error.

# File lib/rx_ruby/subjects/async_subject.rb, line 65
def on_error(error)
  raise 'error cannot be nil' unless error

  os = nil
  gate.synchronize do
    check_unsubscribed

    unless @stopped
      os = observers.clone
      @observers = []
      @stopped = true
      @error = error
    end
  end

  os.each {|o| o.on_error error } if os
end
on_next(value) click to toggle source

Notifies all subscribed observers with the value.

# File lib/rx_ruby/subjects/async_subject.rb, line 84
def on_next(value)
  gate.synchronize do
    check_unsubscribed
    unless @stopped
      @value = value
      @has_value = true
    end
  end
end
subscribe(observer) click to toggle source

Subscribes an observer to the subject.

# File lib/rx_ruby/subjects/async_subject.rb, line 95
def subscribe(observer)
  raise 'observer cannot be nil' unless observer

  err = nil
  v = nil
  hv = false

  gate.synchronize do
    check_unsubscribed

    if !@stopped
      observers.push(observer)
      return InnerSubscription.new(self, observer)
    end

    err = @error
    v = @value
    hv = @has_value
  end

  if err
    observer.on_next err
  elsif hv
    observer.on_next v
    observer.on_completed
  else
    observer.on_completed
  end

  Subscription.empty
end
unsubscribe() click to toggle source

Unsubscribe all observers and release resources.

# File lib/rx_ruby/subjects/async_subject.rb, line 128
def unsubscribe
  gate.synchronize do
    @unsubscribed = true
    @observers = nil
    @error = nil
    @value = nil
  end
end

Private Instance Methods

check_unsubscribed() click to toggle source
# File lib/rx_ruby/subjects/async_subject.rb, line 157
def check_unsubscribed
  raise ArgumentError.new 'Subject unsubscribed' if unsubscribed
end