module Concurrent
{include:file:README.md}
@!macro thread_safe_variable_comparison
## Thread-safe Variable Classes Each of the thread-safe variable classes is designed to solve a different problem. In general: * *{Concurrent::Agent}:* Shared, mutable variable providing independent, uncoordinated, *asynchronous* change of individual values. Best used when the value will undergo frequent, complex updates. Suitable when the result of an update does not need to be known immediately. * *{Concurrent::Atom}:* Shared, mutable variable providing independent, uncoordinated, *synchronous* change of individual values. Best used when the value will undergo frequent reads but only occasional, though complex, updates. Suitable when the result of an update must be known immediately. * *{Concurrent::AtomicReference}:* A simple object reference that can be updated atomically. Updates are synchronous but fast. Best used when updates a simple set operations. Not suitable when updates are complex. {Concurrent::AtomicBoolean} and {Concurrent::AtomicFixnum} are similar but optimized for the given data type. * *{Concurrent::Exchanger}:* Shared, stateless synchronization point. Used when two or more threads need to exchange data. The threads will pair then block on each other until the exchange is complete. * *{Concurrent::MVar}:* Shared synchronization point. Used when one thread must give a value to another, which must take the value. The threads will block on each other until the exchange is complete. * *{Concurrent::ThreadLocalVar}:* Shared, mutable, isolated variable which holds a different value for each thread which has access. Often used as an instance variable in objects which must maintain different state for different threads. * *{Concurrent::TVar}:* Shared, mutable variables which provide *coordinated*, *synchronous*, change of *many* stated. Used when multiple value must change together, in an all-or-nothing transaction.
TODO (pitr-ch 14-Mar-2017): deprecate, Future
, Promise
, etc.
Constants
- ArrayImplementation
-
@!macro internal_implementation_note
- AtomicBooleanImplementation
-
@!visibility private @!macro internal_implementation_note
- AtomicFixnumImplementation
-
@!visibility private @!macro internal_implementation_note
- AtomicReferenceImplementation
-
@!macro internal_implementation_note
- CancelledOperationError
-
Raised when an asynchronous operation is cancelled before execution.
- ConfigurationError
-
Raised when errors occur during configuration.
- CountDownLatchImplementation
-
@!visibility private @!macro internal_implementation_note
- Error
- ExchangerImplementation
-
@!visibility private @!macro internal_implementation_note
- GLOBAL_FAST_EXECUTOR
-
@!visibility private
- GLOBAL_IMMEDIATE_EXECUTOR
-
@!visibility private
- GLOBAL_IO_EXECUTOR
-
@!visibility private
- GLOBAL_LOGGER
-
@!visibility private
- GLOBAL_MONOTONIC_CLOCK
-
Clock that cannot be set and represents monotonic time since some unspecified starting point.
@!visibility private
- GLOBAL_TIMER_SET
-
@!visibility private
- HashImplementation
-
@!macro internal_implementation_note
- IllegalOperationError
-
Raised when an operation is attempted which is not legal given the receiver’s current state
- ImmutabilityError
-
Raised when an attempt is made to violate an immutability guarantee.
- InitializationError
-
Raised when an object’s methods are called when it has not been properly initialized.
- LifecycleError
-
Raised when a lifecycle method (such as ‘stop`) is called in an improper sequence or when the object is in an inappropriate state.
- MaxRestartFrequencyError
-
Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.
- NULL
-
Various classes within allows for
nil
values to be stored, so a specialNULL
token is required to indicate the “nil-ness”. @!visibility private - NULL_LOGGER
-
Suppresses all output when used for logging.
- PromiseExecutionError
- RejectedExecutionError
-
Raised by an ‘Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.
- ResourceLimitError
-
Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.
- SemaphoreImplementation
-
@!visibility private @!macro internal_implementation_note
- SetImplementation
-
@!macro internal_implementation_note
- SingleThreadExecutorImplementation
- ThreadLocalVarImplementation
-
@!visibility private @!macro internal_implementation_note
- ThreadPoolExecutorImplementation
- TimeoutError
-
Raised when an operation times out.
- VERSION
Public Class Methods
Source
# File lib/concurrent-ruby/concurrent/tvar.rb, line 150 def abort_transaction raise Transaction::AbortError.new end
Abort a currently running transaction - see ‘Concurrent::atomically`.
Source
# File lib/concurrent-ruby/concurrent/tvar.rb, line 93 def atomically raise ArgumentError.new('no block given') unless block_given? # Get the current transaction transaction = Transaction::current # Are we not already in a transaction (not nested)? if transaction.nil? # New transaction begin # Retry loop loop do # Create a new transaction transaction = Transaction.new Transaction::current = transaction # Run the block, aborting on exceptions begin result = yield rescue Transaction::AbortError => e transaction.abort result = Transaction::ABORTED rescue Transaction::LeaveError => e transaction.abort break result rescue => e transaction.abort raise e end # If we can commit, break out of the loop if result != Transaction::ABORTED if transaction.commit break result end end end ensure # Clear the current transaction Transaction::current = nil end else # Nested transaction - flatten it and just run the block yield end end
Run a block that reads and writes ‘TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
-
Most importantly, the block that you pass to atomically may be executed
more than once. In most cases your code should be free of side-effects, except for via TVar.
-
If an exception escapes an atomically block it will abort the transaction.
-
It is undefined behaviour to use callcc or Fiber with atomically.
-
If you create a new thread within an atomically, it will not be part of
the transaction. Creating a thread counts as a side-effect.
Transactions within transactions are flattened to a single transaction.
@example
a = new TVar(100_000) b = new TVar(100) Concurrent::atomically do a.value -= 10 b.value += 10 end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 56 def call_dataflow(method, executor, *inputs, &block) raise ArgumentError.new('an executor must be provided') if executor.nil? raise ArgumentError.new('no block given') unless block_given? unless inputs.all? { |input| input.is_a? IVar } raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }") end result = Future.new(executor: executor) do values = inputs.map { |input| input.send(method) } block.call(*values) end if inputs.empty? result.execute else counter = DependencyCounter.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer counter end end result end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 20 def self.create_simple_logger(level = Logger::FATAL, output = $stderr) # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking lambda do |severity, progname, message = nil, &block| return false if severity < level message = block ? block.call : message formatted_message = case message when String message when Exception format "%s (%s)\n%s", message.message, message.class, (message.backtrace || []).join("\n") else message.inspect end output.print format "[%s] %5s -- %s: %s\n", Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'), Logger::SEV_LABEL[severity], progname, formatted_message true end end
@return [Logger] Logger with provided level and output.
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 52 def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr) logger = Logger.new(output) logger.level = level logger.formatter = lambda do |severity, datetime, progname, msg| formatted_message = case msg when String msg when Exception format "%s (%s)\n%s", msg.message, msg.class, (msg.backtrace || []).join("\n") else msg.inspect end format "[%s] %5s -- %s: %s\n", datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), severity, progname, formatted_message end lambda do |loglevel, progname, message = nil, &block| logger.add loglevel, message, progname, &block end end
@return [Logger] Logger with provided level and output. @deprecated
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 34 def dataflow(*inputs, &block) dataflow_with(Concurrent.global_io_executor, *inputs, &block) end
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. {include:file:docs-source/dataflow.md}
@param [Future] inputs zero or more ‘Future` operations that this dataflow depends upon
@yield The operation to perform once all the dependencies are met @yieldparam [Future] inputs each of the ‘Future` inputs to the dataflow @yieldreturn [Object] the result of the block operation
@return [Object] the result of all the operations
@raise [ArgumentError] if no block is given @raise [ArgumentError] if any of the inputs are not ‘IVar`s
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 44 def dataflow!(*inputs, &block) dataflow_with!(Concurrent.global_io_executor, *inputs, &block) end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 39 def dataflow_with(executor, *inputs, &block) call_dataflow(:value, executor, *inputs, &block) end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 49 def dataflow_with!(executor, *inputs, &block) call_dataflow(:value!, executor, *inputs, &block) end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 131 def self.disable_at_exit_handlers! deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841." end
Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer’s responsibility to ensure that the handlers are shutdown properly prior to application exit by calling ‘AtExit.run` method.
@note this option should be needed only because of ‘at_exit` ordering
issues which may arise when running some of the testing frameworks. E.g. Minitest's test-suite runs itself in `at_exit` callback which executes after the pools are already terminated. Then auto termination needs to be disabled and called manually after test-suite ends.
@note This method should never be called
from within a gem. It should *only* be used from within the main application and even then it should be used only when necessary.
@deprecated Has no effect since it is no longer needed, see github.com/ruby-concurrency/concurrent-ruby/pull/841.
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 166 def self.executor(executor_identifier) Options.executor(executor_identifier) end
General access point to global executors. @param [Symbol, Executor] executor_identifier symbols:
- :fast - {Concurrent.global_fast_executor} - :io - {Concurrent.global_io_executor} - :immediate - {Concurrent.global_immediate_executor}
@return [Executor]
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 138 def self.global_fast_executor GLOBAL_FAST_EXECUTOR.value end
Global thread pool optimized for short, fast operations.
@return [ThreadPoolExecutor] the thread pool
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 149 def self.global_immediate_executor GLOBAL_IMMEDIATE_EXECUTOR end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 145 def self.global_io_executor GLOBAL_IO_EXECUTOR.value end
Global thread pool optimized for long, blocking (IO) tasks.
@return [ThreadPoolExecutor] the thread pool
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 92 def self.global_logger GLOBAL_LOGGER.value end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 96 def self.global_logger=(value) GLOBAL_LOGGER.value = value end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 156 def self.global_timer_set GLOBAL_TIMER_SET.value end
Global thread pool user for global timers.
@return [Concurrent::TimerSet] the thread pool
Source
# File lib/concurrent-ruby/concurrent/tvar.rb, line 155 def leave_transaction raise Transaction::LeaveError.new end
Leave a transaction without committing or aborting - see ‘Concurrent::atomically`.
Source
# File lib/concurrent-ruby/concurrent/utility/monotonic_time.rb, line 53 def monotonic_time GLOBAL_MONOTONIC_CLOCK.get_time end
@!macro monotonic_get_time
Returns the current time a tracked by the application monotonic clock. @return [Float] The current monotonic time since some unspecified starting point @!macro monotonic_clock_warning
Source
# File lib/concurrent-ruby/concurrent/utility/monotonic_time.rb, line 6 def initialize @last_time = Time.now.to_f super() end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 170 def self.new_fast_executor(opts = {}) FixedThreadPool.new( [2, Concurrent.processor_count].max, auto_terminate: opts.fetch(:auto_terminate, true), idletime: 60, # 1 minute max_queue: 0, # unlimited fallback_policy: :abort, # shouldn't matter -- 0 max queue name: "fast" ) end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 181 def self.new_io_executor(opts = {}) CachedThreadPool.new( auto_terminate: opts.fetch(:auto_terminate, true), fallback_policy: :abort, # shouldn't matter -- 0 max queue name: "io" ) end
Source
# File lib/concurrent-ruby/concurrent/utility/processor_counter.rb, line 160 def self.physical_processor_count processor_counter.physical_processor_count end
Source
# File lib/concurrent-ruby/concurrent/utility/processor_counter.rb, line 156 def self.processor_count processor_counter.processor_count end
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 46 def self.use_simple_logger(level = Logger::FATAL, output = $stderr) Concurrent.global_logger = create_simple_logger level, output end
Use logger created by create_simple_logger to log concurrent-ruby messages.
Source
# File lib/concurrent-ruby/concurrent/configuration.rb, line 79 def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr) Concurrent.global_logger = create_stdlib_logger level, output end
Use logger created by create_stdlib_logger to log concurrent-ruby messages. @deprecated
Public Instance Methods
Source
# File lib/concurrent-ruby/concurrent/utility/monotonic_time.rb, line 13 def get_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end
@!visibility private
Private Instance Methods
Source
# File lib/concurrent-ruby/concurrent/tvar.rb, line 150 def abort_transaction raise Transaction::AbortError.new end
Abort a currently running transaction - see ‘Concurrent::atomically`.
Source
# File lib/concurrent-ruby/concurrent/tvar.rb, line 93 def atomically raise ArgumentError.new('no block given') unless block_given? # Get the current transaction transaction = Transaction::current # Are we not already in a transaction (not nested)? if transaction.nil? # New transaction begin # Retry loop loop do # Create a new transaction transaction = Transaction.new Transaction::current = transaction # Run the block, aborting on exceptions begin result = yield rescue Transaction::AbortError => e transaction.abort result = Transaction::ABORTED rescue Transaction::LeaveError => e transaction.abort break result rescue => e transaction.abort raise e end # If we can commit, break out of the loop if result != Transaction::ABORTED if transaction.commit break result end end end ensure # Clear the current transaction Transaction::current = nil end else # Nested transaction - flatten it and just run the block yield end end
Run a block that reads and writes ‘TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
-
Most importantly, the block that you pass to atomically may be executed
more than once. In most cases your code should be free of side-effects, except for via TVar.
-
If an exception escapes an atomically block it will abort the transaction.
-
It is undefined behaviour to use callcc or Fiber with atomically.
-
If you create a new thread within an atomically, it will not be part of
the transaction. Creating a thread counts as a side-effect.
Transactions within transactions are flattened to a single transaction.
@example
a = new TVar(100_000) b = new TVar(100) Concurrent::atomically do a.value -= 10 b.value += 10 end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 56 def call_dataflow(method, executor, *inputs, &block) raise ArgumentError.new('an executor must be provided') if executor.nil? raise ArgumentError.new('no block given') unless block_given? unless inputs.all? { |input| input.is_a? IVar } raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }") end result = Future.new(executor: executor) do values = inputs.map { |input| input.send(method) } block.call(*values) end if inputs.empty? result.execute else counter = DependencyCounter.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer counter end end result end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 34 def dataflow(*inputs, &block) dataflow_with(Concurrent.global_io_executor, *inputs, &block) end
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. {include:file:docs-source/dataflow.md}
@param [Future] inputs zero or more ‘Future` operations that this dataflow depends upon
@yield The operation to perform once all the dependencies are met @yieldparam [Future] inputs each of the ‘Future` inputs to the dataflow @yieldreturn [Object] the result of the block operation
@return [Object] the result of all the operations
@raise [ArgumentError] if no block is given @raise [ArgumentError] if any of the inputs are not ‘IVar`s
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 44 def dataflow!(*inputs, &block) dataflow_with!(Concurrent.global_io_executor, *inputs, &block) end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 39 def dataflow_with(executor, *inputs, &block) call_dataflow(:value, executor, *inputs, &block) end
Source
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 49 def dataflow_with!(executor, *inputs, &block) call_dataflow(:value!, executor, *inputs, &block) end
Source
# File lib/concurrent-ruby/concurrent/tvar.rb, line 155 def leave_transaction raise Transaction::LeaveError.new end
Leave a transaction without committing or aborting - see ‘Concurrent::atomically`.
Source
# File lib/concurrent-ruby/concurrent/utility/monotonic_time.rb, line 53 def monotonic_time GLOBAL_MONOTONIC_CLOCK.get_time end
@!macro monotonic_get_time
Returns the current time a tracked by the application monotonic clock. @return [Float] The current monotonic time since some unspecified starting point @!macro monotonic_clock_warning