class RxRuby::ConnectableObservable

Public Class Methods

new(source, subject)
Calls superclass method RxRuby::AnonymousObservable::new
# File lib/rx_ruby/linq/connectable_observable.rb, line 3
# Sets up the connectable observable in its disconnected state.
#
# @param source  object responding to #as_observable; the value it returns is
#   kept as the upstream observable
# @param subject object responding to #subscribe; it is stored, and its
#   #subscribe method is handed to the superclass constructor as the block
def initialize(source, subject)
  # Nothing is connected yet.
  @subscription = nil
  @has_subscription = false

  @source_observable = source.as_observable
  @subject = subject

  # Forward the subject's bound #subscribe method as the superclass block.
  subject_subscribe = subject.method(:subscribe)
  super(&subject_subscribe)
end

Public Instance Methods

connect()
# File lib/rx_ruby/linq/connectable_observable.rb, line 12
# Subscribes the stored subject to the stored source observable, unless a
# connection is already active.
#
# @return the current connection: the existing one when already connected,
#   otherwise a new CompositeSubscription wrapping the upstream subscription
#   and an action that marks this object as disconnected again
def connect
  return @subscription if @has_subscription

  # The flag is raised before subscribing, so a nested call made while the
  # source is being subscribed takes the early return above.
  @has_subscription = true
  upstream = @source_observable.subscribe(@subject)
  mark_disconnected = Subscription.create { @has_subscription = false }
  @subscription = CompositeSubscription.new([upstream, mark_disconnected])
end

ref_count()
# File lib/rx_ruby/linq/connectable_observable.rb, line 20
# Returns an observable that shares one connection to this connectable
# observable among all of its subscribers.
#
# Each subscriber is attached to this object via #subscribe. The first
# subscriber also triggers #connect, and the connection is released once the
# subscriber count drops back to zero. A subscriber arriving after that
# connects again.
#
# @return [AnonymousObservable] the reference-counted observable
def ref_count
  source = self
  count = 0
  # Shared by every subscriber, so that whichever one leaves last can release
  # the connection, not only the one that happened to open it.
  connection = nil

  AnonymousObservable.new do |observer|
    count += 1
    # Attach the observer before connecting, so it is already registered when
    # the source gets subscribed.
    subscription = source.subscribe(observer)
    connection = source.connect if count == 1

    # NOTE(review): assumes Subscription runs this action at most once per
    # subscriber; a repeated run would decrement the count twice - confirm.
    Subscription.create do
      # Release only this observer; the shared connection stays up while
      # other subscribers remain.
      subscription.unsubscribe
      count -= 1
      connection.unsubscribe if count.zero?
    end
  end
end