class RxRuby::JoinObserver

Attributes

queue[R]

Public Class Methods

new(source, on_error) click to toggle source
Calls superclass method RxRuby::ObserverBase::new
# File lib/rx_ruby/joins/join_observer.rb, line 5
# Builds the observer configuration handed to the superclass constructor and
# sets up this observer's bookkeeping state.
#
# source   - object stored in @source (used later by #subscribe).
# on_error - callable invoked with notification.exception whenever an
#            incoming notification answers true to on_error?.
#
# The on_next handler ignores everything once @is_disposed is set. Error
# notifications are forwarded to on_error and are NOT queued; any other
# notification is appended to @queue and every active plan is asked to match.
def initialize(source, on_error)
  config = Observer.configure do |o|
    o.on_next do |notification|
      next if @is_disposed

      if notification.on_error?
        @on_error.call(notification.exception)
        next
      end

      @queue << notification
      # Iterate over a snapshot so that changes made to @active_plans while
      # plans are matching do not disturb this iteration.
      @active_plans.dup.each(&:match)
    end
  end
  super(config)

  @source = source
  @on_error = on_error
  @queue = []
  @active_plans = []
  @subscription = SingleAssignmentSubscription.new
  @is_disposed = false
end

Public Instance Methods

add_active_plan(active_plan) click to toggle source
# File lib/rx_ruby/joins/join_observer.rb, line 28
# Appends active_plan to the end of the list of active plans.
#
# Returns the (mutated) list of active plans.
def add_active_plan(active_plan)
  @active_plans << active_plan
end
remove_active_plan(active_plan) click to toggle source
# File lib/rx_ruby/joins/join_observer.rb, line 36
# Removes the first occurrence of active_plan from the list of active plans
# (no-op when it is not present), then unsubscribes this observer if no
# active plans remain.
def remove_active_plan(active_plan)
  position = @active_plans.index(active_plan)
  @active_plans.delete_at(position) unless position.nil?

  # The emptiness check runs even when nothing was removed.
  unsubscribe if @active_plans.empty?
end
subscribe() click to toggle source
# File lib/rx_ruby/joins/join_observer.rb, line 32
# Subscribes @config to the materialized form of @source and stores the
# value returned by that subscribe call in @subscription.
#
# Returns the value that was stored.
def subscribe
  materialized = @source.materialize
  inner = materialized.subscribe(@config)
  @subscription.subscription = inner
end
unsubscribe() click to toggle source
Calls superclass method RxRuby::ObserverBase#unsubscribe
# File lib/rx_ruby/joins/join_observer.rb, line 43
# Invokes the superclass unsubscribe on every call, then, the first time
# only, marks this observer as disposed and unsubscribes @subscription.
#
# Returns nil when already disposed; otherwise the result of
# @subscription.unsubscribe.
def unsubscribe
  super
  return if @is_disposed

  @is_disposed = true
  @subscription.unsubscribe
end