module RxRuby::Observable

Time-based operations

Public Class Methods

amb(*args) click to toggle source

Propagates the observable sequence that reacts first.

# File lib/rx_ruby/operators/multiple.rb, line 426
# Races all given sequences against each other by folding them pairwise
# with the instance-level +amb+, starting from a never-emitting sequence.
def amb(*args)
  args.inject(Observable.never) do |winner_so_far, candidate|
    winner_so_far.amb(candidate)
  end
end
case(selector, sources, defaultSourceOrScheduler = Observable.empty) click to toggle source
# File lib/rx_ruby/linq/observable/case.rb, line 3
# Deferred switch: on each subscription, calls +selector+ and looks the
# result up in +sources+; when the lookup is falsy the default is used.
# If the third argument is a Scheduler, the default becomes an empty
# sequence running on that scheduler.
def case(selector, sources, defaultSourceOrScheduler = Observable.empty)
  defer do
    defaultSourceOrScheduler = Observable.empty(defaultSourceOrScheduler) if Scheduler === defaultSourceOrScheduler

    sources[selector.call] || defaultSourceOrScheduler
  end
end
Also aliased as: switchCase
combine_latest(*args, &result_selector) click to toggle source

Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences produces an element.

# File lib/rx_ruby/operators/multiple.rb, line 494
# Combines the latest value of every source in +args+ and emits the result
# of +result_selector+ (by default, the values packed into an array)
# whenever any source produces an element, once all sources have produced
# at least one.
#
# Returns an AnonymousObservable whose subscription is a
# CompositeSubscription over one SingleAssignmentSubscription per source.
def combine_latest(*args, &result_selector)
  AnonymousObservable.new do |observer|
    # Default selector: hand back the latest values as an array.
    result_selector ||= lambda {|*inner_args| inner_args }

    n = args.length
    has_value = Array.new(n, false)
    has_value_all = false

    values = Array.new(n)
    is_done = Array.new(n, false)

    # Called after source i stored a new value in values[i].
    next_item = lambda do |i|
      has_value[i] = true
      # has_value_all is latched: once true, has_value.all? is not re-evaluated.
      if has_value_all || (has_value_all = has_value.all?)
        res = nil
        begin
          res = result_selector.call(*values)
        rescue => e
          # A failing selector terminates the combined sequence.
          observer.on_error e
          return
        end

        observer.on_next(res)
      elsif enumerable_select_with_index(is_done) {|_, j| j != i} .all?
        # Not every source has a value yet and all the *other* sources are
        # already done, so no combination can ever be produced.
        observer.on_completed
        return
      end
    end

    # Called when source i completes; the result completes when all have.
    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    # One shared monitor is handed to every source's #synchronize.
    gate = Monitor.new
    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new

      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          values[i] = x
          next_item.call i
        end

        # Errors from any source are forwarded unchanged.
        o.on_error(&observer.method(:on_error))

        o.on_completed { done.call i }
      end

      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)

      sas
    end

    CompositeSubscription.new subscriptions
  end
end
concat(*args) click to toggle source

Concatenates all of the specified observable sequences, as long as the previous observable sequence terminated successfully.

# File lib/rx_ruby/operators/multiple.rb, line 553
# Concatenates the given observable sequences: the next source is
# subscribed to only after the previous one has completed.
#
# +args+ is either a list of observables or a single Enumerator yielding
# them. An error from a source, or from advancing the enumerator, is
# forwarded to the observer and ends the sequence.
#
# Returns an AnonymousObservable whose subscription covers the currently
# running source, the scheduled work, and a flag that stops further
# iteration once disposed.
def concat(*args)
  AnonymousObservable.new do |observer|
    disposed = false
    sources = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    subscription = SerialSubscription.new
    gate = AsyncLock.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        current = nil
        has_next = false
        err = nil

        return if disposed

        begin
          current = sources.next
          has_next = true
        rescue StopIteration
          # Enumerator exhausted; handled below through has_next.
        rescue => failure
          # Distinct name: the original rescued into the same variable that
          # held the enumerator, overwriting it.
          err = failure
        end

        if err
          observer.on_error err
          return
        end

        unless has_next
          observer.on_completed
          return
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          o.on_error(&observer.method(:on_error))
          # Completion of this source moves on to the next one.
          o.on_completed { this.call }
        end

        # Store the inner subscription so that disposing the result also
        # unsubscribes from the source that is currently running. It was
        # previously discarded, leaving `d` permanently empty.
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true }}]
  end
end
create(&subscribe) click to toggle source

Creates an observable sequence from a specified subscribe method implementation.

# File lib/rx_ruby/operators/creation.rb, line 17
# Builds an observable from a subscribe block. The block's return value is
# normalised into a subscription: a Subscription is used as is, a Proc is
# wrapped with Subscription.create, and anything else yields
# Subscription.empty.
def create(&subscribe)
  AnonymousObservable.new do |observer|
    outcome = subscribe.call(observer)
    if Subscription === outcome
      outcome
    elsif Proc === outcome
      Subscription.create(&outcome)
    else
      Subscription.empty
    end
  end
end
defer() { || ... } click to toggle source

Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.

# File lib/rx_ruby/operators/creation.rb, line 32
# Invokes the given factory block on every subscription and subscribes the
# observer to the sequence it returns. If the factory raises, the observer
# is subscribed to an error sequence carrying that exception instead.
def defer
  AnonymousObservable.new do |observer|
    source = nil
    failure_subscription =
      begin
        source = yield
        nil
      rescue => failure
        Observable.raise_error(failure).subscribe(observer)
      end

    failure_subscription || source.subscribe(observer)
  end
end
empty(scheduler = ImmediateScheduler.instance) click to toggle source

Returns an empty observable sequence, using the specified scheduler to send out the single OnCompleted message.

# File lib/rx_ruby/operators/creation.rb, line 47
# Produces a sequence with no elements: the only thing scheduled on
# +scheduler+ is the completion notification.
def empty(scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    finish = lambda do
      observer.on_completed
    end
    scheduler.schedule finish
  end
end
for(sources, result_selector = nil) click to toggle source
# File lib/rx_ruby/linq/observable/for.rb, line 3
# Maps every element of +sources+ through +result_selector+ (by default
# wrapping the element in an array) and concatenates the results via
# Observable.concat. The mapping is lazy: it runs as the Enumerator is
# advanced.
def for(sources, result_selector = nil)
  selector = result_selector || lambda { |*items| items }
  projected = Enumerator.new do |yielder|
    sources.each { |item| yielder << selector.call(item) }
  end
  Observable.concat(projected)
end
fork_join(*all_sources) click to toggle source
# File lib/rx_ruby/linq/observable/fork_join.rb, line 3
# Subscribes to all sources in parallel and, once every one of them has
# completed, emits a single array holding the last value of each source
# followed by completion.
#
# * With no sources the result completes immediately.
# * If any source completes without having produced a value, the result
#   completes without emitting.
# * The first error is forwarded and all subscriptions are disposed.
def fork_join(*all_sources)
  AnonymousObservable.new {|subscriber|
    count = all_sources.length
    if count == 0
      subscriber.on_completed
      # Leave the block here; without `next` the empty subscription was
      # evaluated and discarded and execution fell through.
      next Subscription.empty
    end
    group = CompositeSubscription.new
    finished = false
    has_results = Array.new(count)
    has_completed  = Array.new(count)
    results  = Array.new(count)

    count.times {|i|
      source = all_sources[i]
      group.push(
        source.subscribe(
          lambda {|value|
            # Remember only the most recent value of each source.
            if !finished
              has_results[i] = true
              results[i] = value
            end
          },
          lambda {|e|
            finished = true
            subscriber.on_error e
            group.dispose
          },
          lambda {
            if !finished
              if !has_results[i]
                  subscriber.on_completed
                  return
              end
              has_completed[i] = true
              # Wait until every source has completed.
              count.times {|ix|
                if !has_completed[ix]
                  return
                end
              }
              finished = true
              subscriber.on_next results
              subscriber.on_completed
            end
          }
        )
      )
    }
    group
  }
end
from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/from.rb, line 3
# Converts +iterable+ into an observable sequence, optionally mapping each
# element through +map_fn+ (called with the element and its index).
#
# The enumerator is created per subscription, so every subscriber sees the
# whole iterable. It used to be created once when +from+ was called, which
# left later subscribers with an already exhausted enumerator.
#
# Errors raised while advancing the enumerator or inside +map_fn+ are
# delivered through +on_error+.
def from(iterable, map_fn = nil, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new {|observer|
    enum = iterable.to_enum
    i = 0
    scheduler.schedule_recursive lambda {|this|
      begin
        result = enum.next
      rescue StopIteration
        observer.on_completed
        return
      rescue => e
        observer.on_error e
        return
      end

      if Proc === map_fn
        begin
          result = map_fn.call(result, i)
        rescue => e
          observer.on_error e
          return
        end
      end

      observer.on_next result
      i += 1
      this.call
    }
  }
end
from_array(array, scheduler = CurrentThreadScheduler.instance) click to toggle source
# File lib/rx_ruby/operators/creation.rb, line 204
# Emits the elements of +array+ one by one, carrying the current index as
# the scheduler's recursion state, then completes.
def from_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    step = lambda do |index, recurse|
      if index < array.size
        observer.on_next array[index]
        recurse.call(index + 1)
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive_with_state 0, step
  end
end
generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance) click to toggle source

Generates an observable sequence by running a state-driven loop producing the sequence’s elements.

# File lib/rx_ruby/operators/creation.rb, line 56
# Runs a state-driven loop on +scheduler+:
#
# * the state starts at +initial_state+ and is advanced with +iterate+
#   before every step except the first;
# * while +condition+ holds for the state, +result_selector+ turns it into
#   the emitted element;
# * when +condition+ fails the sequence completes.
#
# An exception from any of the three callables is sent through +on_error+.
def generate(initial_state, condition, iterate, result_selector, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    current = initial_state
    started = false

    step = lambda do |recurse|
      keep_going = false
      value = nil
      begin
        current = iterate.call(current) if started
        started = true

        keep_going = condition.call(current)
        value = result_selector.call(current) if keep_going
      rescue => failure
        observer.on_error failure
        return
      end

      if keep_going
        observer.on_next value
        recurse.call
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive step
  end
end
if(condition, then_source, else_source_or_scheduler = nil) click to toggle source
# File lib/rx_ruby/linq/observable/if.rb, line 3
# Picks +then_source+ when +condition+ returns a truthy value, otherwise
# the else source. The third argument may be a Scheduler (the else source
# is then an empty sequence on that scheduler), an Observable (used as
# is), or nil (a plain empty sequence). Any other value leaves the else
# source as nil.
def if(condition, then_source, else_source_or_scheduler = nil)
  else_source =
    if Scheduler === else_source_or_scheduler
      Observable.empty(else_source_or_scheduler)
    elsif Observable === else_source_or_scheduler
      else_source_or_scheduler
    elsif nil === else_source_or_scheduler
      Observable.empty
    end

  if condition.call
    then_source
  else
    else_source
  end
end
interval(period, scheduler = RxRuby::DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/interval.rb, line 2
# Delegates to observable_timer_time_span_and_period, passing +period+ as
# both the initial delay and the repeat period.
def self.interval(period, scheduler = RxRuby::DefaultScheduler.instance)
  observable_timer_time_span_and_period(period, period, scheduler)
end
just(value, scheduler = ImmediateScheduler.instance) click to toggle source

Returns an observable sequence that contains a single element.

# File lib/rx_ruby/operators/creation.rb, line 99
# Single-element sequence: schedules one action on +scheduler+ that emits
# +value+ and then completes.
def just(value, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    emit_and_finish = lambda do
      observer.on_next value
      observer.on_completed
    end
    scheduler.schedule emit_and_finish
  end
end
Also aliased as: return
merge(*args)
Alias for: merge_all
merge_all(*args) click to toggle source

Merges elements from all of the specified observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.

# File lib/rx_ruby/operators/multiple.rb, line 612
# Merges all given sequences. When the first argument is a Scheduler it is
# removed from the list and used to enumerate the sources; otherwise the
# current-thread scheduler is used.
def merge_all(*args)
  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if !args.empty? && Scheduler === args.first

  sources = Observable.from_array(args, scheduler)
  sources.merge_all
end
Also aliased as: merge
merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args) click to toggle source

Merges elements from all observable sequences in the given enumerable sequence into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences, and using the specified scheduler for enumeration of and subscription to the sources.

# File lib/rx_ruby/operators/multiple.rb, line 607
# Wraps +args+ in an observable via Observable.from_array (enumerated on
# +scheduler+) and calls the instance-level merge_concurrent on it with
# +max_concurrent+.
# Note: +scheduler+ is a positional parameter ahead of the splat, so the
# second positional argument is always bound to it, never to a source.
def merge_concurrent(max_concurrent, scheduler = CurrentThreadScheduler.instance, *args)
  Observable.from_array(args, scheduler).merge_concurrent(max_concurrent)
end
never() click to toggle source

Returns a non-terminating observable sequence, which can be used to denote an infinite duration (e.g. when using reactive joins).

# File lib/rx_ruby/operators/creation.rb, line 92
# Returns an observable whose subscribe block is empty: the observer is
# ignored, so no notification is ever sent from here.
def never
  AnonymousObservable.new do |_|

  end
end
of(*args) click to toggle source
# File lib/rx_ruby/linq/observable/of.rb, line 3
# Turns the argument list into an observable sequence via +of_array+. A
# leading Scheduler argument is removed from the list and used for the
# enumeration; otherwise the current-thread scheduler is used.
def of(*args)
  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if !args.empty? && Scheduler === args.first

  of_array(args, scheduler)
end
of_array(array, scheduler = CurrentThreadScheduler.instance) click to toggle source

Converts an array to an observable sequence, using an optional scheduler to enumerate the array.

# File lib/rx_ruby/operators/creation.rb, line 110
# Emits each element of +array+ in order using recursive scheduling, then
# completes. The read position is kept in a closure variable rather than
# in scheduler state.
def of_array(array, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    position = 0

    step = lambda do |recurse|
      if position < array.length
        observer.on_next array[position]
        position += 1
        recurse.call
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive step
  end
end
of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance) click to toggle source

Converts an Enumerable to an observable sequence, using an optional scheduler to enumerate it.

# File lib/rx_ruby/operators/creation.rb, line 126
# Obtains an Enumerator from +enumerable+ with #to_enum and delegates to
# Observable.of_enumerator with the same scheduler.
def of_enumerable(enumerable, scheduler = CurrentThreadScheduler.instance)
  Observable.of_enumerator(enumerable.to_enum, scheduler)
end
of_enumerator(enum, scheduler = CurrentThreadScheduler.instance) click to toggle source

Converts an Enumerator to an observable sequence, using an optional scheduler to advance it.

# File lib/rx_ruby/operators/creation.rb, line 131
# Pulls values from +enum+ with Enumerator#next, one per scheduled step.
# StopIteration completes the sequence; any other exception raised while
# advancing is delivered through +on_error+.
# The enumerator is the caller's object and is advanced in place.
def of_enumerator(enum, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    step = lambda do |recurse|
      fetched = false
      item = nil

      begin
        item = enum.next
        fetched = true
      rescue StopIteration
        observer.on_completed
      rescue => failure
        observer.on_error failure
      end

      next unless fetched

      observer.on_next item
      recurse.call
    end

    scheduler.schedule_recursive step
  end
end
on_error_resume_next(*args) click to toggle source

Concatenates all of the specified observable sequences, even if the previous observable sequence terminated exceptionally.

# File lib/rx_ruby/operators/multiple.rb, line 622
# Concatenates the given sequences, moving on to the next source both when
# the current one completes and when it fails; errors from the sources
# themselves are swallowed.
#
# +args+ is either a list of observables or a single Enumerator yielding
# them. An exception raised while advancing the enumerator is still
# reported through +on_error+.
def on_error_resume_next(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    e = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    # Holds the subscription to whichever source is currently running.
    subscription = SerialSubscription.new

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        current = nil
        has_next = false
        err = nil

        if !disposed
          begin
            current = e.next
            has_next = true
          rescue StopIteration => _
            # Do nothing
          rescue => e
            # NOTE(review): this rebinds `e`, the enumerator variable, to
            # the exception; the step returns right after reporting it.
            err = e
          end
        else
          return
        end

        if err
          observer.on_error err
          return
        end

        unless has_next
          # No sources left.
          observer.on_completed
          return
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))
          # Both termination kinds advance to the next source.
          o.on_error {|_| this.call }
          o.on_completed { this.call }
        end

        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end
pairs(obj, scheduler = CurrentThreadScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/pairs.rb, line 3
# Delegates to +of_enumerable+ with +obj+ and +scheduler+.
# NOTE(review): presumably meant for Hash-like objects whose enumeration
# yields key/value pairs - confirm against callers.
def pairs(obj, scheduler = CurrentThreadScheduler.instance)
  of_enumerable(obj, scheduler)
end
raise_error(error, scheduler = ImmediateScheduler.instance) click to toggle source

Returns an observable sequence that terminates with an exception.

# File lib/rx_ruby/operators/creation.rb, line 155
# Sequence that fails immediately: the single action scheduled on
# +scheduler+ passes +error+ to the observer's +on_error+.
def raise_error(error, scheduler = ImmediateScheduler.instance)
  AnonymousObservable.new do |observer|
    fail_now = lambda do
      observer.on_error error
    end
    scheduler.schedule fail_now
  end
end
range(start, count, scheduler = CurrentThreadScheduler.instance) click to toggle source

Generates an observable sequence of integral numbers within a specified range.

# File lib/rx_ruby/operators/creation.rb, line 164
# Emits +count+ consecutive values beginning at +start+ (start, start + 1,
# ...) and then completes. The offset from +start+ is carried as the
# scheduler's recursion state.
def range(start, count, scheduler = CurrentThreadScheduler.instance)
  AnonymousObservable.new do |observer|
    emit = lambda do |offset, recurse|
      if offset < count
        observer.on_next(start + offset)
        recurse.call(offset + 1)
      else
        observer.on_completed
      end
    end

    scheduler.schedule_recursive_with_state(0, emit)
  end
end
repeat(value, count, scheduler = CurrentThreadScheduler.instance) click to toggle source

Generates an observable sequence that repeats the given element the specified number of times.

# File lib/rx_ruby/operators/creation.rb, line 183
# Builds a single-element sequence with Observable.just(value, scheduler)
# and calls the instance-level repeat(count) on it.
def repeat(value, count, scheduler = CurrentThreadScheduler.instance)
  Observable.just(value, scheduler).repeat(count)
end
repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance) click to toggle source

Generates an observable sequence that repeats the given element infinitely.

# File lib/rx_ruby/operators/creation.rb, line 178
# Builds a single-element sequence with Observable.just(value, scheduler)
# and calls the instance-level repeat_infinitely on it.
def repeat_infinitely(value, scheduler = CurrentThreadScheduler.instance)
  Observable.just(value, scheduler).repeat_infinitely
end
rescue_error(*args) click to toggle source

Continues an observable sequence that is terminated by an exception with the next observable sequence.

# File lib/rx_ruby/operators/multiple.rb, line 431
# Runs the given sequences as fallbacks for one another: when the current
# source fails, the next one is subscribed to. The first source that
# completes ends the result; if every source fails, the last error is
# reported.
#
# +args+ is either a list of observables or a single Enumerator yielding
# them.
def rescue_error(*args)
  AnonymousObservable.new do |observer|
    gate = AsyncLock.new
    disposed = false
    e = args.length == 1 && args[0].is_a?(Enumerator) ? args[0] : args.to_enum
    # Holds the subscription to whichever source is currently running.
    subscription = SerialSubscription.new
    # Most recent error seen from a source; reported if no source is left.
    last_error = nil

    cancelable = CurrentThreadScheduler.instance.schedule_recursive lambda {|this|
      gate.wait do
        current = nil
        has_next = false
        error = nil

        if disposed
          return
        else
          begin
            current = e.next
            has_next = true
          rescue StopIteration => _
            # Do nothing
          rescue => e
            # NOTE(review): this rebinds `e`, the enumerator variable, to
            # the exception; the step returns right after reporting it.
            error = e
          end
        end

        if error
          # Failure while advancing the enumerator itself.
          observer.on_error error
          return
        end

        unless has_next
          # Ran out of fallbacks.
          if last_error
            observer.on_error last_error
          else
            observer.on_completed
          end
          return
        end

        new_obs = Observer.configure do |o|
          o.on_next(&observer.method(:on_next))

          # A failing source is replaced by the next one.
          o.on_error do |err|
            last_error = err
            this.call
          end

          # A completing source completes the whole result.
          o.on_completed(&observer.method(:on_completed))
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = current.subscribe new_obs
      end
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { gate.wait { disposed = true } }]
  end
end
return(value, scheduler = ImmediateScheduler.instance)
Alias for: just
start(func, context, scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/start.rb, line 3
# Wraps +func+ with Observable.to_async (passing +context+ and +scheduler+
# through) and immediately calls the resulting lambda with no arguments,
# returning whatever that call returns.
def start(func, context, scheduler = DefaultScheduler.instance)
  Observable.to_async(func, context, scheduler).call
end
switchCase(selector, sources, defaultSourceOrScheduler = Observable.empty)
Alias for: case
timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/timer.rb, line 3
# Dispatches to one of four timer helpers depending on the arguments:
#
# * +due_time+ is a Time   -> absolute variants (observable_timer_date*)
# * otherwise              -> relative variants (observable_timer_time_span*)
# * second argument is Numeric   -> it is the period (periodic variants)
# * second argument is Scheduler -> it replaces +scheduler+, no period
def timer(due_time, period_or_scheduler = DefaultScheduler.instance, scheduler = DefaultScheduler.instance)
  period = nil
  if Numeric === period_or_scheduler
    period = period_or_scheduler
  elsif Scheduler === period_or_scheduler
    scheduler = period_or_scheduler
  end

  absolute = Time === due_time

  if period.nil?
    if absolute
      observable_timer_date(due_time, scheduler)
    else
      observable_timer_time_span(due_time, scheduler)
    end
  elsif absolute
    observable_timer_date_and_period(due_time, period, scheduler)
  else
    observable_timer_time_span_and_period(due_time, period, scheduler)
  end
end
to_async(func, context = nil, scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/to_async.rb, line 3
# Returns a lambda that, when called, schedules +func+ on +scheduler+ with
# the call's arguments and returns an observable view of an AsyncSubject
# that receives the result (then completes) or the raised exception.
# When +context+ is given, +func+ is first bound to it via +proc_bind+.
def to_async(func, context = nil, scheduler = DefaultScheduler.instance)
  lambda do |*args|
    subject = AsyncSubject.new

    work = lambda do
      begin
        callable = context ? proc_bind(func, context) : func
        outcome = callable.call(*args)
      rescue => failure
        subject.on_error failure
        return
      end

      subject.on_next outcome
      subject.on_completed
    end

    scheduler.schedule work
    subject.as_observable
  end
end
using(resource_factory, observable_factory) click to toggle source

Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence’s lifetime.

# File lib/rx_ruby/operators/creation.rb, line 188
def using(resource_factory, observable_factory)
  AnonymousObservable.new do |observer|
    source = nil
    subscription = Subscription.empty
    begin
      resource = resource_factory.call
      subscription = resource unless resource.nil?
      source = observable_factory.call resource
    rescue => e
      next CompositeSubscription.new [self.raise_error(e).subscribe(observer), subscription]
    end

    CompositeSubscription.new [source.subscribe(observer), subscription]
  end
end
when(*plans) click to toggle source
# File lib/rx_ruby/linq/observable/when.rb, line 3
# Activates every plan in +plans+ against a shared table of join observers
# and subscribes those join observers when the result is subscribed to.
#
# Returns an AnonymousObservable whose subscription is a
# CompositeSubscription holding all join observers.
def when(*plans)
  AnonymousObservable.new do |observer|
    active_plans = []
    # Filled in by the plans' #activate calls; values are join observers.
    external_subscriptions = {}
    out_observer = Observer.configure {|o|
      o.on_next(&observer.method(:on_next))
      # An error is fanned out to every join observer.
      # NOTE(review): the downstream observer is not notified directly
      # here - confirm the join observers take care of that.
      o.on_error {|err|
        external_subscriptions.each {|_, v|
          v.on_error err
        }
      }
      o.on_completed(&observer.method(:on_completed))
    }
    begin
      plans.each {|x|
        # The lambda is the plan's deactivation callback: once the last
        # active plan is removed the result completes.
        active_plans.push x.activate(external_subscriptions, out_observer, lambda {|active_plan|
          active_plans.delete(active_plan)
          active_plans.length == 0 && observer.on_completed 
        })
      }
    rescue => e
      # Activation failure is reported to the observer; execution then
      # continues with whatever join observers were registered so far.
      Observable.raise_error(e).subscribe(observer)
    end
    group = CompositeSubscription.new
    external_subscriptions.each {|_, join_observer|
      join_observer.subscribe
      group.push join_observer
    }

    group
  end
end
while(condition, source) click to toggle source
# File lib/rx_ruby/linq/observable/while.rb, line 3
# Repeats +source+ for as long as +condition+ returns a truthy value: the
# condition is checked before each run, and the sequence completes the
# first time it fails.
#
# All iteration state (the enumerator, the disposed flag and the serial
# subscription) is created per subscription. It used to be created once,
# when +while+ was called, so disposing one subscription permanently
# disabled the observable and later subscribers reused an exhausted
# enumerator.
def while(condition, source)
  scheduler = ImmediateScheduler.instance

  AnonymousObservable.new do |observer|
    # Lazily yields +source+ each time the condition still holds.
    enum = Enumerator.new {|y|
      while condition.call
        y << source
      end
    }
    is_disposed = false
    subscription = SerialSubscription.new

    cancelable = scheduler.schedule_recursive lambda {|this|
      return if is_disposed

      begin
        current_value = enum.next
      rescue StopIteration
        # Condition failed: no more repetitions.
        observer.on_completed
        return
      rescue => e
        # Raised by the condition itself.
        observer.on_error e
        return
      end

      d = SingleAssignmentSubscription.new
      subscription.subscription = d
      d.subscription = current_value.subscribe(
        observer.method(:on_next),
        observer.method(:on_error),
        # Completion of one run triggers the next condition check.
        lambda { this.call }
      )
    }

    CompositeSubscription.new [subscription, cancelable, Subscription.create { is_disposed = true }]
  end
end
zip(*args, &result_selector) click to toggle source

Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.

# File lib/rx_ruby/operators/multiple.rb, line 676
# Pairs up elements of the given sequences by position: each source has
# its own queue, and whenever every queue holds at least one element the
# heads are removed and passed to +result_selector+ (by default packed
# into an array).
#
# The result completes when all sources have completed, or when a source
# emits while not every queue has an element and all *other* sources are
# already done. Errors from any source are forwarded.
def zip(*args, &result_selector)
  AnonymousObservable.new do |observer|
    result_selector ||= lambda {|*inner_args| inner_args }
    n = args.length

    queues = Array.new(n) {|i| Array.new }
    is_done = Array.new(n, false)

    # Called after source i pushed a value into its queue.
    next_action = lambda do |i|
      if queues.all? {|q| q.length > 0 }
        res = queues.map {|q| q.shift }
        observer.on_next(result_selector.call(*res))
      elsif enumerable_select_with_index(is_done) {|x, j| j != i } .all?
        observer.on_completed
      end
    end

    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    # One shared monitor is handed to every source's #synchronize.
    gate = Monitor.new

    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new

      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          queues[i].push(x)
          next_action.call i
        end

        o.on_error(&observer.method(:on_error))

        o.on_completed { done.call i }
      end

      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)
      sas
    end

    # Drop buffered elements on disposal. The previous `q = []` only
    # rebound the block-local variable and left every queue untouched.
    subscriptions.push(Subscription.create { queues.each {|q| q.clear } })

    CompositeSubscription.new subscriptions
  end
end

Private Class Methods

enumerable_select_with_index(arr, &block) click to toggle source
# File lib/rx_ruby/operators/multiple.rb, line 726
# Like Enumerable#select, but the block also receives the element's index.
# Returns a new array with the elements for which the block was truthy.
def enumerable_select_with_index(arr, &block)
  kept_pairs = arr.each_with_index.select { |item, index| block.call(item, index) }
  kept_pairs.map { |item, _index| item }
end
observable_timer_date_and_period(due_time, period, scheduler) click to toggle source
# File lib/rx_ruby/linq/observable/_observable_timer_date_and_period.rb, line 4
# Periodic timer anchored at an absolute +due_time+. Each tick emits an
# incrementing counter (starting at 0) and reschedules itself. With a
# positive normalized period the next due time advances by one period; if
# that would not be later than the scheduler's current time, it is reset
# to "now + period" instead.
def observable_timer_date_and_period(due_time, period, scheduler)
  AnonymousObservable.new do |observer|
    ticks = 0
    next_due = due_time
    interval = Scheduler.normalize(period)

    tick = lambda do |recurse|
      if interval > 0
        current_time = scheduler.now()
        next_due += interval
        next_due = current_time + interval if next_due <= current_time
      end

      observer.on_next(ticks)
      ticks += 1
      recurse.call(next_due)
    end

    scheduler.schedule_recursive_absolute(next_due, tick)
  end
end
observable_timer_time_span(due_time, scheduler) click to toggle source
# File lib/rx_ruby/linq/observable/_observable_timer_time_span.rb, line 4
# One-shot relative timer: after the normalized +due_time+ has elapsed on
# +scheduler+, emits 0 and completes.
def observable_timer_time_span(due_time, scheduler)
  AnonymousObservable.new do |observer|
    fire_once = lambda do
      observer.on_next(0)
      observer.on_completed
    end

    scheduler.schedule_relative(Scheduler.normalize(due_time), fire_once)
  end
end
observable_timer_time_span_and_period(due_time, period, scheduler) click to toggle source
# File lib/rx_ruby/linq/observable/_observable_timer_time_span_and_period.rb, line 4
# Periodic timer with a relative initial delay.
#
# * When the delay differs from the period, the absolute start time is
#   computed at subscription time (scheduler.now + due_time) and the work
#   is handed to observable_timer_date_and_period.
# * When they are equal, the scheduler's own periodic scheduling is used,
#   with the emitted counter carried as its state.
def observable_timer_time_span_and_period(due_time, period, scheduler)
  unless due_time == period
    return Observable.defer {
      observable_timer_date_and_period(scheduler.now() + due_time, period, scheduler)
    }
  end

  AnonymousObservable.new do |observer|
    emit_tick = lambda do |count|
      observer.on_next(count)
      count + 1
    end

    scheduler.schedule_periodic_with_state(0, period, emit_tick)
  end
end
proc_bind(block, object) click to toggle source

derived from Proc#to_method from Ruby Facets github.com/rubyworks/facets/blob/master/lib/core/facets/proc/to_method.rb

# File lib/rx_ruby/linq/observable/to_async.rb, line 29
# Turns +block+ into a Method bound to +object+, so that calling it runs
# the block with +object+ as self.
#
# Works by briefly defining the block as a method on the object's
# singleton class under a timestamp-derived name, grabbing the resulting
# UnboundMethod, and removing the temporary method again before binding.
def proc_bind(block, object)
  stamp = Time.now
  temp_name = "__bind_#{stamp.to_i}_#{stamp.usec}"

  unbound = object.singleton_class.class_eval do
    define_method(temp_name, &block)
    instance_method(temp_name).tap { remove_method(temp_name) }
  end

  unbound.bind(object)
end

Public Instance Methods

_subscribe(observer) click to toggle source

Subscribes the given observer to the observable sequence. @param [Observer] observer @return [Subscription]

# File lib/rx_ruby/core/observable.rb, line 37
# Subscribes +observer+ through an AutoDetachObserver wrapper and returns
# that wrapper.
#
# If the current-thread scheduler reports that scheduling is required, the
# subscription is deferred to it via +schedule_subscribe+. Otherwise
# +subscribe_core+ is invoked directly; an exception from it is handed to
# the wrapper's +fail+, and re-raised only when +fail+ returns a falsy
# value.
#
# @param [Observer] observer
# @return [Subscription]
def _subscribe(observer)
  detaching = AutoDetachObserver.new(observer)

  if CurrentThreadScheduler.schedule_required?
    CurrentThreadScheduler.instance.schedule_with_state(detaching, method(:schedule_subscribe))
    return detaching
  end

  begin
    detaching.subscription = subscribe_core(detaching)
  rescue => failure
    raise failure unless detaching.fail(failure)
  end

  detaching
end
add_ref(r) click to toggle source
# File lib/rx_ruby/internal/util.rb, line 3
# Returns an observable that, on subscribe, subscribes +observer+ to this
# sequence and groups that subscription with +r.subscription+ in a
# CompositeSubscription. +r.subscription+ is evaluated first, on every
# subscribe.
def add_ref(r)
  AnonymousObservable.new do |observer|
    CompositeSubscription.new [r.subscription, self.subscribe(observer)]
  end
end
aggregate(*args, &block)
Alias for: reduce
all?(&block) click to toggle source

Determines whether all elements of an observable sequence satisfy a condition if block given, else if all are true @param [Proc] block @return [RxRuby::Observable]

# File lib/rx_ruby/operators/aggregates.rb, line 64
# Emits a single boolean telling whether every element satisfies +block+,
# or, when no block is given, whether every element is itself truthy.
#
# Implemented as "no element fails the test": failing elements are
# selected, +any?+ reports whether one exists, and the answer is negated.
#
# The block-less default used to be a constant-true predicate, which made
# the result true for every input and contradicted the documented
# "else if all are true" contract.
#
# @param [Proc] block optional predicate
# @return [RxRuby::Observable]
def all?(&block)
  block ||= lambda { |value| value }
  select {|v| !(block.call v)}.
  any?.
  map {|b| !b }
end
amb(second) click to toggle source

Propagates the observable sequence that reacts first.

# File lib/rx_ruby/operators/multiple.rb, line 26
# Races this sequence against +second+: whichever delivers a notification
# first (element, error or completion) wins, the other side is
# unsubscribed, and from then on only the winner's notifications reach the
# observer.
def amb(second)
  AnonymousObservable.new do |observer|
    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new
    # :neither until one side delivers its first notification.
    choice = :neither

    gate = Monitor.new

    left = AmbObserver.new
    right = AmbObserver.new

    # Runs +action+ only if the left side is (or now becomes) the winner.
    handle_left = lambda do |&action|
      if choice == :neither
        choice = :left
        right_subscription.unsubscribe
        # From here on the left AmbObserver targets the downstream observer.
        left.observer = observer
      end

      action.call if choice == :left
    end

    # Mirror image of handle_left for the right side.
    handle_right = lambda do |&action|
      if choice == :neither
        choice = :right
        left_subscription.unsubscribe
        right.observer = observer
      end

      action.call if choice == :right
    end

    left_obs = Observer.configure do |o|
      o.on_next {|x| handle_left.call { observer.on_next x } }
      o.on_error {|err| handle_left.call { observer.on_error err } }
      o.on_completed { handle_left.call { observer.on_completed } }
    end

    right_obs = Observer.configure do |o|
      o.on_next {|x| handle_right.call { observer.on_next x } }
      o.on_error {|err| handle_right.call { observer.on_error err } }
      o.on_completed { handle_right.call { observer.on_completed } }
    end        

    # Until a winner is chosen both sides go through the deciding
    # observers, wrapped by Observer.allow_reentrancy with the shared gate.
    left.observer = Observer.allow_reentrancy(left_obs, gate)
    right.observer = Observer.allow_reentrancy(right_obs, gate)

    left_subscription.subscription = self.subscribe left
    right_subscription.subscription = second.subscribe right

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end
and(right) click to toggle source
# File lib/rx_ruby/linq/observable/and.rb, line 3
# Starts a join pattern made of this sequence and +right+.
def and(right)
  members = [self, right]
  Pattern.new(members)
end
any?(&block) click to toggle source

Determines whether any element of an observable sequence satisfies a condition if a block is given else if there are any items in the observable sequence. @return [RxRuby::Observable]

# File lib/rx_ruby/operators/aggregates.rb, line 84
# Emits a single boolean and completes.
#
# * With a block: true if at least one element satisfies the block.
# * Without a block: true if the sequence has at least one element.
#
# The block form filters with +select+ before testing for presence. It
# used to go through +map+, which keeps every element, so the answer was
# true for any non-empty sequence regardless of the predicate.
#
# @return [RxRuby::Observable]
def any?(&block)
  return select(&block).any? if block_given?
  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      # The first element settles the answer.
      o.on_next do |_|
        observer.on_next true
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      # Completing without any element means "no".
      o.on_completed do
        observer.on_next false
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
as_observable() click to toggle source

Hides the identity of an observable sequence.

# File lib/rx_ruby/operators/single.rb, line 16
# Wraps this sequence in a fresh AnonymousObservable that simply forwards
# subscriptions, so callers receive an object of that class rather than
# the receiver itself.
def as_observable
  AnonymousObservable.new {|observer| subscribe(observer) }
end
average(&block) click to toggle source

Computes the average of an observable sequence of values that are optionally obtained by invoking a transform function on each element of the input sequence if a block is given @param [Object] block @return [RxRuby::Observable]

# File lib/rx_ruby/operators/aggregates.rb, line 109
# Computes the average of the sequence. With a block, elements are first
# mapped through it. A running sum and count are accumulated with +scan+,
# the last accumulator is taken with +final+, and the sum is divided by
# the count using the operands' own +/+ (so Integer inputs divide as
# integers). Raises 'Sequence contains no elements' when the count is 0.
#
# @param [Object] block optional transform
# @return [RxRuby::Observable]
def average(&block)
  return map(&block).average if block_given?

  running_totals = scan({:sum => 0, :count => 0}) do |acc, value|
    {:sum => acc[:sum] + value, :count => acc[:count] + 1 }
  end

  running_totals.final.map do |totals|
    raise 'Sequence contains no elements' if totals[:count] == 0
    totals[:sum] / totals[:count]
  end
end
buffer_with_count(count, skip = count) click to toggle source

Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.

# File lib/rx_ruby/operators/single.rb, line 21
# Projects elements into buffers (arrays) based on element count.
#
# Built on +window_with_count+: every window is converted with +to_a+ and
# empty buffers are dropped.
#
# @param count buffer size; must be greater than zero
# @param skip distance between buffer starts; defaults to +count+ and must be
#   greater than zero
# @raise [ArgumentError] if +count+ or +skip+ is not greater than zero
def buffer_with_count(count, skip = count)
  if count <= 0
    raise ArgumentError.new 'Count must be greater than zero'
  end
  if skip <= 0
    raise ArgumentError.new 'Skip must be greater than zero'
  end

  buffers = window_with_count(count, skip).flat_map(&:to_a)
  buffers.find_all {|buffer| buffer.length > 0 }
end
buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) click to toggle source

Projects each element of an observable sequence into consecutive non-overlapping buffers which are produced based on timing information.

# File lib/rx_ruby/operators/time.rb, line 22
# Projects elements into buffers (arrays) based on timing information.
#
# Built on +window_with_time+: every window is converted with +to_a+.
#
# @param time_span length of each buffer; must be greater than zero
# @param time_shift interval between buffer starts; defaults to +time_span+
#   and must be greater than zero
# @param scheduler scheduler handed to +window_with_time+
# @raise [ArgumentError] if +time_span+ or +time_shift+ is not greater than zero
def buffer_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError.new 'time_span must be greater than zero' if time_span <= 0
  # Name the offending argument correctly (the message used to say time_span).
  raise ArgumentError.new 'time_shift must be greater than zero' if time_shift <= 0
  window_with_time(time_span, time_shift, scheduler).flat_map(&:to_a)
end
combine_latest(other, &result_selector) click to toggle source

Merges two observable sequences into one observable sequence by using the selector function whenever one of the observable sequences produces an element.

# File lib/rx_ruby/operators/multiple.rb, line 117
# Merges this sequence with +other+ by invoking the selector with the latest
# value of each side whenever either side produces an element (once both sides
# have produced at least one).
#
# Fixes relative to the previous implementation:
# * the selector-failure paths use +next+; a +return+ inside these do-blocks
#   would raise LocalJumpError because the method has already returned;
# * an element arriving after the other side completed only terminates the
#   result when the other side never produced a value (no combination is
#   possible); otherwise combinations keep flowing;
# * without a block the latest values are paired as +[left, right]+, matching
#   the default of the n-ary +combine_latest+.
#
# @param other the second observable sequence
# @param result_selector [Proc] optional combiner called with (left, right)
# @return the observable built by AnonymousObservable
def combine_latest(other, &result_selector)
  result_selector ||= lambda {|l, r| [l, r] }

  AnonymousObservable.new do |observer|
    has_left = false
    has_right = false

    left = nil
    right = nil

    left_done = false
    right_done = false

    left_subscription = SingleAssignmentSubscription.new
    right_subscription = SingleAssignmentSubscription.new

    # Both sides are routed through the same gate so the shared state above is
    # only touched by one callback at a time.
    gate = Monitor.new

    left_obs = Observer.configure do |o|
      o.on_next do |l|
        has_left = true
        left = l

        if has_right
          res = nil
          begin
            res = result_selector.call left, right
          rescue => e
            observer.on_error e
            next
          end
          observer.on_next res
        elsif right_done
          # Right finished without ever producing a value: nothing to combine.
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        left_done = true
        observer.on_completed if right_done
      end
    end

    right_obs = Observer.configure do |o|
      o.on_next do |r|
        has_right = true
        right = r

        if has_left
          res = nil
          begin
            res = result_selector.call left, right
          rescue => e
            observer.on_error e
            next
          end
          observer.on_next res
        elsif left_done
          # Left finished without ever producing a value: nothing to combine.
          observer.on_completed
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        right_done = true
        observer.on_completed if left_done
      end
    end

    left_subscription.subscription = synchronize(gate).subscribe(left_obs)
    right_subscription.subscription = other.synchronize(gate).subscribe(right_obs)

    CompositeSubscription.new [left_subscription, right_subscription]
  end
end
concat(*other) click to toggle source

Concatenates the second observable sequence to the first observable sequence upon successful termination of the first.

# File lib/rx_ruby/operators/multiple.rb, line 195
# Concatenates the given sequences after this one by delegating to
# +Observable.concat+ with an enumerator over [self, *other].
#
# @param other sequences to append, in order
# @return whatever +Observable.concat+ returns
def concat(*other)
  sources = [self] + other
  Observable.concat(sources.to_enum)
end
concat_all() click to toggle source
# File lib/rx_ruby/linq/observable/concat_all.rb, line 3
# Flattens a sequence of sequences one inner sequence at a time by calling
# +merge_concurrent+ with a concurrency limit of 1.
def concat_all
  max_active = 1
  merge_concurrent(max_active)
end
concat_map(selector, result_selector = nil) click to toggle source
# File lib/rx_ruby/linq/observable/concat_map.rb, line 3
# Projects each element through +selector+ and hands the projection to
# +_concat_map+.
#
# * If +result_selector+ is a Proc, the selector is wrapped so that every
#   inner value +y+ (at inner index +i2+) of the projection of +x+ (at index
#   +i+) is replaced by +result_selector.call(x, y, i, i2)+. Selector results
#   that respond to +each+ are converted with +Observable.from+ first.
# * If +selector+ is not a Proc it is used as a constant projection.
#
# @param selector [Proc, Object] projection called with (element, index), or
#   a constant inner sequence
# @param result_selector [Proc, nil] optional combiner
def concat_map(selector, result_selector = nil)
  if Proc === result_selector
    combined = lambda do |x, i|
      inner = selector.call(x, i)
      inner = Observable.from(inner) if inner.respond_to?(:each)
      inner.map_with_index {|y, i2| result_selector.call(x, y, i, i2) }
    end
    return concat_map(combined)
  end

  projection = Proc === selector ? selector : lambda {|*_| selector }
  _concat_map(projection)
end
concat_map_observer(on_next, on_error, on_completed) click to toggle source
# File lib/rx_ruby/linq/observable/concat_map_observer.rb, line 3
# Projects every notification of the source into a value using the three
# callables and flattens the resulting sequence with +concat_all+.
#
# * each element is replaced by +on_next.call(element, index)+;
# * a source error is replaced by +on_error.call(error)+ followed by completion;
# * source completion emits +on_completed.call+ followed by completion.
#
# A callable that raises results in +on_error+ on the downstream observer.
#
# @param on_next callable taking (element, index)
# @param on_error callable taking (error)
# @param on_completed callable taking no arguments
def concat_map_observer(on_next, on_error, on_completed)
  projected = AnonymousObservable.new do |observer|
    index = 0

    forward_next = lambda do |x|
      begin
        result = on_next.call(x, index)
        index += 1
      rescue => e
        observer.on_error e
        next
      end
      observer.on_next result
    end

    forward_error = lambda do |err|
      begin
        result = on_error.call(err)
      rescue => e
        observer.on_error e
        next
      end

      observer.on_next result
      observer.on_completed
    end

    forward_completed = lambda do
      begin
        result = on_completed.call
      rescue => e
        observer.on_error e
        next
      end

      observer.on_next result
      observer.on_completed
    end

    subscribe(forward_next, forward_error, forward_completed)
  end

  projected.concat_all
end
contains(search_element, from_index = 0) click to toggle source
# File lib/rx_ruby/linq/observable/contains.rb, line 3
# Determines whether the sequence contains +search_element+ (compared with
# +==+) at or after position +from_index+.
#
# Emits +true+ and completes on the first match; emits +false+ and completes
# when the source completes. A negative +from_index+ yields +false+
# immediately without subscribing to the source.
#
# @param search_element the value to look for
# @param from_index zero-based position at which to start comparing
# @return the observable built by AnonymousObservable
def contains(search_element, from_index = 0)
  AnonymousObservable.new do |observer|
    i = 0
    n = from_index
    if n < 0
      observer.on_next false
      observer.on_completed
      # +next+, not +return+: this block runs after the method has returned,
      # so +return+ would raise LocalJumpError.
      next Subscription.empty
    end

    subscribe(
      lambda {|x|
        # i.tap yields the pre-increment index while bumping the counter.
        if i.tap { i += 1 } >= n && x == search_element
          observer.on_next true
          observer.on_completed
        end
      },
      observer.method(:on_error),
      lambda {
        observer.on_next false
        observer.on_completed
      })
  end
end
contains?(item) click to toggle source

Determines whether an observable sequence contains a specified element. @param [Object] item The value to locate in the source sequence. @return [RxRuby::Observable] An observable sequence containing a single element determining whether the source sequence contains an element that has the specified value.

# File lib/rx_ruby/operators/aggregates.rb, line 123
# Determines whether the sequence contains +item+, compared with +eql?+.
#
# @param item [Object] the value to locate in the source sequence
# @return the result of +any?+ on the filtered sequence
def contains?(item)
  matches = select {|element| element.eql?(item) }
  matches.any?
end
count(&block) click to toggle source

Returns an observable sequence containing a number that represents how many elements in the specified observable sequence satisfy a condition if the block is given, else the number of items in the observable sequence

# File lib/rx_ruby/operators/aggregates.rb, line 130
# Counts the elements of the sequence, or only those for which the block
# returns a truthy value when a block is given.
#
# @param block [Proc] optional predicate
# @return the observable produced by +reduce+ (seed 0, +1 per element)
def count(&block)
  if block_given?
    select(&block).count
  else
    reduce(0) {|total, _| total + 1 }
  end
end
debounce(due_time, scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/debounce.rb, line 3
# Emits an element only after +due_time+ has elapsed without a newer element
# arriving.
#
# Each element schedules a relative timer on +scheduler+ and replaces the
# previously scheduled one in a SerialSubscription. A generation counter
# guards against a timer firing for a superseded element. On completion a
# still-pending element is emitted before completing.
#
# @param due_time relative time handed to +scheduler.schedule_relative+
# @param scheduler scheduler used for the timers
# @return the observable built by AnonymousObservable
def debounce(due_time, scheduler = DefaultScheduler.instance)
  AnonymousObservable.new do |observer|
    cancelable = SerialSubscription.new
    pending = false
    latest = nil
    generation = 0

    handle_next = lambda do |x|
      pending = true
      latest = x
      generation += 1
      my_generation = generation
      timer = SingleAssignmentSubscription.new
      cancelable.subscription = timer
      timer.subscription = scheduler.schedule_relative(due_time, lambda {
        # Only the timer of the most recent element may emit.
        observer.on_next latest if pending && generation == my_generation
        pending = false
      })
    end

    handle_error = lambda do |e|
      cancelable.dispose
      observer.on_error e
      pending = false
      generation += 1
    end

    handle_completed = lambda do
      cancelable.dispose
      observer.on_next latest if pending
      observer.on_completed
      pending = false
      generation += 1
    end

    subscription = subscribe(handle_next, handle_error, handle_completed)

    CompositeSubscription.new [subscription, cancelable]
  end
end
default_if_empty(default_value = nil) click to toggle source

Returns the elements of the specified sequence or the type parameter’s default value in a singleton sequence if the sequence is empty.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 16
# Forwards the elements of the sequence, or emits +default_value+ once if the
# source completes without producing any element.
#
# @param default_value value emitted for an empty source (defaults to nil)
# @return the observable built by AnonymousObservable
def default_if_empty(default_value = nil)
  AnonymousObservable.new do |observer|
    saw_element = false

    relay = Observer.configure do |o|
      o.on_next do |element|
        saw_element = true
        observer.on_next element
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next(default_value) if !saw_element
        observer.on_completed
      end
    end

    subscribe(relay)
  end
end
delay(due_time, scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/delay.rb, line 3
# Delays the sequence, dispatching on the type of +due_time+: a Time is
# handled by +delay_date+, anything else by +delay_time_span+.
#
# @param due_time [Time, Object] absolute time or relative time span
# @param scheduler scheduler passed through to the chosen implementation
def delay(due_time, scheduler = DefaultScheduler.instance)
  case due_time
  when Time then delay_date(due_time, scheduler)
  else delay_time_span(due_time, scheduler)
  end
end
delay_with_selector(subscription_delay, delay_duration_selector = nil) click to toggle source
# File lib/rx_ruby/linq/observable/delay_with_selector.rb, line 3
# Delays each element of the source until the observable returned for it by
# the duration selector produces a value or completes.
#
# Accepts either form:
#   delay_with_selector(selector)
#   delay_with_selector(subscription_delay, selector)
# In the second form the subscription to the source is postponed until
# +subscription_delay+ produces a value or completes.
#
# @param subscription_delay [Proc, Object] the duration selector itself, or an
#   observable that gates the subscription to the source
# @param delay_duration_selector [Proc, nil] duration selector when the first
#   argument is the subscription delay
# @return the observable built by AnonymousObservable
def delay_with_selector(subscription_delay, delay_duration_selector = nil)
  # Disambiguate the two call forms; sub_delay stays nil in the one-argument form.
  if Proc === subscription_delay
    selector = subscription_delay
  else
    sub_delay = subscription_delay
    selector = delay_duration_selector
  end

  AnonymousObservable.new do |observer|
    # One entry per element whose delay is still running.
    delays = CompositeSubscription.new
    at_end = false
    # Completes downstream once the source has ended and no delay is pending.
    done = lambda {
      if at_end && delays.length == 0
        observer.on_completed
      end
    }
    subscription = SerialSubscription.new
    # Subscribes to the source; accepts and ignores any arguments so it can be
    # used both as an on_next and as an on_completed handler below.
    start = lambda {|*_|
      subscription.subscription = subscribe(
        lambda {|x|
          begin
            delay = selector.call(x)
          rescue => error
            observer.on_error error
            return
          end
          d = SingleAssignmentSubscription.new
          delays.push(d)
          # Both a value and completion of the delay release the element.
          d.subscription = delay.subscribe(
            lambda {|_|
              observer.on_next x
              delays.delete(d)
              done.call
            },
            observer.method(:on_error),
            lambda {
              observer.on_next x
              delays.delete(d)
              done.call
            })
        },
        observer.method(:on_error),
        lambda {
          at_end = true
          subscription.dispose
          done.call
        })
    }
    
    if !sub_delay
      start.call
    else
      # Wait for the subscription delay to fire (value or completion) first.
      subscription.subscription = sub_delay.subscribe(
        start,
        observer.method(:on_error),
        start)
    end
    CompositeSubscription.new [subscription, delays]
  end
end
dematerialize() click to toggle source

Dematerializes the explicit notification values of an observable sequence as implicit notifications.

# File lib/rx_ruby/operators/single.rb, line 28
# Turns a sequence of notification objects back into implicit notifications
# by letting each element +accept+ the downstream observer. Errors and
# completion of the source itself are forwarded unchanged.
#
# @return the observable built by AnonymousObservable
def dematerialize
  AnonymousObservable.new do |observer|
    unwrapper = RxRuby::Observer.configure do |o|
      o.on_next do |notification|
        notification.accept observer
      end
      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe unwrapper
  end
end
distinct(&key_selector) click to toggle source

Returns an observable sequence that contains only distinct elements according to the optional key_selector.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 39
# Emits only the elements whose key has not been seen before.
#
# Keys are compared by their +to_s+ representation (so, for example, +1+ and
# +"1"+ are treated as the same key). A key selector that raises results in
# +on_error+ downstream.
#
# @param key_selector [Proc] optional function computing the key of an
#   element; defaults to the element itself
# @return the observable built by AnonymousObservable
def distinct(&key_selector)
  key_selector ||= lambda {|x| x}

  AnonymousObservable.new do |observer|
    seen = {}

    filter = Observer.configure do |o|
      o.on_next do |element|
        first_time = false

        begin
          lookup = key_selector.call(element).to_s
          if !seen.key?(lookup)
            seen[lookup] = true
            first_time = true
          end
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next element if first_time
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(filter)
  end
end
distinct_until_changed(&key_selector) click to toggle source

Returns an observable sequence that contains only distinct contiguous elements according to the optional key_selector.

# File lib/rx_ruby/operators/single.rb, line 42
# Emits only elements whose key differs from the key of the previously
# emitted element (i.e. drops contiguous duplicates).
#
# Whether a previous key exists is tracked with an explicit flag, so +nil+
# and +false+ keys are de-duplicated like any other value. A key selector
# that raises results in +on_error+ downstream.
#
# @param key_selector [Proc] optional function computing the key of an
#   element; defaults to the element itself
# @return the observable built by AnonymousObservable
def distinct_until_changed(&key_selector)
  key_selector ||= lambda {|x| x}
  AnonymousObservable.new do |observer|
    current_key = nil
    has_current = false

    new_obs = RxRuby::Observer.configure do |o|
      o.on_next do |value|
        key = nil
        begin
          key = key_selector.call value
        rescue => err
          observer.on_error err
          next
        end

        # Test the flag, not the truthiness of the key itself: a falsy key
        # (nil/false) is still a key that has been seen.
        if !has_current || key != current_key
          has_current = true
          current_key = key
          observer.on_next value
        end
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe new_obs
  end
end
do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil) click to toggle source
# File lib/rx_ruby/linq/observable/do.rb, line 3
# Invokes side-effect callbacks for each notification of the source and then
# forwards the notification unchanged.
#
# The callbacks can be supplied as a block (on_next only), as up to three
# Procs, or as an observer-like object responding to +on_next+, +on_error+
# and +on_completed+.
#
# Fixes relative to the previous implementation:
# * the error callback receives the source error (it used to reference an
#   undefined local, which raised NameError instead of calling the callback);
# * the block is captured with +&block+ rather than a block-less +Proc.new+,
#   which newer Ruby versions reject.
#
# NOTE(review): when a callback raises, the exception is sent to
# +observer.on_error+ and the original notification is still forwarded
# afterwards; this relies on the downstream observer ignoring notifications
# after an error — confirm.
#
# @param observer_or_on_next [Proc, Object, nil] on_next Proc or observer
# @param on_error_func [Proc, nil] optional error callback
# @param on_completed_func [Proc, nil] optional completion callback
# @return the observable built by AnonymousObservable
def do(observer_or_on_next = nil, on_error_func = nil, on_completed_func = nil, &block)
  if block
    on_next_func = block
  elsif Proc === observer_or_on_next
    on_next_func = observer_or_on_next
  else
    on_next_func = observer_or_on_next.method(:on_next)
    on_error_func = observer_or_on_next.method(:on_error)
    on_completed_func = observer_or_on_next.method(:on_completed)
  end
  AnonymousObservable.new do |observer|
    subscribe(
      lambda {|x|
        begin
          on_next_func.call x
        rescue => e
          observer.on_error e
        end
        observer.on_next x
      },
      lambda {|err|
        begin
          on_error_func && on_error_func.call(err)
        rescue => e
          observer.on_error e
        end
        observer.on_error err
      },
      lambda {
        begin
          on_completed_func && on_completed_func.call
        rescue => e
          observer.on_error e
        end
        observer.on_completed
      })
  end
end
element_at(index) click to toggle source

Returns the element at a specified index in a sequence. @param [Numeric] index The zero-based index of the element to retrieve. @return [RxRuby::Observable] An observable sequence that produces the element at the specified position in the source sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 139
# Returns the element at the given zero-based position.
#
# Emits that element and completes. If the source completes first, the
# failure is delivered through +on_error+ (as +final+ and +last+ do) instead
# of being raised inside the source's completion callback.
#
# @param index [Numeric] zero-based index of the element to retrieve
# @return [RxRuby::Observable] sequence producing the element at +index+
# @raise [ArgumentError] if +index+ is negative
def element_at(index)
  raise ArgumentError.new 'index cannot be less than zero' if index < 0
  AnonymousObservable.new do |observer|
    # Counts down to the requested position.
    i = index
    new_obs = Observer.configure do |o|
      o.on_next do |value|
        if i == 0
          observer.on_next value
          observer.on_completed
        end

        i -= 1
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_error(RuntimeError.new 'Sequence contains no elements')
      end
    end

    subscribe new_obs
  end
end
element_at_or_default(index, default_value = nil) click to toggle source

Returns the element at a specified index in a sequence or a default value if the index is out of range. @param [Numeric] index The zero-based index of the element to retrieve. @param [Object] default_value The default value to use if the index is out of range.

# File lib/rx_ruby/operators/aggregates.rb, line 164
# Returns the element at the given zero-based position, or +default_value+
# if the source completes before reaching it.
#
# @param index [Numeric] zero-based index of the element to retrieve
# @param default_value [Object] value emitted when the index is out of range
# @return the observable built by AnonymousObservable
# @raise [ArgumentError] if +index+ is negative
def element_at_or_default(index, default_value = nil)
  if index < 0
    raise ArgumentError.new 'index cannot be less than zero'
  end

  AnonymousObservable.new do |observer|
    remaining = index
    emit_and_finish = lambda do |result|
      observer.on_next result
      observer.on_completed
    end

    picker = Observer.configure do |o|
      o.on_next do |element|
        emit_and_finish.call(element) if remaining == 0
        remaining -= 1
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed { emit_and_finish.call(default_value) }
    end

    subscribe picker
  end
end
empty?() click to toggle source

Determines whether an observable sequence is empty. @return [RxRuby::Observable] An observable sequence containing a single element determining whether the source sequence is empty.

# File lib/rx_ruby/operators/aggregates.rb, line 242
# Determines whether the sequence is empty by negating the result of +any?+.
#
# @return [RxRuby::Observable] sequence with a single boolean element
def empty?
  has_elements = any?
  has_elements.map {|present| !present }
end
ensures() { || ... } click to toggle source

Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.

# File lib/rx_ruby/operators/single.rb, line 117
# Forwards the source unchanged and returns a subscription that, when
# unsubscribed, unsubscribes from the source and then always runs the given
# block (even if unsubscribing raised).
#
# @yield the action to run when the returned subscription is released
# @return the observable built by AnonymousObservable
def ensures
  AnonymousObservable.new do |observer|
    upstream = subscribe observer
    Subscription.create do
      begin
        upstream.unsubscribe
      ensure
        yield
      end
    end
  end
end
enumerator_repeat_infinitely(value) click to toggle source
# File lib/rx_ruby/operators/single.rb, line 390
# Builds an Enumerator that yields +value+ forever.
#
# @param value the object to repeat
# @return [Enumerator] an endless enumerator over +value+
def enumerator_repeat_infinitely(value)
  Enumerator.new do |yielder|
    loop { yielder << value }
  end
end
enumerator_repeat_times(num, value) click to toggle source
# File lib/rx_ruby/operators/single.rb, line 382
# Builds an Enumerator that yields +value+ exactly +num+ times.
#
# @param num number of repetitions (anything responding to +times+)
# @param value the object to repeat
# @return [Enumerator]
def enumerator_repeat_times(num, value)
  Enumerator.new do |yielder|
    num.times { yielder << value }
  end
end
final() click to toggle source

Internal method to get the final value @return [RxRuby::Observable]

# File lib/rx_ruby/operators/aggregates.rb, line 16
# Internal helper: emits only the last element of the source when it
# completes. An empty source results in +on_error+ with a RuntimeError
# ('Sequence contains no elements').
#
# @return [RxRuby::Observable]
def final
  AnonymousObservable.new do |observer|
    last_seen = nil
    seen_any = false

    tail_obs = Observer.configure do |o|
      o.on_next do |element|
        last_seen = element
        seen_any = true
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        if !seen_any
          observer.on_error(RuntimeError.new 'Sequence contains no elements')
        else
          observer.on_next last_seen
          observer.on_completed
        end
      end
    end

    subscribe tail_obs
  end
end
find_all(&block)
Alias for: select
find_all_with_index(&block)
Alias for: select_with_index
first(&block) click to toggle source

Returns the first element of an observable sequence that satisfies the condition in the predicate if a block is given, else the first item in the observable sequence. @param [Proc] block Optional predicate function to evaluate for elements in the source sequence. @return [RxRuby::Observable] Sequence containing the first element in the observable sequence that satisfies the condition in the predicate if a block is given, else the first element.

# File lib/rx_ruby/operators/aggregates.rb, line 195
# Returns the first element of the sequence, or the first element satisfying
# the block when one is given.
#
# Emits that element and completes. If the source completes without a
# (matching) element, the failure is delivered through +on_error+ (as +final+
# and +last+ do) instead of being raised inside the source's completion
# callback.
#
# @param block [Proc] optional predicate evaluated for each element
# @return [RxRuby::Observable] sequence containing the first (matching) element
def first(&block)
  return select(&block).first if block_given?
  AnonymousObservable.new do |observer|
    new_obs = Observer.configure do |o|
      o.on_next do |x|
        observer.on_next x
        observer.on_completed
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_error(RuntimeError.new 'Sequence contains no elements')
      end
    end

    subscribe new_obs
  end
end
first_or_default(default_value = nil, &block) click to toggle source

Returns the first element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists. @param [Object] default_value The default value to use if the sequence is empty. @param [Proc] block An optional predicate function to evaluate for elements in the source sequence. @return [RxRuby::Observable] Sequence containing the first element in the observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.

# File lib/rx_ruby/operators/aggregates.rb, line 218
# Returns the first element of the sequence (or the first one satisfying the
# block), or +default_value+ if the source completes without such an element.
#
# @param default_value [Object] value emitted when no element is found
# @param block [Proc] optional predicate evaluated for each element
# @return [RxRuby::Observable]
def first_or_default(default_value = nil, &block)
  return select(&block).first_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    emit_and_finish = lambda do |result|
      observer.on_next result
      observer.on_completed
    end

    head_obs = Observer.configure do |o|
      o.on_next {|element| emit_and_finish.call(element) }
      o.on_error(&observer.method(:on_error))
      o.on_completed { emit_and_finish.call(default_value) }
    end

    subscribe head_obs
  end
end
flat_map(&block) click to toggle source

Projects each element of the source observable sequence to the other observable sequence and merges the resulting observable sequences into one observable sequence.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 108
# Projects each element to an inner sequence with +map+ and flattens the
# result with +merge_all+.
#
# @param block [Proc] projection from element to inner sequence
def flat_map(&block)
  projected = map(&block)
  projected.merge_all
end
flat_map_with_index(&block) click to toggle source

Projects each element of an observable sequence to an observable sequence by incorporating the element’s index and merges the resulting observable sequences into one observable sequence.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 113
# Projects each element (together with its index) to an inner sequence with
# +map_with_index+ and flattens the result with +merge_all+.
#
# @param block [Proc] projection from (element, index) to inner sequence
def flat_map_with_index(&block)
  projected = map_with_index(&block)
  projected.merge_all
end
group_join(right, left_duration_selector, right_duration_selector, result_selector) click to toggle source
# File lib/rx_ruby/linq/observable/group_join.rb, line 3
# Correlates elements of this (left) sequence with elements of +right+ based
# on overlapping durations, producing one result per left element.
#
# For every left element a Subject ("window") is opened and
# +result_selector.call(left_value, window)+ is emitted. The window receives
# every right value whose duration overlaps: the right values still alive
# when the window opens, and right values arriving while it is open. A window
# closes when the observable returned by +left_duration_selector+ fires; a
# right value expires when the one from +right_duration_selector+ fires.
#
# Fixes relative to the previous implementation:
# * a failing +right_duration_selector+ notified the entries of +right_map+,
#   which are plain values rather than subjects; the open left windows are
#   notified instead, as in every other error path here;
# * right values arriving while left windows are open are now forwarded to
#   those windows (previously only the replay on window creation delivered
#   right values).
#
# @param right the right observable sequence
# @param left_duration_selector callable: left value -> duration observable
# @param right_duration_selector callable: right value -> duration observable
# @param result_selector callable: (left value, window) -> result
# @return the observable built by AnonymousObservable
def group_join(right, left_duration_selector, right_duration_selector, result_selector)
  AnonymousObservable.new do |observer|
    group = CompositeSubscription.new
    r = RefCountSubscription.new(group)
    left_map = {}    # id => open window (Subject)
    right_map = {}   # id => right value still alive
    left_id = 0
    right_id = 0

    left_obs = Observer.configure do |o|
      o.on_next {|value|
        s = Subject.new
        id = left_id
        left_id += 1
        left_map[id] = s

        begin
          result = result_selector.call(value, s.add_ref(r))
        rescue => err
          left_map.values.each {|v| v.on_error(err) }
          observer.on_error(err)
          next
        end
        observer.on_next(result)

        # Replay the right values that are still alive into the new window.
        right_map.values.each {|v| s.on_next(v) }

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          if left_map.delete(id)
            s.on_completed
          end
          group.delete(md)
        }

        begin
          duration = left_duration_selector.call(value)
        rescue => err
          left_map.values.each {|v| v.on_error(err) }
          observer.on_error(err)
          next
        end

        md.subscription = duration.take(1).subscribe(
          lambda {|_| },
          lambda {|e|
            left_map.values.each {|v| v.on_error(e) }
            observer.on_error(e)
          },
          expire)
      }

      o.on_error {|e|
        left_map.values.each {|v| v.on_error(e) }
        observer.on_error(e)
      }

      o.on_completed(&observer.method(:on_completed))
    end
    group.push self.subscribe(left_obs)

    right_obs = Observer.configure do |o|
      o.on_next {|value|
        id = right_id
        right_id += 1
        right_map[id] = value

        md = SingleAssignmentSubscription.new
        group.push md

        expire = lambda {
          right_map.delete(id)
          group.delete(md)
        }

        begin
          duration = right_duration_selector.call(value)
        rescue => err
          left_map.values.each {|v| v.on_error(err) }
          observer.on_error(err)
          next
        end

        md.subscription = duration.take(1).subscribe(
          lambda {|_| },
          lambda {|e|
            left_map.values.each {|v| v.on_error(e) }
            observer.on_error(e)
          },
          expire)

        # Deliver the new right value to every window that is currently open.
        left_map.values.each {|v| v.on_next(value) }
      }

      o.on_error {|e|
        left_map.values.each {|v| v.on_error(e) }
        observer.on_error(e)
      }
    end
    group.push right.subscribe(right_obs)

    r
  end
end
ignore_elements() click to toggle source

Ignores all elements in an observable sequence leaving only the termination messages.

# File lib/rx_ruby/operators/single.rb, line 131
# Drops every element of the source and forwards only its termination
# (error or completion).
#
# @return the observable built by AnonymousObservable
def ignore_elements
  AnonymousObservable.new do |observer|
    terminal_only = RxRuby::Observer.configure do |o|
      o.on_next do |_|
        # intentionally empty: elements are discarded
      end
      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(terminal_only)
  end
end
last(&block) click to toggle source

Returns the last element of an observable sequence that satisfies the condition in the predicate if the block is given, else the last element in the observable sequence. @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [RxRuby::Observable] Sequence containing the last element in the observable sequence that satisfies the condition in the predicate if given, or the last element in the observable sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 251
# Returns the last element of the sequence, or the last element satisfying
# the block when one is given. An empty (or fully filtered) source results
# in +on_error+ with a RuntimeError ('Sequence contains no elements').
#
# @param block [Proc] optional predicate evaluated for each element
# @return [RxRuby::Observable]
def last(&block)
  return select(&block).last if block_given?

  AnonymousObservable.new do |observer|
    most_recent = nil
    seen_any = false

    tail_obs = Observer.configure do |o|
      o.on_next do |element|
        most_recent = element
        seen_any = true
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        if !seen_any
          observer.on_error(RuntimeError.new 'Sequence contains no elements' )
        else
          observer.on_next most_recent
          observer.on_completed
        end
      end
    end

    subscribe tail_obs
  end
end
last_or_default(default_value = nil, &block) click to toggle source

Returns the last element of an observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists. @param [Object] default_value The default value to use if the sequence is empty. @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [RxRuby::Observable] Sequence containing the last element in the observable sequence that satisfies the condition in the predicate if given, or a default value if no such element exists.

# File lib/rx_ruby/operators/aggregates.rb, line 286
# Returns the last element of the sequence (or the last one satisfying the
# block), or +default_value+ if the source completes without such an element.
#
# @param default_value [Object] value emitted when no element is found
# @param block [Proc] optional predicate evaluated for each element
# @return [RxRuby::Observable]
def last_or_default(default_value = nil, &block)
  return select(&block).last_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    most_recent = nil
    seen_any = false

    tail_obs = Observer.configure do |o|
      o.on_next do |element|
        most_recent = element
        seen_any = true
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        result = seen_any ? most_recent : default_value
        observer.on_next result
        observer.on_completed
      end
    end

    subscribe tail_obs
  end
end
latest() click to toggle source

Transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence. Each time a new inner observable sequence is received, unsubscribe from the previous inner observable sequence.

# File lib/rx_ruby/operators/multiple.rb, line 347
# Transforms a sequence of sequences into a sequence that only produces
# values from the most recently received inner sequence.
#
# Fixes relative to the previous implementation:
# * each new inner subscription is stored in +inner_subscription+ (it was
#   created but never attached, so the previous inner sequence was never
#   released and the returned composite did not cover the inner one);
# * +has_latest+ is set when an inner sequence arrives and cleared when the
#   current inner sequence completes, so the result completes only after both
#   the outer sequence and the current inner sequence have completed
#   (previously the flag was never set and outer completion always completed
#   the result immediately);
# * inner completion is handled explicitly.
#
# @return the observable built by AnonymousObservable
def latest
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    inner_subscription = SerialSubscription.new
    stopped = false
    latest_num = 0
    has_latest = false

    source_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        id = 0

        gate.synchronize do
          latest_num += 1
          id = latest_num
          has_latest = true
        end

        d = SingleAssignmentSubscription.new
        # Replace the subscription of the previous inner sequence
        # (presumably SerialSubscription releases the one it held — confirm).
        inner_subscription.subscription = d

        inner_obs = Observer.configure do |io|
          # Notifications of a superseded inner sequence are ignored.
          io.on_next {|x| gate.synchronize { observer.on_next x if latest_num == id } }

          io.on_error do |err|
            gate.synchronize do
              if latest_num == id
                has_latest = false
                observer.on_error err
              end
            end
          end

          io.on_completed do
            gate.synchronize do
              if latest_num == id
                has_latest = false
                observer.on_completed if stopped
              end
            end
          end
        end

        d.subscription = inner_source.subscribe inner_obs
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        gate.synchronize do
          stopped = true
          observer.on_completed unless has_latest
        end
      end
    end

    subscription = subscribe source_obs

    CompositeSubscription.new [subscription, inner_subscription]
  end
end
map(&block) click to toggle source

Projects each element of an observable sequence into a new form.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 103
# Projects each element of the sequence into a new form by calling the block
# with the element (the index supplied by +map_with_index+ is ignored).
#
# @param block [Proc] projection applied to each element
def map(&block)
  map_with_index do |element, _index|
    block.call element
  end
end
map_with_index(&block) click to toggle source

Projects each element of an observable sequence into a new form by incorporating the element’s index.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 76
# Projects each element of the sequence into a new form by calling the block
# with the element and its zero-based index.
#
# A block that raises results in +on_error+ downstream; the index is only
# advanced after a successful projection.
#
# @param block [Proc] projection called with (element, index)
# @return the observable built by AnonymousObservable
def map_with_index(&block)
  AnonymousObservable.new do |observer|
    projector = Observer.configure do |o|
      index = 0

      o.on_next do |element|
        projected = begin
          value = block.call(element, index)
          index += 1
          value
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next projected
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(projector)
  end
end
materialize() click to toggle source

Materializes the implicit notifications of an observable sequence as explicit notification values.

# File lib/rx_ruby/operators/single.rb, line 144
# Turns the implicit notifications of the source into explicit Notification
# values: each element, the terminating error, or the completion is emitted
# as a Notification, followed by completion of the result.
#
# The error case is wrapped with +Notification.create_on_error+ (it used to
# be wrapped as an on_next notification, which made errors indistinguishable
# from elements).
#
# @return the observable built by AnonymousObservable
def materialize
  AnonymousObservable.new do |observer|
    new_obs = RxRuby::Observer.configure do |o|

      o.on_next {|x| observer.on_next(Notification.create_on_next x) }

      o.on_error do |err|
        observer.on_next(Notification.create_on_error err)
        observer.on_completed
      end

      o.on_completed do
        observer.on_next(Notification.create_on_completed)
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
max(&block) click to toggle source

Returns the maximum element in an observable sequence. @param [Proc] block An optional selector function to produce an element. @return [RxRuby::Observable] The maximum element in an observable sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 314
# Returns the maximum element of the sequence, optionally after transforming
# each element with the given block.
#
# Uses +max_by+ with the identity key and takes the first entry of each
# emitted list. The identity block is now written as +{|x| x}+; the previous
# +{x| x}+ was a parameterless block referencing an undefined +x+.
#
# @param block [Proc] optional selector producing the value to compare
# @return [RxRuby::Observable] the maximum element
def max(&block)
  return map(&block).max if block_given?
  max_by {|x| x} .map {|x| x[0] }
end
max_by(&block) click to toggle source

Returns the elements in an observable sequence with the maximum key value. @param [Proc] block Key selector function. @return [RxRuby::Observable] An observable sequence containing a list of zero or more elements that have a maximum key value.

# File lib/rx_ruby/operators/aggregates.rb, line 323
# Returns the elements with the maximum key value by delegating to
# +extrema_by+ with the given key selector.
#
# @param block [Proc] key selector function
# @return [RxRuby::Observable] whatever +extrema_by+ produces
def max_by(&block)
  maxima = extrema_by(&block)
  maxima
end
merge(other, scheduler = CurrentThreadScheduler.instance) click to toggle source

Merges elements from two observable sequences into a single observable sequence, using the specified scheduler for enumeration of and subscription to the sources.

# File lib/rx_ruby/operators/multiple.rb, line 200
# Merges this sequence with +other+ by delegating to +Observable.merge_all+
# with the scheduler followed by both sources.
#
# @param other the sequence to merge with this one
# @param scheduler scheduler passed to +Observable.merge_all+
def merge(other, scheduler = CurrentThreadScheduler.instance)
  Observable.merge_all(scheduler, self, other)
end
merge_all() click to toggle source

Merges all inner observable sequences into a single observable sequence, subscribing to each inner sequence as soon as it is received.

# File lib/rx_ruby/operators/multiple.rb, line 265
# Merges a sequence of sequences into a single sequence, subscribing to every
# inner sequence as soon as it arrives.
#
# The result completes once the outer sequence has completed and every inner
# sequence has completed; +group+ always contains the outer subscription
# holder +m+, hence the "length == 1" checks.
#
# Fixes relative to the previous implementation: the outer subscription is
# stored in +m+ and the whole +group+ is returned, so releasing the returned
# subscription also covers the inner subscriptions (previously +m+ stayed
# empty and only the outer subscription was returned).
#
# @return the observable built by AnonymousObservable
def merge_all
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    stopped = false
    m = SingleAssignmentSubscription.new
    group = CompositeSubscription.new [m]

    new_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        inner_subscription = SingleAssignmentSubscription.new
        group << inner_subscription

        inner_obs = Observer.configure do |io|
          io.on_next {|x| gate.synchronize { observer.on_next x } }

          io.on_error {|err| gate.synchronize { observer.on_error err } }

          io.on_completed do
            group.delete inner_subscription
            # Only the outer holder is left: everything has finished.
            gate.synchronize { observer.on_completed } if stopped && group.length == 1
          end
        end

        inner_subscription.subscription = inner_source.subscribe inner_obs
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        gate.synchronize { observer.on_completed } if group.length == 1
      end
    end

    m.subscription = subscribe new_obs

    group
  end
end
merge_concurrent(max_concurrent = 1) click to toggle source

Merges elements from all inner observable sequences into a single observable sequence, limiting the number of concurrent subscriptions to inner sequences.

# File lib/rx_ruby/operators/multiple.rb, line 205
# Flattens a sequence of sequences while keeping at most +max_concurrent+
# inner subscriptions active. Inner sequences that arrive while the limit is
# reached are queued and started, in arrival order, as running ones complete.
#
# The result completes once the outer sequence has completed and no inner
# sequence is active. Any error is forwarded to the observer.
#
# @param max_concurrent [Integer] maximum number of simultaneously
#   subscribed inner sequences (default 1)
def merge_concurrent(max_concurrent = 1)
  AnonymousObservable.new do |observer|
    gate = Monitor.new
    q = []
    stopped = false
    group = CompositeSubscription.new
    active = 0

    # Declared first so the lambda can refer to itself when draining the queue.
    subscriber = nil
    subscriber = lambda do |xs|
      subscription = SingleAssignmentSubscription.new
      group << subscription

      new_obs = Observer.configure do |o|
        o.on_next {|x| gate.synchronize { observer.on_next x } }

        o.on_error {|err| gate.synchronize { observer.on_error err } }

        o.on_completed do
          group.delete subscription
          gate.synchronize do
            if q.length > 0
              # Reuse the freed slot for the next queued sequence; `active`
              # stays unchanged because one ends and one starts.
              s = q.shift
              subscriber.call s
            else
              active -= 1
              observer.on_completed if stopped && active == 0
            end
          end
        end
      end

      # Store the inner subscription in its holder so that unsubscribing the
      # group actually cancels the inner sequence.
      subscription.subscription = xs.subscribe new_obs
    end

    inner_obs = Observer.configure do |o|
      o.on_next do |inner_source|
        gate.synchronize do
          if active < max_concurrent
            active += 1
            subscriber.call inner_source
          else
            q << inner_source
          end
        end
      end

      o.on_error {|err| gate.synchronize { observer.on_error err } }

      o.on_completed do
        stopped = true
        observer.on_completed if active == 0
      end
    end

    group << subscribe(inner_obs)
  end
end
min(&block) click to toggle source

Returns the minimum element in an observable sequence. @param [Proc] block An optional selector function to produce an element. @return [RxRuby::Observable] The minimum element in an observable sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 330
# Emits the minimum element. With a block, elements are first projected
# through the block and the minimum of the projected values is taken.
# Without a block, min_by is asked for the list of minimal elements (keyed by
# the element itself) and the first entry of that list is emitted.
def min(&block)
  if block_given?
    map(&block).min
  else
    minimal_elements = min_by { |x| x }
    minimal_elements.map { |x| x[0] }
  end
end
min_by(&block) click to toggle source

Returns the elements in an observable sequence with the minimum key value. @param [Proc] block Key selector function. @return [RxRuby::Observable] An observable sequence containing a list of zero or more elements that have a minimum key value.

# File lib/rx_ruby/operators/aggregates.rb, line 339
# Delegates to extrema_by, passing `true` as the first argument and forwarding
# the key-selector block.
# NOTE(review): extrema_by is defined outside this excerpt; presumably the
# `true` flag selects the minimum (max_by omits it) -- confirm.
def min_by(&block)
  extrema_by(true, &block)
end
multicast(subject_or_subject_selector, selector = nil) click to toggle source
# File lib/rx_ruby/linq/observable/multicast.rb, line 3
# Multicasts the source through a subject.
#
# * Given a subject, returns a ConnectableObservable wrapping the source and
#   that subject; the caller decides when to connect.
# * Given a Proc that creates a subject, returns an observable that, for each
#   subscription, builds a fresh connectable, subscribes the observer to
#   +selector.call(connectable)+ and then connects the connectable.
#
# @param subject_or_subject_selector a subject, or a Proc returning one
# @param selector [Proc, nil] maps the connectable to the sequence the
#   observer is subscribed to; used only in the Proc form
def multicast(subject_or_subject_selector, selector = nil)
  if Proc === subject_or_subject_selector
    AnonymousObservable.new do |observer|
      connectable = multicast(subject_or_subject_selector.call)
      # Subscribe the observer first, then connect, so no element is missed.
      # The composite holds both subscriptions; the original code stored the
      # source observable itself here and never connected.
      subscription = selector.call(connectable).subscribe(observer)
      CompositeSubscription.new [subscription, connectable.connect]
    end
  else
    ConnectableObservable.new(self, subject_or_subject_selector)
  end
end
none?(&block) click to toggle source

Determines whether no elements of an observable sequence satisfy a condition if block given, else if all are false @param [Proc] block @return [RxRuby::Observable]

# File lib/rx_ruby/operators/aggregates.rb, line 75
# Emits a single boolean: true when no element satisfies the block, false as
# soon as one does. Without a block the element's own truthiness is tested,
# so the result is true only when every element is nil or false.
#
# The previous body selected the elements that FAIL the predicate and
# returned `any?` of those, i.e. "some element does not match", and with the
# default block it could never emit true.
def none?(&block)
  block ||= lambda { |v| v }
  select { |v| block.call v }.
    any?.
    map { |found| !found }
end
observe_on(scheduler) click to toggle source

Wraps the source sequence in order to run its observer callbacks on the specified scheduler.

# File lib/rx_ruby/operators/synchronization.rb, line 32
# Returns a sequence whose observer is wrapped in an ObserveOnObserver built
# from +scheduler+ and the downstream observer before subscribing to the
# source.
#
# @param scheduler scheduler handed to ObserveOnObserver
# @raise [ArgumentError] when +scheduler+ is nil or false
def observe_on(scheduler)
  raise ArgumentError, 'Scheduler cannot be nil' unless scheduler

  AnonymousObservable.new do |observer|
    relay = ObserveOnObserver.new(scheduler, observer)
    subscribe(relay)
  end
end
on_error_resume_next(other) click to toggle source

Concatenates the second observable sequence to the first observable sequence upon successful or exceptional termination of the first.

# File lib/rx_ruby/operators/multiple.rb, line 304
# Chains +other+ after this sequence by delegating to
# Observable.on_error_resume_next with (self, other).
#
# @param other the sequence to continue with
# @raise [ArgumentError] when +other+ is nil or false
def on_error_resume_next(other)
  raise ArgumentError, 'Other cannot be nil' unless other

  Observable.on_error_resume_next(self, other)
end
pluck(prop) click to toggle source
# File lib/rx_ruby/linq/observable/pluck.rb, line 3
# Projects every element to the value it returns for +[prop]+.
#
# @param prop key or index passed to each element's #[]
def pluck(prop)
  map do |element|
    element[prop]
  end
end
publish(&selector) click to toggle source
# File lib/rx_ruby/linq/observable/publish.rb, line 3
# Multicasts the source through a Subject.
#
# Without a block, returns multicast(Subject.new). With a block, the block is
# used as the multicast selector and a fresh Subject is created per
# subscription through the factory lambda.
#
# The captured +selector+ is passed on directly: the previous block-less
# `Proc.new` idiom raises ArgumentError on Ruby 3.0 and later.
def publish(&selector)
  return multicast(Subject.new) unless selector

  multicast(lambda { Subject.new }, selector)
end
reduce(*args, &block) click to toggle source

Applies an accumulator function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified seed value is used as the initial accumulator value. For aggregation behavior with incremental intermediate results, see RxRuby::Observable.scan @return [RxRuby::Observable]

# File lib/rx_ruby/operators/aggregates.rb, line 47
# Folds the sequence into a single value by running scan and keeping only the
# final element.
#
# Accepted call shapes:
#   reduce(seed, :sym)  / reduce(seed) { |acc, x| ... }   (seeded)
#   reduce(:sym)        / reduce { |acc, x| ... }         (unseeded)
# In the seeded form the seed is also prepended with start_with before the
# final element is taken.
#
# @raise [ArgumentError] for any other combination of arguments
def reduce(*args, &block)
  argc = args.length
  seeded   = (argc == 2 && args[1].is_a?(Symbol)) || (argc == 1 && block_given?)
  unseeded = (argc == 1 && args[0].is_a?(Symbol)) || (argc == 0 && block_given?)

  raise ArgumentError.new 'Invalid arguments' unless seeded || unseeded

  running = scan(*args, &block)
  running = running.start_with(args[0]) if seeded
  running.final
end
Also aliased as: aggregate
repeat(repeat_count) click to toggle source

Repeats the observable sequence a specified number of times.

# File lib/rx_ruby/operators/single.rb, line 171
# Passes the value returned by enumerator_repeat_times(repeat_count, self) to
# Observable.concat and returns the result.
# NOTE(review): enumerator_repeat_times is defined outside this excerpt;
# presumably it enumerates this sequence repeat_count times -- confirm.
def repeat(repeat_count)
  Observable.concat(enumerator_repeat_times(repeat_count, self))
end
repeat_infinitely() click to toggle source

Repeats the observable sequence indefinitely.

# File lib/rx_ruby/operators/single.rb, line 166
# Passes the value returned by enumerator_repeat_infinitely(self) to
# Observable.concat and returns the result.
# NOTE(review): enumerator_repeat_infinitely is defined outside this excerpt;
# presumably it enumerates this sequence without end -- confirm.
def repeat_infinitely
  Observable.concat(enumerator_repeat_infinitely(self))
end
rescue_error(other = nil, &action) click to toggle source

Continues an observable sequence that is terminated by an exception of the specified type with the observable sequence produced by the handler or continues an observable sequence that is terminated by an exception with the next observable sequence.

# File lib/rx_ruby/operators/multiple.rb, line 81
# Continues the sequence after an error.
#
# * rescue_error(other): on error, continue with +other+. Delegates to
#   Observable.rescue_error with this sequence first and +other+ second.
#   The previous code passed only +other+, which dropped the source sequence.
# * rescue_error { |err| ... }: on error, call the block with the error and
#   subscribe the observer to the sequence it returns. If the block itself
#   raises, that exception is forwarded to the observer instead.
#
# When both +other+ and a block are given, the block form is used.
#
# @param other fallback sequence (optional)
# @raise [ArgumentError] when neither +other+ nor a block is given
def rescue_error(other = nil, &action)
  return Observable.rescue_error(self, other) if other && !block_given?
  raise ArgumentError.new 'Invalid arguments' if other.nil? && !block_given?

  AnonymousObservable.new do |observer|
    # The serial subscription always points at the currently active
    # subscription: first the source, later the handler's sequence.
    subscription = SerialSubscription.new

    d1 = SingleAssignmentSubscription.new
    subscription.subscription = d1

    new_obs = Observer.configure do |o|
      o.on_next(&observer.method(:on_next))

      o.on_error do |err|
        result = nil
        begin
          result = action.call(err)
        rescue => e
          observer.on_error(e)
          next
        end

        d = SingleAssignmentSubscription.new
        subscription.subscription = d
        d.subscription = result.subscribe observer
      end

      o.on_completed(&observer.method(:on_completed))
    end

    d1.subscription = subscribe new_obs
    subscription
  end
end
retry(retry_count) click to toggle source

Repeats the source observable sequence the specified number of times or until it successfully terminates.

# File lib/rx_ruby/operators/single.rb, line 181
# Passes the value returned by enumerator_repeat_times(retry_count, self) to
# Observable.rescue_error and returns the result.
# NOTE(review): both helpers are defined outside this excerpt; presumably the
# source is re-subscribed after an error, up to retry_count attempts -- confirm.
def retry(retry_count)
  Observable.rescue_error(enumerator_repeat_times(retry_count, self))
end
retry_infinitely() click to toggle source

Repeats the source observable sequence until it successfully terminates.

# File lib/rx_ruby/operators/single.rb, line 176
# Passes the value returned by enumerator_repeat_infinitely(self) to
# Observable.rescue_error and returns the result.
# NOTE(review): both helpers are defined outside this excerpt; presumably the
# source is re-subscribed after every error without limit -- confirm.
def retry_infinitely
  Observable.rescue_error(enumerator_repeat_infinitely(self))
end
scan(*args, &block) click to toggle source

Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. For aggregation behavior with no intermediate results, see Observable.reduce.

# File lib/rx_ruby/operators/single.rb, line 188
# Runs an accumulator over the sequence and emits every intermediate result.
#
# Accepted call shapes:
#   scan(seed, :sym)
#   scan(seed) { |acc, x| ... }
#   scan(:sym)
#   scan { |acc, x| ... }
#
# Without a seed the first element becomes the initial accumulation and is
# emitted as-is. With a seed, an empty source emits the seed on completion.
# An exception raised by the accumulator is forwarded through on_error.
#
# @raise [ArgumentError] for any other combination of arguments
def scan(*args, &block)
  has_seed = false
  seed = nil
  action = nil

  # Argument parsing to support:
  # 1. (seed, Symbol)
  # 2. (seed, &block)
  # 3. (Symbol)
  # 4. (&block)
  if args.length == 2 && args[1].is_a?(Symbol)
    seed = args[0]
    action = args[1].to_proc
    has_seed = true
  elsif args.length == 1 && block_given?
    seed = args[0]
    has_seed = true
    action = block
  elsif args.length == 1 && args[0].is_a?(Symbol)
    action = args[0].to_proc
  elsif args.length == 0 && block_given?
    action = block
  else
    raise ArgumentError.new 'Invalid arguments'
  end

  AnonymousObservable.new do |observer|
    has_accumulation = false
    accumulation = nil
    has_value = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        begin
          has_value = true

          if has_accumulation
            accumulation = action.call(accumulation, x)
          else
            accumulation = has_seed ? action.call(seed, x) : x
            has_accumulation = true
          end
        rescue => err
          observer.on_error err
          # `next`, not `return`: this block runs after scan has already
          # returned, so `return` would raise LocalJumpError.
          next
        end

        observer.on_next accumulation
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # An empty source with a seed still produces the seed.
        observer.on_next seed if !has_value && has_seed
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
select(&block) click to toggle source

Filters the elements of an observable sequence based on a predicate.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 244
# Filters by element only: wraps the block so that the index argument
# supplied by select_with_index is discarded.
def select(&block)
  select_with_index {|x, _| block.call x }
end
Also aliased as: find_all
select_with_index(&block) click to toggle source

Filters the elements of an observable sequence based on a predicate by incorporating the element’s index.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 250
# Forwards only the elements for which the block returns a truthy value. The
# block receives the element and a zero-based counter; the counter advances
# only when the block returns without raising. An exception raised by the
# block is forwarded through on_error and that element is dropped.
def select_with_index(&block)
  AnonymousObservable.new do |observer|
    index = 0

    filtering = Observer.configure do |o|
      o.on_next do |item|
        keep = begin
          verdict = block.call(item, index)
          index += 1
          verdict
        rescue => e
          observer.on_error e
          next
        end

        observer.on_next item if keep
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(filtering)
  end
end
Also aliased as: find_all_with_index
sequence_eql?(other) click to toggle source

Determines whether two sequences are equal by comparing the elements pairwise. @param [RxRuby::Observable] other Other observable sequence to compare. @return [RxRuby::Observable] An observable sequence that contains a single element which indicates whether both sequences are of equal length and their corresponding elements are equal.

# File lib/rx_ruby/operators/aggregates.rb, line 347
# Compares this sequence with +other+ element by element and emits a boolean.
#
# Each side buffers the elements it has received that the opposite side has
# not yet matched. An arriving element is compared (==) with the oldest
# buffered element of the opposite side; a mismatch emits false and
# completes. An element arriving after the opposite side has completed with
# nothing buffered also emits false. When one side completes with an empty
# buffer, false is emitted if the opposite side still has buffered elements,
# and true if the opposite side has already completed.
# All state changes happen inside gate.synchronize.
#
# @param other the sequence to compare against
# @return an observable whose subscription is a CompositeSubscription of the
#   two underlying subscriptions
def sequence_eql?(other)
  AnonymousObservable.new do |observer|
    gate = Mutex.new
    left_done = false
    right_done = false
    # Elements received from one side and not yet paired with the other.
    left_queue = []
    right_queue = []

    # Observer for this (left) sequence.
    obs1 = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          if right_queue.length > 0
            # Pair with the oldest unmatched element from the right side.
            v = right_queue.shift
            equal = x == v

            unless equal
              observer.on_next false
              observer.on_completed
            end
          elsif right_done
            # Right side is finished and has nothing left to pair with.
            observer.on_next false
            observer.on_completed
          else
            left_queue.push x
          end
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        gate.synchronize do
          left_done = true
          if left_queue.length == 0
            if right_queue.length > 0
              # Right produced more elements than left.
              observer.on_next false
              observer.on_completed
            elsif right_done
              observer.on_next true
              observer.on_completed
            end
          end
        end
      end
    end

    subscription1 = subscribe obs1

    # Observer for +other+ (right); mirrors obs1 with the roles swapped.
    obs2 = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          if left_queue.length > 0
            v = left_queue.shift
            equal = x == v

            unless equal
              observer.on_next false
              observer.on_completed
            end
          elsif left_done
            observer.on_next false
            observer.on_completed
          else
            right_queue.push x
          end
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        gate.synchronize do
          right_done = true
          if right_queue.length == 0
            if left_queue.length > 0
              observer.on_next false
              observer.on_completed
            elsif left_done
              observer.on_next true
              observer.on_completed
            end
          end
        end
      end
    end

    subscription2 = other.subscribe obs2

    CompositeSubscription.new [subscription1, subscription2]
  end
end
single(&block) click to toggle source

Returns the only element of an observable sequence, and reports an exception if there is not exactly one element in the observable sequence. @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [RxRuby::Observable] Sequence containing the single element in the observable sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 443
# Emits the only element of the sequence. With a block, the sequence is first
# filtered with select and the same rule is applied to the filtered result.
#
# A second element triggers on_error with
# RuntimeError('More than one element produced'); completion without any
# element triggers on_error with RuntimeError('Sequence contains no elements').
def single(&block)
  return select(&block).single if block_given?

  AnonymousObservable.new do |observer|
    found = false
    only = nil

    watcher = Observer.configure do |o|
      o.on_next do |x|
        if found
          observer.on_error(RuntimeError.new 'More than one element produced')
        else
          only = x
          found = true
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        if found
          observer.on_next only
          observer.on_completed
        else
          observer.on_error(RuntimeError.new 'Sequence contains no elements')
        end
      end
    end

    subscribe watcher
  end
end
single_or_default(default_value = nil, &block) click to toggle source

Returns the only element of an observable sequence, or a default value if the observable sequence is empty; this method reports an exception if there is more than one element in the observable sequence. @param [Object] default_value The default value if no value is provided @param [Proc] block A predicate function to evaluate for elements in the source sequence. @return [RxRuby::Observable] Sequence containing the single element in the observable sequence, or a default value if no such element exists.

# File lib/rx_ruby/operators/aggregates.rb, line 481
# Emits the only element of the sequence, or +default_value+ when the
# sequence completes without producing one. With a block, the sequence is
# first filtered with select.
#
# A second element triggers on_error with
# RuntimeError('More than one element produced').
#
# @param default_value value emitted for an empty sequence (default nil)
def single_or_default(default_value = nil, &block)
  return select(&block).single_or_default(default_value) if block_given?

  AnonymousObservable.new do |observer|
    found = false
    only = nil

    watcher = Observer.configure do |o|
      o.on_next do |x|
        if found
          observer.on_error(RuntimeError.new 'More than one element produced')
        else
          only = x
          found = true
        end
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        result = found ? only : default_value
        observer.on_next(result)
        observer.on_completed
      end
    end

    subscribe watcher
  end
end
skip(count) click to toggle source

Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 118
# Drops the first +count+ elements and forwards everything after them.
# Errors and completion are passed straight through.
#
# @param count number of leading elements to drop
def skip(count)
  AnonymousObservable.new do |observer|
    to_drop = count

    dropping = Observer.configure do |o|
      o.on_next do |x|
        to_drop <= 0 ? observer.on_next(x) : (to_drop -= 1)
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(dropping)
  end
end
skip_last(count) click to toggle source

Bypasses a specified number of elements at the end of an observable sequence. @param [Numeric] count The number of elements to bypass at the end of an observable sequence.

# File lib/rx_ruby/operators/single.rb, line 253
# Forwards every element except the last +count+ ones. Elements are held in
# a buffer and released only once more than +count+ are waiting.
#
# @param count [Numeric] number of trailing elements to drop
# @raise [ArgumentError] when +count+ is negative
def skip_last(count)
  raise ArgumentError.new 'Count cannot be less than zero' if count < 0

  AnonymousObservable.new do |observer|
    held = []

    delaying = Observer.configure do |o|
      o.on_next do |x|
        held << x
        observer.on_next(held.shift) if held.length > count
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe delaying
  end
end
skip_until(other) click to toggle source

Returns the elements from the source observable sequence only after the other observable sequence produces an element.

# File lib/rx_ruby/operators/multiple.rb, line 311
# Suppresses source elements until +other+ produces its first element.
#
# Both sequences are subscribed through synchronize(gate) with one shared
# Monitor. Errors from either sequence are forwarded to the observer.
# Completion of the source is forwarded only if the gate has already been
# opened; completion of +other+ is not handled.
#
# @param other the sequence whose first element opens the gate
# @raise [ArgumentError] when +other+ is nil or false
def skip_until(other)
  raise ArgumentError.new 'Other cannot be nil' unless other

  AnonymousObservable.new do |observer|
    source_subscription = SingleAssignmentSubscription.new
    other_subscription = SingleAssignmentSubscription.new

    # Local flag (shadows Kernel#open inside this block): true once +other+
    # has produced an element.
    open = false
    gate = Monitor.new

    source_obs = Observer.configure do |o|
      o.on_next {|x| observer.on_next x if open }
      o.on_error(&observer.method(:on_error))
      o.on_completed { observer.on_completed if open }
    end

    other_obs = Observer.configure do |o|
      o.on_next do |_|
        open = true
        # The trigger sequence is no longer needed once the gate is open.
        other_subscription.unsubscribe
      end

      o.on_error(&observer.method(:on_error))
    end

    source_subscription.subscription = synchronize(gate).subscribe(source_obs)
    other_subscription.subscription = other.synchronize(gate).subscribe(other_obs)

    CompositeSubscription.new [source_subscription, other_subscription]
  end
end
skip_while(&block) click to toggle source

Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 142
# Element-only variant of skip_while_with_index: wraps the block so that the
# index argument is discarded.
def skip_while(&block)
  skip_while_with_index {|x, _| block.call x }
end
skip_while_with_index(&block) click to toggle source

Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. The element’s index is used in the logic of the predicate function.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 148
# Drops elements while the block returns a truthy value, then forwards the
# first element for which it returns falsy and every element after it.
# The block receives the element and a zero-based counter and is no longer
# called once forwarding has started. An exception raised by the block is
# forwarded through on_error and that element is dropped.
def skip_while_with_index(&block)
  AnonymousObservable.new do |observer|
    running = false
    i = 0

    new_observer = Observer.configure do |o|

      o.on_next do |x|
        unless running
          begin
            running = !block.call(x, i)
            i += 1
          rescue => e
            observer.on_error e
            next
          end
        end

        # Must sit outside the `unless running` guard: otherwise only the
        # first non-skipped element would ever be emitted.
        observer.on_next x if running
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(new_observer)
  end
end
start_with(*args) click to toggle source

Prepends a sequence of values to an observable sequence.

# File lib/rx_ruby/operators/single.rb, line 273
# Prepends the given values to this sequence. If the first argument is a
# Scheduler it is removed from the values and used to build the prefix
# sequence; otherwise CurrentThreadScheduler.instance is used.
def start_with(*args)
  leading_scheduler = !args.empty? && Scheduler === args[0]

  scheduler = CurrentThreadScheduler.instance
  scheduler = args.shift if leading_scheduler

  prefix = Observable.from_array(args, scheduler)
  prefix.concat(self)
end
subscribe(*args) click to toggle source
# File lib/rx_ruby/core/observable.rb, line 13
# Subscribes to the sequence. Accepted call shapes:
#
#   subscribe                      - observer with no handlers
#   subscribe { |x| ... }          - block becomes the on_next handler
#   subscribe(observer)            - use the given observer as-is
#   subscribe(on_next, on_error, on_completed)
#                                  - three callables, one per notification
#
# The block is captured explicitly as +block+: the previous block-less
# `Proc.new` idiom raises ArgumentError on Ruby 3.0 and later.
#
# @return whatever _subscribe returns for the resulting observer
# @raise [ArgumentError] for 2 or more than 3 positional arguments
def subscribe(*args, &block)
  case args.size
  when 0
    if block
      _subscribe Observer.configure {|o| o.on_next(&block) }
    else
      _subscribe Observer.configure
    end
  when 1
    _subscribe args[0]
  when 3
    _subscribe Observer.configure {|o|
      o.on_next(&args[0])
      o.on_error(&args[1])
      o.on_completed(&args[2])
    }
  else
    raise ArgumentError, "wrong number of arguments (#{args.size} for 0..1 or 3)"
  end
end
subscribe_on(scheduler) click to toggle source

Wraps the source sequence in order to run its subscription and unsubscribe logic on the specified scheduler.

# File lib/rx_ruby/operators/synchronization.rb, line 15
# Defers the actual subscription to the source into a unit of work scheduled
# on +scheduler+.
#
# The returned SerialSubscription first holds the scheduled work item; once
# that work runs it is replaced by a ScheduledSubscription built from the
# scheduler and the real source subscription.
# NOTE(review): ScheduledSubscription is defined elsewhere; presumably it
# performs the unsubscribe on the scheduler as well -- confirm.
#
# @param scheduler scheduler used to run the subscribe call
# @raise [ArgumentError] when +scheduler+ is nil or false
def subscribe_on(scheduler)
  raise ArgumentError.new 'Scheduler cannot be nil' unless scheduler

  AnonymousObservable.new do |observer|
    # m holds the handle of the scheduled work; d is what the caller gets.
    m = SingleAssignmentSubscription.new
    d = SerialSubscription.new
    d.subscription = m

    m.subscription = scheduler.schedule lambda {
      # Swap the pending work item for the real subscription.
      d.subscription = ScheduledSubscription.new scheduler, (subscribe observer)
    }

    d
  end
end
subscribe_on_completed(&block) click to toggle source

Subscribes the given block to the on_completed action of the observable sequence.

# File lib/rx_ruby/core/observable.rb, line 69
# Subscribes with an observer whose only handler is the given block, wired to
# on_completed.
#
# @raise [ArgumentError] when no block is given
def subscribe_on_completed(&block)
  raise ArgumentError, 'Block is required' unless block

  completion_observer = Observer.configure { |o| o.on_completed(&block) }
  subscribe(completion_observer)
end
subscribe_on_error(&block) click to toggle source

Subscribes the given block to the on_error action of the observable sequence.

# File lib/rx_ruby/core/observable.rb, line 63
# Subscribes with an observer whose only handler is the given block, wired to
# on_error.
#
# @raise [ArgumentError] when no block is given
def subscribe_on_error(&block)
  raise ArgumentError, 'Block is required' unless block

  error_observer = Observer.configure { |o| o.on_error(&block) }
  subscribe(error_observer)
end
subscribe_on_next(&block) click to toggle source

Subscribes the given block to the on_next action of the observable sequence. @param [Object] block @return [Subscription]

# File lib/rx_ruby/core/observable.rb, line 57
# Subscribes with an observer whose only handler is the given block, wired to
# on_next.
#
# @raise [ArgumentError] when no block is given
def subscribe_on_next(&block)
  raise ArgumentError, 'Block is required' unless block

  value_observer = Observer.configure { |o| o.on_next(&block) }
  subscribe(value_observer)
end
sum(&block) click to toggle source

Computes the sum of a sequence of values. @param [Proc] block Optional block used to obtain the value to sum. @return [RxRuby::Observable] An observable sequence containing a single element with the sum of the values in the source sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 513
# Emits the sum of the elements, starting from 0. With a block, elements are
# first projected through the block and the projected values are summed.
def sum(&block)
  if block_given?
    map(&block).sum
  else
    reduce(0) { |total, x| total + x }
  end
end
synchronize(gate = Monitor.new) click to toggle source

Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.

# File lib/rx_ruby/operators/synchronization.rb, line 41
# Returns a sequence that subscribes to the source with the observer produced
# by Observer.allow_reentrancy(observer, gate).
# NOTE(review): Observer.allow_reentrancy is defined outside this excerpt;
# presumably it serialises the observer callbacks on +gate+ -- confirm.
#
# @param gate lock object shared by the wrapped callbacks; a new Monitor is
#   created per call when omitted
def synchronize(gate = Monitor.new)
  AnonymousObservable.new do |observer|
    subscribe(Observer.allow_reentrancy observer, gate)
  end
end
take(count, scheduler = ImmediateScheduler.instance) click to toggle source

Returns a specified number of contiguous elements from the start of an observable sequence.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 178
# Forwards the first +count+ elements and then completes. A count of exactly
# 0 short-circuits to Observable.empty(scheduler) without subscribing to the
# source.
#
# @param count number of leading elements to forward
# @param scheduler used only for the count == 0 case
def take(count, scheduler = ImmediateScheduler.instance)
  return Observable.empty(scheduler) if count == 0

  AnonymousObservable.new do |observer|
    left = count

    limiter = Observer.configure do |o|
      o.on_next do |x|
        next unless left > 0

        left -= 1
        observer.on_next x
        observer.on_completed if left == 0
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(limiter)
  end
end
take_last(count, scheduler = CurrentThreadScheduler.instance) click to toggle source

Returns a specified number of contiguous elements from the end of an observable sequence.

# File lib/rx_ruby/operators/single.rb, line 282
# Emits only the last +count+ elements of the sequence, after the source has
# completed. While the source is running, elements are buffered and the
# buffer is trimmed to at most +count+ entries; on completion the buffer is
# drained one element per recursive scheduler step, followed by completion.
#
# Fixes relative to the previous body:
# * the buffer was trimmed with `q.length > 0`, so it was always empty;
# * the subscribe call and the returned subscription sat inside the
#   Observer.configure block, where +new_obs+ is still nil.
#
# @param count [Numeric] number of trailing elements to emit
# @param scheduler scheduler used to drain the buffer
# @raise [ArgumentError] when +count+ is negative
def take_last(count, scheduler = CurrentThreadScheduler.instance)
  raise ArgumentError.new 'Count cannot be less than zero' if count < 0
  AnonymousObservable.new do |observer|
    q = []
    g = CompositeSubscription.new

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        # Keep only the most recent +count+ elements.
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        g.push(scheduler.schedule_recursive lambda {|this|
          if q.length > 0
            observer.on_next(q.shift)
            this.call
          else
            observer.on_completed
          end
        })
      end
    end

    # The composite covers both the source subscription and the drain work.
    g.push(subscribe new_obs)
    g
  end
end
take_last_buffer(count) click to toggle source

Returns a list with the specified number of contiguous elements from the end of an observable sequence.

# File lib/rx_ruby/operators/single.rb, line 314
# Emits a single array holding the last +count+ elements of the sequence
# (fewer if the source was shorter), then completes. Elements are buffered
# while the source runs and the buffer is trimmed to +count+ entries.
#
# Fixes the misspelled `susbcribe` call, which raised NoMethodError on every
# subscription.
#
# @param count number of trailing elements to keep
def take_last_buffer(count)
  AnonymousObservable.new do |observer|
    q = []
    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.push x
        q.shift if q.length > count
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next q
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
take_until(other) click to toggle source

Returns the elements from the source observable sequence until the other observable sequence produces an element.

# File lib/rx_ruby/operators/multiple.rb, line 396
# Forwards source notifications until +other+ produces its first element, at
# which point the observer is completed. Errors from +other+ are forwarded;
# completion of +other+ is not handled.
#
# +other+ is subscribed before the source. Both go through synchronize(gate)
# with one shared Monitor. The source is additionally wired through
# ensures(...) with other_subscription.unsubscribe.
# NOTE(review): ensures is defined outside this excerpt; presumably it runs
# the given action when the source terminates -- confirm.
#
# @param other the sequence whose first element ends the result
# @raise [ArgumentError] when +other+ is nil or false
def take_until(other)
  raise ArgumentError.new 'other cannot be nil' unless other

  AnonymousObservable.new do |observer| 
    source_subscription = SingleAssignmentSubscription.new
    other_subscription = SingleAssignmentSubscription.new

    gate = Monitor.new

    other_obs = Observer.configure do |o|
      o.on_next {|_| observer.on_completed }
      o.on_error(&observer.method(:on_error))
    end

    other_subscription.subscription = other.synchronize(gate).subscribe(other_obs)
    source_subscription.subscription = synchronize(gate).ensures(&other_subscription.method(:unsubscribe)).subscribe(observer)

    CompositeSubscription.new [source_subscription, other_subscription]
  end
end
take_while(&block) click to toggle source

Returns elements from an observable sequence as long as a specified condition is true.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 204
# Element-only variant of take_while_with_index: wraps the block so that the
# index argument is discarded.
def take_while(&block)
  take_while_with_index {|x, _| block.call x }
end
take_while_with_index(&block) click to toggle source

Returns elements from an observable sequence as long as a specified condition is true. The element’s index is used in the logic of the predicate function.

# File lib/rx_ruby/operators/standard_query_operators.rb, line 210
# Forwards elements for as long as the block returns a truthy value; the
# first falsy result completes the observer and later elements are ignored.
# The block receives the element and a zero-based counter. An exception
# raised by the block is forwarded through on_error and that element is
# dropped.
def take_while_with_index(&block)
  AnonymousObservable.new do |observer|
    running = true
    index = 0

    guarded = Observer.configure do |o|
      o.on_next do |x|
        next unless running

        begin
          running = block.call(x, index)
          index += 1
        rescue => e
          observer.on_error e
          next
        end

        running ? observer.on_next(x) : observer.on_completed
      end

      o.on_error(&observer.method(:on_error))
      o.on_completed(&observer.method(:on_completed))
    end

    subscribe(guarded)
  end
end
tap(observer) click to toggle source

Invokes the observer’s methods for each message in the source sequence. This method can be used for debugging, logging, etc. of query behavior by intercepting the message stream to run arbitrary actions for messages on the pipeline.

# File lib/rx_ruby/operators/single.rb, line 75
# Invokes +observer+ for every notification of the source before passing the
# same notification on to the downstream observer.
#
# If +observer+ raises while handling a notification, the exception is sent
# downstream through on_error and the original notification is NOT forwarded.
# (Previously the rescue fell through, so the value, error or completion was
# still delivered after the error.)
#
# @param observer side-effect observer called for each notification
# @raise [ArgumentError] when +observer+ is nil or false
def tap(observer)
  raise ArgumentError.new 'Observer cannot be nil' unless observer
  AnonymousObservable.new do |obs|
    new_obs = RxRuby::Observer.configure do |o|

      o.on_next do |value|
        begin
          observer.on_next value
        rescue => err
          obs.on_error err
          next
        end

        obs.on_next value
      end

      o.on_error do |err|
        begin
          observer.on_error err
        rescue => e
          obs.on_error e
          next
        end

        obs.on_error err
      end

      o.on_completed do
        begin
          observer.on_completed
        rescue => err
          obs.on_error err
          next
        end

        obs.on_completed
      end

    end

    subscribe new_obs
  end
end
time_interval(scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/time_interval.rb, line 3
# Pairs every element with the time elapsed since the previous element (or,
# for the first element, since the deferred block ran).
#
# The work is wrapped in Observable.defer, so +last+ is initialised from
# scheduler.now inside the deferred block rather than when time_interval is
# called. Each element is mapped to TimeInterval.new(span, x), where span is
# scheduler.now minus the previously recorded time.
#
# @param scheduler clock source; must respond to #now
def time_interval(scheduler = DefaultScheduler.instance)
  Observable.defer {
    last = scheduler.now
    self.map {|x|
      now = scheduler.now
      span = now - last
      last = now
      TimeInterval.new(span, x)
    }
  }
end
timestamp(scheduler = DefaultScheduler.instance) click to toggle source
# File lib/rx_ruby/linq/observable/timestamp.rb, line 3
# Maps every element to a Hash of the form
# { value: element, timestamp: scheduler.now }, reading the clock at the
# moment the element is processed.
#
# @param scheduler clock source; must respond to #now
def timestamp(scheduler = DefaultScheduler.instance)
  map { |x| { value: x, timestamp: scheduler.now } }
end
to_a() click to toggle source

Creates an array from an observable sequence. @return [RxRuby::Observable] An array created from an observable sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 520
# Collects all elements into one array and emits that array as a single
# element when the source completes.
#
# Uses the three-callable form of subscribe: each element is pushed onto
# +arr+, errors go straight to the observer, and completion emits +arr+
# followed by on_completed. A fresh array is created per subscription.
def to_a
  AnonymousObservable.new do |observer|
    arr = []
    self.subscribe(
      arr.method(:push),
      observer.method(:on_error),
      lambda {
        observer.on_next arr
        observer.on_completed
      })
  end
end
to_h() { |h| ... } click to toggle source

Creates a Hash from the observable collection. Note that any duplicate keys will be overwritten. @return [RxRuby::Observable] A Hash created from an observable sequence.

# File lib/rx_ruby/operators/aggregates.rb, line 554
# Folds the sequence into a Hash via reduce.
#
# A HashConfiguration is created and, when a block is given, yielded to the
# caller for configuration. For each element x the entry
# key_selector_block.call(x) => value_selector_block.call(x) is stored; a
# later element with the same key replaces the earlier entry.
# NOTE(review): HashConfiguration is defined outside this excerpt; its
# default selector blocks are not visible here -- confirm.
# NOTE(review): the seed Hash is created once per to_h call, not per
# subscription, so repeated subscriptions to the returned observable may
# share and mutate the same Hash -- confirm against reduce/scan.
def to_h
  h = HashConfiguration.new
  yield h if block_given?
  reduce(Hash.new) do |acc, x|
    acc[h.key_selector_block.call x] = h.value_selector_block.call x
    # Return the hash so it becomes the accumulator for the next element.
    acc
  end
end
window_with_count(count, skip) click to toggle source

Projects each element of an observable sequence into zero or more windows which are produced based on element count information.

# File lib/rx_ruby/operators/single.rb, line 336
# Splits the sequence into windows (Subjects) by element count.
#
# A first window is opened on subscription. After that a new window is
# opened every +skip+ elements, and a window is completed once it has
# received +count+ elements. Every incoming element is pushed into all
# currently open windows. Source errors and completion are propagated to all
# open windows and then to the observer.
#
# Windows are handed out through add_ref on a RefCountSubscription that wraps
# the source subscription, which is also the value returned to the subscriber.
# NOTE(review): RefCountSubscription and Subject#add_ref are defined outside
# this excerpt; presumably the source stays subscribed while any window is
# still referenced -- confirm.
#
# @param count number of elements per window
# @param skip number of elements between the starts of consecutive windows
# @raise [ArgumentError] when +count+ or +skip+ is not greater than zero
def window_with_count(count, skip)
  raise ArgumentError.new 'Count must be greater than zero' if count <= 0
  raise ArgumentError.new 'Skip must be greater than zero' if skip <= 0

  AnonymousObservable.new do |observer|
    # q: currently open windows, oldest first. n: elements seen so far.
    q = []
    n = 0

    m = SingleAssignmentSubscription.new
    ref_count_disposable = RefCountSubscription.new m

    create_window = lambda {
      s = Subject.new
      q.push s
      observer.on_next(s.add_ref(ref_count_disposable))
    }

    create_window.call

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        q.each {|s| s.on_next x}

        # c >= 0 once the oldest window has received `count` elements; with
        # c % skip == 0 this fires once per window start offset.
        c = n - count + 1
        q.shift.on_completed if c >=0 && c % skip == 0

        n += 1
        create_window.call if n % skip == 0
      end

      o.on_error do |err|
        q.shift.on_error err while q.length > 0
        observer.on_error err
      end

      o.on_completed do
        q.shift.on_completed while q.length > 0
        observer.on_completed
      end
    end

    m.subscription = subscribe new_obs
    ref_count_disposable
  end
end
window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance) click to toggle source

Projects each element of an observable sequence into consecutive non-overlapping windows which are produced based on timing information.

# File lib/rx_ruby/operators/time.rb, line 30
# Splits the sequence into windows (Subjects) based on timing information.
#
# A first window is opened on subscription. A timer is then scheduled on
# `scheduler` for whichever comes first: the next "shift" point (open a new
# window) or the next "span" point (close the oldest window). Every source
# element is forwarded to all windows that are open when it arrives.
#
# @param time_span  how long each window stays open; must be greater than
#                   zero. Passed to `scheduler.schedule_relative` as a
#                   relative due time (unit is whatever the scheduler expects).
# @param time_shift interval between the openings of consecutive windows;
#                   defaults to `time_span`; must be greater than zero
# @param scheduler  scheduler used to run the window timers
# @raise [ArgumentError] if time_span or time_shift is not greater than zero
# @return [AnonymousObservable] an observable whose elements are the windows
def window_with_time(time_span, time_shift = time_span, scheduler = DefaultScheduler.instance)
  raise ArgumentError, 'time_span must be greater than zero' if time_span <= 0
  raise ArgumentError, 'time_shift must be greater than zero' if time_shift <= 0

  AnonymousObservable.new do |observer|
    total_time = 0
    next_shift = time_shift
    next_span = time_span

    # Guards `q` and downstream calls against concurrent timer/source callbacks.
    gate = Mutex.new
    q = []

    timer_d = SerialSubscription.new
    group_subscription = CompositeSubscription.new [timer_d]
    ref_count_subscription = RefCountSubscription.new(group_subscription)

    create_timer = lambda {
      m = SingleAssignmentSubscription.new
      timer_d.subscription = m

      # The next tick is a span tick, a shift tick, or both when they coincide.
      is_span = next_span <= next_shift
      is_shift = next_shift <= next_span

      new_total_time = is_span ? next_span : next_shift
      ts = new_total_time - total_time
      total_time = new_total_time

      next_span += time_shift if is_span
      next_shift += time_shift if is_shift

      m.subscription = scheduler.schedule_relative(ts, lambda {
        gate.synchronize do
          if is_shift
            s = Subject.new
            q.push s
            observer.on_next(s.add_ref(ref_count_subscription))
          end
          if is_span
            s = q.shift
            s.on_completed
          end
          create_timer.call
        end
      })
    }

    q.push(Subject.new)
    observer.on_next(q[0].add_ref(ref_count_subscription))
    create_timer.call

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        gate.synchronize do
          q.each {|s| s.on_next x}
        end
      end

      o.on_error do |err|
        gate.synchronize do
          q.each {|s| s.on_error err}
          observer.on_error err
        end
      end

      o.on_completed do
        gate.synchronize do
          # Complete every window that is still open before completing downstream.
          q.each {|s| s.on_completed}
          observer.on_completed
        end
      end
    end

    group_subscription.push subscribe(new_obs)

    ref_count_subscription
  end
end
zip(*args, &result_selector) click to toggle source

Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences have produced an element at a corresponding index.

# File lib/rx_ruby/operators/multiple.rb, line 418
# Instance-level zip: delegates to Observable.zip with this observable as the
# first source, followed by the given sources.
#
# @param args the other sources to combine with this one
# @param result_selector optional block forwarded to Observable.zip
# @return whatever Observable.zip returns
def zip(*args, &result_selector)
  Observable.zip(self, *args, &result_selector)
end

Private Instance Methods

_concat_map(selector) click to toggle source
# File lib/rx_ruby/linq/observable/concat_map.rb, line 25
# Maps every element (with its index) through `selector` and concatenates the
# results via `concat_all`. A result that responds to `each` is first wrapped
# with Observable.from; any other result is passed through untouched.
#
# @param selector callable invoked as selector.call(element, index)
# @return whatever `concat_all` returns for the mapped sequence
def _concat_map(selector)
  projected = map_with_index do |item, index|
    inner = selector.call(item, index)
    inner.respond_to?(:each) ? Observable.from(inner) : inner
  end

  projected.concat_all
end
delay_date(due_time, scheduler) click to toggle source
# File lib/rx_ruby/linq/observable/delay.rb, line 77
# Delays the sequence until an absolute point in time by converting it into a
# relative delay (due_time minus the scheduler's current time) and delegating
# to delay_time_span.
#
# @param due_time  absolute time; must support `-` with `scheduler.now`
# @param scheduler scheduler providing `now` and passed on to delay_time_span
# @return whatever delay_time_span returns
def delay_date(due_time, scheduler)
  remaining = due_time - scheduler.now
  delay_time_span(remaining, scheduler)
end
delay_time_span(due_time, scheduler) click to toggle source
# File lib/rx_ruby/linq/observable/delay.rb, line 13
# Re-emits the notifications of this sequence shifted by a relative delay.
#
# The source is materialized and timestamped with `scheduler`; value and
# completion notifications are queued with `timestamp + due_time` and
# replayed by a recursively rescheduled action once their time has come.
# An error notification discards everything still queued and is forwarded
# without waiting for the delay.
#
# @param due_time  relative delay added to each notification's timestamp
# @param scheduler scheduler used for timestamps, `now`, and the replay action
# @return [AnonymousObservable]
def delay_time_span(due_time, scheduler)
  AnonymousObservable.new do |observer|
    active = false                       # true while a replay action is scheduled or pending
    cancelable = SerialSubscription.new  # holds the currently scheduled replay action
    exception = nil                      # error received from the source, if any
    q = []                               # pending { value:, timestamp: } entries
    running = false                      # true while the replay action is draining q
    subscription = materialize.timestamp(scheduler).subscribe do |notification|
      if notification[:value].on_error?
        # Drop everything still pending; only the error remains relevant.
        q = []
        q.push notification
        exception = notification[:value].error
        # If the replay action is mid-drain it reports the error itself.
        should_run = !running
      else
        q.push({ value: notification[:value], timestamp: notification[:timestamp] + due_time })
        # Only start a replay action when none is already active.
        should_run = !active
        active = true
      end

      if should_run
        if exception != nil
          observer.on_error exception
        else
          d = SingleAssignmentSubscription.new
          cancelable.subscription = d

          d.subscription = scheduler.schedule_recursive_relative(due_time, lambda {|this|
            # An error arrived before this action ran; it has been or will be
            # forwarded from the subscribe block above.
            return if exception != nil

            running = true
            # Replay every queued notification whose due timestamp has passed.
            begin
              result = nil
              if q.length > 0 && q[0][:timestamp] - scheduler.now <= 0
                result = q.shift[:value]
              end
              if result != nil
                result.accept observer
              end
            end while result != nil

            should_recurse = false
            recurse_due_time = 0
            if q.length > 0
              # More entries pending: wake up again when the next one is due
              # (never a negative delay).
              should_recurse = true
              recurse_due_time = [0, q[0][:timestamp] - scheduler.now].max
            else
              active = false
            end
            # Capture the error before clearing `running` so an error that
            # arrived during the drain is reported from here.
            e = exception
            running = false
            if e != nil
              observer.on_error e
            elsif should_recurse
              this.call recurse_due_time
            end
          })
        end
      end
    end

    CompositeSubscription.new [subscription, cancelable]
  end
end
extrema_by(is_min = false, &block) click to toggle source
# File lib/rx_ruby/operators/aggregates.rb, line 565
# Collects every element whose key (as computed by the block) is extremal and
# emits them as a single list when the source completes.
#
# Keys are compared with `<=>`. By default the elements with the greatest key
# are kept; with `is_min` the comparison is inverted so the elements with the
# smallest key are kept. Elements with an equal extremal key are all retained
# in arrival order.
#
# @param is_min [Boolean] keep the minimum instead of the maximum
# @yield [element] computes the key for an element
# @return [AnonymousObservable] emits one Array on completion; forwards an
#   error raised by the key block (or by the source) via on_error
def extrema_by(is_min = false, &block)
  AnonymousObservable.new do |observer|
    has_value = false
    last_key = nil
    list = []

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        key = nil
        begin
          key = block.call(x)
        rescue => e
          observer.on_error e
          # `next`, not `return`: this handler is a plain block, and `return`
          # would target extrema_by itself, which has already returned by the
          # time elements arrive (raising LocalJumpError).
          next
        end

        comparison = 0
        if has_value
          comparison = key <=> last_key
          comparison = -comparison if is_min
        else
          # The first element always seeds the result.
          has_value = true
          last_key = key
        end

        if comparison > 0
          # Strictly better key: start a new result list.
          last_key = key
          list = []
        end
        list.push x if comparison >= 0
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        observer.on_next list
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
schedule_subscribe(_, auto_detach_observer) click to toggle source
# File lib/rx_ruby/core/observable.rb, line 76
# Runs subscribe_core for the given auto-detach observer and stores the
# resulting subscription on it. If subscribe_core raises a StandardError the
# observer is given the chance to handle it through `fail`; the error is
# re-raised only when `fail` returns a falsy value.
#
# @param _ ignored first argument
# @param auto_detach_observer observer that receives the subscription / error
# @return the result of Subscription.empty
def schedule_subscribe(_, auto_detach_observer)
  begin
    inner = subscribe_core(auto_detach_observer)
    auto_detach_observer.subscription = inner
  rescue => error
    handled = auto_detach_observer.fail(error)
    raise error unless handled
  end

  Subscription.empty
end