module RxRuby::Observer
Module for all Observers
Public Class Methods
Synchronizes access to the observer so that its callback methods cannot be called concurrently by multiple threads, using the specified gate object as a Monitor-based lock. Passing a common gate object is useful when coordinating multiple observers that access shared state. Note that reentrant observer callbacks on the same thread are still possible.
  # File lib/rx_ruby/core/synchronized_observer.rb, line 14
  def allow_reentrancy(observer, gate = Monitor.new)
    SynchronizedObserver.new(observer, gate)
  end
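The effect of a Monitor-based gate can be illustrated without the library. The sketch below is a minimal stand-in: GatedObserver and gated_demo are hypothetical names (not the library's SynchronizedObserver), and only on_next is wrapped. It shows two observers sharing one gate, with a reentrant call on the same thread succeeding because Monitor is a reentrant lock.

```ruby
require 'monitor'

# Hypothetical stand-in for a Monitor-gated observer wrapper; not the
# library's SynchronizedObserver class. Only on_next is modelled.
class GatedObserver
  def initialize(gate = Monitor.new, &on_next)
    @gate = gate
    @on_next = on_next
  end

  def on_next(value)
    # Monitor is reentrant: a thread that already holds the gate may enter
    # again, so a callback can call on_next without deadlocking.
    @gate.synchronize { @on_next.call(value) }
  end
end

# Runs two observers that share one gate from several threads and returns
# the order in which their callbacks ran.
def gated_demo
  gate = Monitor.new
  log = []
  a = GatedObserver.new(gate) { |x| log << [:a, x] }
  b = GatedObserver.new(gate) do |x|
    log << [:b, x]
    a.on_next(x + 1) if x < 2 # reentrant call: same thread, same gate
  end
  threads = 4.times.map { |i| Thread.new { b.on_next(i) } }
  threads.each(&:join)
  log
end
```

Because the nested call runs while the gate is still held, each [:b, x] entry with x < 2 is immediately followed by the matching [:a, x + 1] entry, regardless of how the threads interleave. With a plain Mutex as the gate, the nested call would raise a ThreadError instead.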
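Configures a new Observer instance. If a block is given, it receives an ObserverConfiguration on which the on_next, on_error, and on_completed handlers can be set.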
  # File lib/rx_ruby/core/observer.rb, line 53
  def configure
    config = ObserverConfiguration.new
    yield config if block_given?
    ObserverBase.new config
  end
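Creates an observer from optional on_next, on_error, and on_completed callbacks. A callback left as nil is not configured.
  # File lib/rx_ruby/core/observer.rb, line 59
  def create(on_next = nil, on_error = nil, on_completed = nil)
    configure do |o|
      o.on_next(&on_next) if on_next
      o.on_error(&on_error) if on_error
      o.on_completed(&on_completed) if on_completed
    end
  end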
Creates an observer from a notification callback.
  # File lib/rx_ruby/core/notification.rb, line 12
  def from_notifier
    raise ArgumentError.new 'Block required' unless block_given?
    configure do |o|
      o.on_next {|x| yield Notification.create_on_next(x) }
      o.on_error {|err| yield Notification.create_on_error(err) }
      o.on_completed { yield Notification.create_on_completed }
    end
  end
Synchronizes access to the observer such that its callback methods cannot be called concurrently, using the specified asynchronous lock to protect against concurrent and reentrant access. This overload is useful when coordinating multiple observers that access shared state by synchronizing on a common asynchronous lock.
  # File lib/rx_ruby/core/async_lock_observer.rb, line 13
  def prevent_reentrancy(observer, gate = AsyncLock.new)
    AsyncLockObserver.new(observer, gate)
  end
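One common way to protect against both concurrent and reentrant access is to queue work and let a single caller drain the queue. The sketch below assumes that approach; QueueingLock and queueing_lock_demo are hypothetical names, the code is not the library's AsyncLock, and error handling for failing work items is omitted.

```ruby
# Hypothetical sketch of an asynchronous lock; not the library's AsyncLock.
# Work is queued, and only the caller that finds the lock idle drains it.
class QueueingLock
  def initialize
    @queue = []
    @draining = false
    @gate = Mutex.new
  end

  # Enqueues the block. Returns immediately if another call is already
  # draining the queue; otherwise runs queued work until the queue is empty.
  def wait(&work)
    owner = @gate.synchronize do
      @queue << work
      next false if @draining
      @draining = true
    end
    return unless owner

    loop do
      job = @gate.synchronize do
        item = @queue.shift
        @draining = false if item.nil?
        item
      end
      break if job.nil?
      job.call # runs outside the mutex, so a nested wait only enqueues
    end
  end
end

# Shows that a reentrant wait is deferred until the outer work has finished.
def queueing_lock_demo
  lock = QueueingLock.new
  log = []
  lock.wait do
    log << :outer_start
    lock.wait { log << :inner } # queued, not run inline
    log << :outer_end
  end
  log
end
```

In this sketch, queueing_lock_demo returns [:outer_start, :outer_end, :inner]: the nested call is deferred rather than interleaved, which is the difference from the Monitor-based gate, where the nested call runs inline.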
Public Instance Methods
Hides the identity of an observer.
  # File lib/rx_ruby/core/observer.rb, line 37
  def as_observer
    Observer.configure do |o|
      o.on_next(&method(:on_next))
      o.on_error(&method(:on_error))
      o.on_completed(&method(:on_completed))
    end
  end
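To illustrate what hiding an observer's identity means in practice, the sketch below wraps an observer so that only the three callbacks remain visible. CountingObserver and OpaqueObserver are hypothetical classes written for this example; they stand in for the library's types and do not use them.

```ruby
# Hypothetical observer with extra public API beyond the three callbacks.
class CountingObserver
  attr_reader :count

  def initialize
    @count = 0
  end

  def on_next(_value)
    @count += 1
  end

  def on_error(_error); end

  def on_completed; end

  def reset!
    @count = 0
  end
end

# Hypothetical wrapper: it keeps only bound Method objects for the three
# callbacks, so it does not expose the target's class or its other methods.
class OpaqueObserver
  def initialize(target)
    @on_next = target.method(:on_next)
    @on_error = target.method(:on_error)
    @on_completed = target.method(:on_completed)
  end

  def on_next(value)
    @on_next.call(value)
  end

  def on_error(error)
    @on_error.call(error)
  end

  def on_completed
    @on_completed.call
  end
end
```

Calls made through an OpaqueObserver still reach the wrapped CountingObserver, but the wrapper does not respond to reset! and is not an instance of CountingObserver, so consumers cannot downcast it or depend on the concrete type.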
Checks access to the observer for grammar violations. This includes checking for multiple on_error or on_completed calls, as well as reentrancy in any of the observer methods. If a violation is detected, an error is raised from the offending observer method call.
  # File lib/rx_ruby/core/checked_observer.rb, line 11
  def checked
    CheckedObserver.new(self)
  end
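The grammar checks described above can be modelled as a small state machine. The sketch below is an assumption about how such a check might be structured: GrammarCheckedObserver and GrammarViolation are hypothetical names, and the code is not the library's CheckedObserver.

```ruby
# Hypothetical error type for this sketch.
class GrammarViolation < StandardError; end

# Hypothetical stand-in for a grammar-checking wrapper; not the library's
# CheckedObserver class. State is :idle, :busy (inside a callback), or :done.
class GrammarCheckedObserver
  def initialize(on_next: ->(_value) {}, on_error: ->(_error) {}, on_completed: -> {})
    @on_next = on_next
    @on_error = on_error
    @on_completed = on_completed
    @state = :idle
    @gate = Mutex.new
  end

  def on_next(value)
    enter
    begin
      @on_next.call(value)
    ensure
      leave(:idle)
    end
  end

  def on_error(error)
    enter
    begin
      @on_error.call(error)
    ensure
      leave(:done) # terminal: later calls are violations
    end
  end

  def on_completed
    enter
    begin
      @on_completed.call
    ensure
      leave(:done) # terminal: later calls are violations
    end
  end

  private

  # Rejects calls after termination and calls made while a callback runs.
  def enter
    @gate.synchronize do
      raise GrammarViolation, 'observer has already terminated' if @state == :done
      raise GrammarViolation, 'reentrancy has been detected' if @state == :busy
      @state = :busy
    end
  end

  def leave(next_state)
    @gate.synchronize { @state = next_state }
  end
end
```

In this sketch a call arriving from another thread while a callback is running is reported the same way as a reentrant call, since both find the state set to :busy.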
Schedules the invocation of observer methods on the given scheduler.
  # File lib/rx_ruby/core/observe_on_observer.rb, line 10
  def notify_on(scheduler)
    ObserveOnObserver.new(scheduler, self, nil)
  end
Creates a notification callback from an observer.
  # File lib/rx_ruby/core/observer.rb, line 46
  def to_notifier
    lambda {|n| n.accept self}
  end
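A notification callback is a single callable that accepts notification objects and dispatches each one to the matching observer method. The sketch below shows that dispatch with hypothetical stand-ins (NextNote, ErrorNote, CompletedNote, RecordingObserver); the library's own Notification classes are richer and are not used here.

```ruby
# Hypothetical, minimal stand-ins for notification objects. Each one knows
# which observer method it corresponds to.
NextNote = Struct.new(:value) do
  def accept(observer)
    observer.on_next(value)
  end
end

ErrorNote = Struct.new(:error) do
  def accept(observer)
    observer.on_error(error)
  end
end

class CompletedNote
  def accept(observer)
    observer.on_completed
  end
end

# Hypothetical observer that records what it receives.
class RecordingObserver
  attr_reader :events

  def initialize
    @events = []
  end

  def on_next(value)
    @events << [:next, value]
  end

  def on_error(error)
    @events << [:error, error.message]
  end

  def on_completed
    @events << [:completed]
  end

  # Same shape as the documented method: a lambda that hands each
  # notification back to this observer for dispatch.
  def to_notifier
    lambda { |n| n.accept(self) }
  end
end
```

Calling the lambda returned by to_notifier with a NextNote, an ErrorNote, or a CompletedNote invokes on_next, on_error, or on_completed respectively, so a producer that only knows how to emit notification objects can still drive the observer.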