module RxRuby::Observer

Module for all Observers

Public Class Methods

allow_reentrancy(observer, gate = Monitor.new) click to toggle source

Synchronizes access to the observer such that its callback methods cannot be called concurrently by multiple threads, using the specified gate object for use by a Monitor based lock. This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common gate object if given. Notice reentrant observer callbacks on the same thread are still possible.

# File lib/rx_ruby/core/synchronized_observer.rb, line 14
def allow_reentrancy(observer, gate = Monitor.new)
  SynchronizedObserver.new(observer, gate)
end
configure() { |config| ... } click to toggle source

Configures a new instance of an Observer

# File lib/rx_ruby/core/observer.rb, line 53
def configure
  config = ObserverConfiguration.new
  yield config if block_given?
  ObserverBase.new config
end
create(on_next = nil, on_error = nil, on_completed = nil) click to toggle source
# File lib/rx_ruby/core/observer.rb, line 59
def create(on_next = nil, on_error = nil, on_completed = nil)
  configure do |o|
    o.on_next(&on_next) if on_next
    o.on_error(&on_error) if on_error
    o.on_completed(&on_completed) if on_completed
  end
end
from_notifier() { |create_on_next| ... } click to toggle source

Creates an observer from a notification callback.

# File lib/rx_ruby/core/notification.rb, line 12
def from_notifier
  raise ArgumentError.new 'Block required' unless block_given?

  configure do |o|
    o.on_next      {|x| yield Notification.create_on_next(x) }
    o.on_error     {|err| yield Notification.create_on_error(err) }
    o.on_completed { yield Notification.create_on_completed }
  end
end
prevent_reentrancy(observer, gate = AsyncLock.new) click to toggle source

Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access. This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common asynchronous lock.

# File lib/rx_ruby/core/async_lock_observer.rb, line 13
def prevent_reentrancy(observer, gate = AsyncLock.new)
  AsyncLockObserver.new(observer, gate)
end

Public Instance Methods

as_observer() click to toggle source

Hides the identity of an observer.

# File lib/rx_ruby/core/observer.rb, line 37
def as_observer
  Observer.configure do |o|
    o.on_next(&method(:on_next))
    o.on_error(&method(:on_error))
    o.on_completed(&method(:on_completed))
  end      
end
checked() click to toggle source

Checks access to the observer for grammar violations. This includes checking for multiple on_error or on_completed calls, as well as reentrancy in any of the observer methods. If a violation is detected, an error is thrown from the offending observer method call.

# File lib/rx_ruby/core/checked_observer.rb, line 11
def checked
  CheckedObserver.new(self)
end
notify_on(scheduler) click to toggle source

Schedules the invocation of observer methods on the given scheduler.

# File lib/rx_ruby/core/observe_on_observer.rb, line 10
def notify_on(scheduler)
  ObserveOnObserver.new(scheduler, self, nil)
end
to_notifier() click to toggle source

Creates a notification callback from an observer.

# File lib/rx_ruby/core/observer.rb, line 46
def to_notifier
  lambda {|n| n.accept self}
end