class RxRuby::ActivePlan

Public Class Methods

new(join_observer_array, on_next, on_completed)
# File lib/rx_ruby/joins/active_plan.rb, line 3
# Sets up a plan over a group of join observers.
#
# join_observer_array - collection of join observers, kept as given and
#                       also indexed into a hash keyed by the observer itself
#                       (so repeated entries collapse to a single key).
# on_next             - callable stored for #match to invoke with values.
# on_completed        - callable stored for #match to invoke on completion.
def initialize(join_observer_array, on_next, on_completed)
  @join_observer_array = join_observer_array
  @on_next = on_next
  @on_completed = on_completed
  # Identity map: every observer is both the key and the value.
  @join_observers = @join_observer_array.each_with_object({}) do |observer, lookup|
    lookup[observer] = observer
  end
end

Public Instance Methods

dequeue()
# File lib/rx_ruby/joins/active_plan.rb, line 13
# Removes the front element from the queue of every distinct join observer
# held in @join_observers. Returns the @join_observers hash.
def dequeue
  @join_observers.each_value do |observer|
    observer.queue.shift
  end
end
match()
# File lib/rx_ruby/joins/active_plan.rb, line 17
# Attempts to fire the plan using the head of every join observer's queue.
#
# Does nothing (and returns nil) while any observer's queue is empty.
# Otherwise looks at the first queued item of each observer:
# * if any of them answers true to #on_completed?, calls @on_completed
#   and leaves the queues untouched;
# * else dequeues one item from each observer and calls @on_next with the
#   #value of each head item, in observer order.
#
# Returns the result of whichever callback was invoked.
def match
  return if @join_observer_array.any? { |observer| observer.queue.length.zero? }

  heads = @join_observer_array.map { |observer| observer.queue[0] }

  # count (rather than any?) so every head is asked, as with a full scan.
  if heads.count(&:on_completed?) > 0
    @on_completed.call
  else
    # Heads were captured above, so they stay readable after dequeuing.
    dequeue
    @on_next.call(*heads.map(&:value))
  end
end