class Concurrent::Promises::AbstractEventFuture
Common ancestor of the {Event} and {Future} classes; many shared methods are defined here.
Public Class Methods
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 520
# Stores the owning promise and the default executor, creates the lock,
# condition variable, callback stack and waiter counter, and finally puts the
# object into the PENDING state.
#
# @param promise the object kept in @Promise
# @param default_executor the executor kept in @DefaultExecutor
def initialize(promise, default_executor)
  super()
  @Lock            = Mutex.new
  @Condition       = ConditionVariable.new
  @Promise         = promise
  @DefaultExecutor = default_executor
  @Callbacks       = LockFreeStack.new
  @Waiters         = AtomicFixnum.new(0)
  self.internal_state = PENDING
end
Public Instance Methods
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 736
# Registers the :callback_clear_delayed_node callback for the given node.
#
# @param node the node handed to the callback
# @return whatever {#add_callback} returns
def add_callback_clear_delayed_node(node)
  add_callback :callback_clear_delayed_node, node
end
@!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 731
# Registers the :callback_notify_blocked callback for a blocked promise.
#
# @param promise the promise handed to the callback
# @param index the index handed to the callback
# @return whatever {#add_callback} returns
def add_callback_notify_blocked(promise, index)
  add_callback(:callback_notify_blocked, promise, index)
end
@!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 700
# Collects the first argument of every stored :callback_notify_blocked
# callback, i.e. the promises registered through
# {#add_callback_notify_blocked}.
#
# @return [Array] first arguments of the matching callbacks, in iteration order
def blocks
  notifications = @Callbacks.select { |kind, _params| kind == :callback_notify_blocked }
  notifications.map { |_kind, params| params[0] }
end
For inspection. @!visibility private @return [Array<AbstractPromise>]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 708
# Returns the currently stored callbacks as a new array.
#
# @return [Array] the entries enumerated from @Callbacks
def callbacks
  stored = @Callbacks.each
  stored.to_a
end
For inspection. @!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 594
# Shortcut for {#chain_on} that uses the default executor.
#
# @param args arguments forwarded to {#chain_on}
# @yield the task forwarded to {#chain_on}
# @return whatever {#chain_on} returns
def chain(*args, &task)
  chain_on(@DefaultExecutor, *args, &task)
end
@!macro promises.shortcut.on @return [Future]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 612
# Builds a ChainPromise blocked by the receiver and returns its future.
#
# @param executor the executor passed to the new promise
# @param args arguments passed to the new promise
# @yield the task passed to the new promise
# @return the value of the new promise's #future
def chain_on(executor, *args, &task)
  chained = ChainPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task)
  chained.future
end
Chains the task to be executed asynchronously on executor after it is resolved.
@!macro promises.param.executor @!macro promises.param.args @return [Future] @!macro promise.param.task-future
@overload an_event.chain_on(executor, *args, &task)
@yield [*args] to the task.
@overload a_future.chain_on(executor, *args, &task)
@yield [fulfilled, value, reason, *args] to the task. @yieldparam [true, false] fulfilled @yieldparam [Object] value @yieldparam [Object] reason
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 627
# Registers a synchronous resolution callback that resolves +resolvable+
# with the receiver's internal state.
#
# @param resolvable object receiving #resolve_with
# @return whatever {#on_resolution!} returns
def chain_resolvable(resolvable)
  on_resolution! do
    resolvable.resolve_with(internal_state)
  end
end
Resolves the resolvable when receiver is resolved.
@param [Resolvable] resolvable @return [self]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 588
# Reader for the executor given to the constructor.
#
# @return the value stored in @DefaultExecutor
def default_executor
  @DefaultExecutor
end
Returns default executor. @return [Executor] default executor @see with_default_executor
@see FactoryMethods#future_on
@see FactoryMethods#resolvable_future
@see FactoryMethods#any_fulfilled_future_on
@see similar
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 635
# Shortcut for {#on_resolution_using} that uses the default executor.
#
# @param args arguments forwarded to {#on_resolution_using}
# @yield the callback forwarded to {#on_resolution_using}
# @return whatever {#on_resolution_using} returns
def on_resolution(*args, &callback)
  on_resolution_using(@DefaultExecutor, *args, &callback)
end
@!macro promises.shortcut.using @return [self]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 653
# Registers the :callback_on_resolution callback with the given arguments
# and block.
#
# @param args arguments stored with the callback
# @yield the callback block stored with the callback
# @return whatever {#add_callback} returns
def on_resolution!(*args, &callback)
  add_callback(:callback_on_resolution, args, callback)
end
Stores the callback to be executed synchronously on resolving thread after it is resolved.
@!macro promises.param.args @!macro promise.param.callback @return [self]
@overload an_event.on_resolution!(*args, &callback)
@yield [*args] to the callback.
@overload a_future.on_resolution!(*args, &callback)
@yield [fulfilled, value, reason, *args] to the callback. @yieldparam [true, false] fulfilled @yieldparam [Object] value @yieldparam [Object] reason
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 671
# Registers the :async_callback_on_resolution callback together with the
# executor, the arguments and the block.
#
# @param executor executor stored with the callback
# @param args arguments stored with the callback
# @yield the callback block stored with the callback
# @return whatever {#add_callback} returns
def on_resolution_using(executor, *args, &callback)
  add_callback(:async_callback_on_resolution, executor, args, callback)
end
Stores the callback to be executed asynchronously on executor after it is resolved.
@!macro promises.param.executor @!macro promises.param.args @!macro promise.param.callback @return [self]
@overload an_event.on_resolution_using(executor, *args, &callback)
@yield [*args] to the callback.
@overload a_future.on_resolution_using(executor, *args, &callback)
@yield [fulfilled, value, reason, *args] to the callback. @yieldparam [true, false] fulfilled @yieldparam [Object] value @yieldparam [Object] reason
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 547
# Negation of the internal state's #resolved?.
#
# @return [Boolean]
def pending?
  current = internal_state
  !current.resolved?
end
Is it in pending state? @return [Boolean]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 714
# Reader for the promise given to the constructor.
#
# @return the value stored in @Promise
def promise
  @Promise
end
For inspection. @!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 686
# Tries to move the internal state from PENDING (or RESERVED when
# +reserved+ is truthy) to +state+. On success it wakes waiting threads and
# runs the stored callbacks; otherwise it delegates to #rejected_resolution.
#
# @param state the new internal state
# @param raise_on_reassign forwarded to #rejected_resolution on failure
# @param reserved selects RESERVED instead of PENDING as the expected state
# @return [self] on success, otherwise the result of #rejected_resolution
def resolve_with(state, raise_on_reassign = true, reserved = false)
  expected = reserved ? RESERVED : PENDING
  unless compare_and_set_internal_state(expected, state)
    return rejected_resolution(raise_on_reassign, state)
  end

  # go to synchronized block only if there were waiting threads
  @Lock.synchronize { @Condition.broadcast } unless @Waiters.value == 0
  call_callbacks(state)
  self
end
@!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 553
# Delegates to the internal state's #resolved?.
#
# @return the internal state's answer
def resolved?
  current = internal_state
  current.resolved?
end
Is it in resolved state? @return [Boolean]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 541
# Symbolic form of the internal state, obtained through its #to_sym.
#
# @return the result of the internal state's #to_sym
def state
  current = internal_state
  current.to_sym
end
Returns its state. @return [Symbol]
@overload an_event.state
@return [:pending, :resolved]
@overload a_future.state
Both :fulfilled and :rejected imply :resolved. @return [:pending, :fulfilled, :rejected]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 617
# Takes the inherited string representation, drops its last character and
# appends the current state followed by ">".
#
# @return [String]
def to_s
  inherited = super
  format('%s %s>', inherited[0..-2], state)
end
@return [String] Short string representation.
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 560
# Forwards #touch to the stored promise.
#
# @return [self]
def touch
  owner = @Promise
  owner.touch
  self
end
Propagates touch. Requests all the delayed futures, which it depends on, to be executed. This method is called by any other method requiring resolved state, like {#wait}. @return [self]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 720
# Delegates to the promise's #touched?.
#
# @return the promise's answer
def touched?
  owner = promise
  owner.touched?
end
For inspection. @!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 576
# Runs #wait_until_resolved and picks the return value based on whether a
# timeout was supplied.
#
# @param timeout passed through to #wait_until_resolved
# @return [self] when +timeout+ is nil or false, otherwise the result of
#   #wait_until_resolved
def wait(timeout = nil)
  outcome = wait_until_resolved(timeout)
  return self unless timeout

  outcome
end
@!macro promises.method.wait
Wait (block the Thread) until receiver is {#resolved?}. @!macro promises.touches @!macro promises.warn.blocks @!macro promises.param.timeout @return [self, true, false] self implies timeout was not used, true implies timeout was used and it was resolved, false implies it was not resolved within timeout.
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 726 def waiting_threads @Waiters.each.to_a end
# NOTE(review): #initialize assigns @Waiters an AtomicFixnum (a counter), yet
# this method calls #each on it. Presumably AtomicFixnum does not respond to
# #each, in which case this raises NoMethodError — confirm, and decide what
# this inspection helper is meant to return.
For inspection. @!visibility private
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 681
# Abstract hook: this implementation always raises.
#
# @param executor unused here
# @raise [NotImplementedError] always
def with_default_executor(executor)
  raise(NotImplementedError)
end
@!macro promises.method.with_default_executor
Creates a new object of the same class with the executor set as its new default executor. Any futures depending on it will use the new default executor.
@!macro promises.shortcut.event-future @abstract @return [AbstractEventFuture]
Private Instance Methods
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 748
# Runs the callback right away when the receiver is already resolved;
# otherwise stores it and re-checks the state afterwards.
#
# @param method name of the callback method
# @param args arguments stored or passed along with the callback
# @return [self]
def add_callback(method, *args)
  observed = internal_state
  if observed.resolved?
    call_callback(method, observed, args)
    return self
  end

  @Callbacks.push([method, args])
  # take back if it was resolved in the meanwhile
  latest = internal_state
  call_callbacks(latest) if latest.resolved?
  self
end
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 801
# Hands state, args and callback to #with_async; the block given to
# #with_async passes them on to #callback_on_resolution.
#
# @param state passed through #with_async to #callback_on_resolution
# @param executor executor given to #with_async
# @param args passed through #with_async to #callback_on_resolution
# @param callback passed through #with_async to #callback_on_resolution
# @return whatever #with_async returns
def async_callback_on_resolution(state, executor, args, callback)
  with_async(executor, state, args, callback) do |posted_state, posted_args, posted_callback|
    callback_on_resolution(posted_state, posted_args, posted_callback)
  end
end
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 785
# Dispatches to the callback method named by +method+, passing the state
# followed by the splatted arguments.
#
# @param method name of the method to invoke on the receiver
# @param state first argument for the callback method
# @param args remaining arguments for the callback method
# @return whatever the callback method returns
def call_callback(method, state, args)
  send(method, state, *args)
end
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 789
# Pops stored callbacks one by one and runs each with the given state,
# stopping as soon as a pop yields no method name.
#
# @param state state handed to every callback
# @return [nil]
def call_callbacks(state)
  # `while true` rather than `loop`: `loop` would rescue a StopIteration
  # raised from inside a callback.
  while true
    name, params = @Callbacks.pop
    break unless name

    call_callback(name, state, params)
  end
end
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 761
# Callback that resets the node's value to nil.
#
# @param state unused here
# @param node object whose #value= is called with nil
# @return [nil]
def callback_clear_delayed_node(state, node)
  node.value = nil
end
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 807
# Callback that tells +promise+ that the receiver, its blocker at +index+,
# has been resolved.
#
# @param state unused here
# @param promise object receiving #on_blocker_resolution
# @param index index passed along to the promise
# @return whatever #on_blocker_resolution returns
def callback_notify_blocked(state, promise, index)
  promise.on_blocker_resolution(self, index)
end
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 766
# Blocks the calling thread until the receiver is resolved or the timeout
# elapses.
#
# The condition variable is waited on in a loop: a wake-up that arrives
# before the receiver is resolved (e.g. a spurious one) makes the thread wait
# again for the remaining time instead of returning early.
#
# @param timeout [Numeric, nil] maximum number of seconds to wait; nil waits
#   until resolved
# @return [Boolean] whether the receiver is resolved when the method returns
def wait_until_resolved(timeout)
  return true if resolved?

  touch

  # A monotonic deadline so that repeated waits never exceed the timeout.
  deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)

  @Lock.synchronize do
    @Waiters.increment
    begin
      # The first wait uses the caller's timeout unchanged.
      remaining = timeout
      until resolved?
        @Condition.wait @Lock, remaining
        next unless deadline

        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break unless remaining > 0
      end
    ensure
      # JRuby may raise ConcurrencyError
      @Waiters.decrement
    end
  end
  resolved?
end
@return [Boolean]
Source
# File lib/concurrent-ruby/concurrent/promises.rb, line 797
# Looks up the executor through Concurrent.executor and posts the block to
# it with the given arguments.
#
# @param executor value handed to Concurrent.executor
# @param args arguments handed to #post
# @yield the block handed to #post
# @return whatever #post returns
def with_async(executor, *args, &block)
  target = Concurrent.executor(executor)
  target.post(*args, &block)
end