class RxRuby::ActivePlan
Public Class Methods
new(join_observer_array, on_next, on_completed)
click to toggle source
# File lib/rx_ruby/joins/active_plan.rb, line 3 def initialize(join_observer_array, on_next, on_completed) @join_observer_array = join_observer_array @on_next = on_next @on_completed = on_completed @join_observers = {} @join_observer_array.each {|x| @join_observers[x] = x } end
Public Instance Methods
dequeue()
click to toggle source
# File lib/rx_ruby/joins/active_plan.rb, line 13 def dequeue @join_observers.each {|_, v| v.queue.shift } end
match()
click to toggle source
# File lib/rx_ruby/joins/active_plan.rb, line 17 def match has_values = true @join_observer_array.each {|v| if v.queue.length == 0 has_values = false break end } if has_values first_values = [] is_completed = false @join_observer_array.each {|v| first_values.push v.queue[0] is_completed = true if v.queue[0].on_completed? } if is_completed @on_completed.call else dequeue values = [] first_values.each {|v| values.push v.value } @on_next.call(*values) end end end