module Concurrent

{include:file:README.md}

@!macro thread_safe_variable_comparison

## Thread-safe Variable Classes

Each of the thread-safe variable classes is designed to solve a different
problem. In general:

* *{Concurrent::Agent}:* Shared, mutable variable providing independent,
  uncoordinated, *asynchronous* change of individual values. Best used when
  the value will undergo frequent, complex updates. Suitable when the result
  of an update does not need to be known immediately.
* *{Concurrent::Atom}:* Shared, mutable variable providing independent,
  uncoordinated, *synchronous* change of individual values. Best used when
  the value will undergo frequent reads but only occasional, though complex,
  updates. Suitable when the result of an update must be known immediately.
* *{Concurrent::AtomicReference}:* A simple object reference that can be updated
  atomically. Updates are synchronous but fast. Best used when updates are
  simple set operations. Not suitable when updates are complex.
  {Concurrent::AtomicBoolean} and {Concurrent::AtomicFixnum} are similar
  but optimized for the given data type.
* *{Concurrent::Exchanger}:* Shared, stateless synchronization point. Used
  when two or more threads need to exchange data. The threads will pair then
  block on each other until the exchange is complete.
* *{Concurrent::MVar}:* Shared synchronization point. Used when one thread
  must give a value to another, which must take the value. The threads will
  block on each other until the exchange is complete.
* *{Concurrent::ThreadLocalVar}:* Shared, mutable, isolated variable which
  holds a different value for each thread which has access. Often used as
  an instance variable in objects which must maintain different state
  for different threads.
* *{Concurrent::TVar}:* Shared, mutable variables which provide
  *coordinated*, *synchronous* change of *many* states. Used when multiple
  values must change together, in an all-or-nothing transaction.

TODO (pitr-ch 14-Mar-2017): deprecate, Future, Promise, etc.

Constants

ArrayImplementation

@!macro internal_implementation_note

AtomicBooleanImplementation

@!visibility private @!macro internal_implementation_note

AtomicFixnumImplementation

@!visibility private @!macro internal_implementation_note

AtomicReferenceImplementation

@!macro internal_implementation_note

CancelledOperationError

Raised when an asynchronous operation is cancelled before execution.

ConfigurationError

Raised when errors occur during configuration.

CountDownLatchImplementation

@!visibility private @!macro internal_implementation_note

Error
ExchangerImplementation

@!visibility private @!macro internal_implementation_note

GLOBAL_FAST_EXECUTOR

@!visibility private

GLOBAL_IMMEDIATE_EXECUTOR

@!visibility private

GLOBAL_IO_EXECUTOR

@!visibility private

GLOBAL_LOGGER

@!visibility private

GLOBAL_TIMER_SET

@!visibility private

HashImplementation

@!macro internal_implementation_note

IllegalOperationError

Raised when an operation is attempted which is not legal given the receiver’s current state.

ImmutabilityError

Raised when an attempt is made to violate an immutability guarantee.

InitializationError

Raised when an object’s methods are called when it has not been properly initialized.

LifecycleError

Raised when a lifecycle method (such as `stop`) is called in an improper sequence or when the object is in an inappropriate state.

LockLocalVar

A `FiberLocalVar` is a variable where the value is different for each fiber. Each variable may have a default value, but when you modify the variable only the current fiber will ever see that change.

This is similar to Ruby’s built-in fiber-local variables (`Thread.current[:name]`), but with these major advantages:

  • `FiberLocalVar` has its own identity; it doesn’t need a Symbol.

  • Each of Ruby’s built-in fiber-local variables leaks some memory forever (it’s a Symbol held forever on the fiber), so it’s only OK to create a small number of them. `FiberLocalVar` has no such issue and it is fine to create many of them.

  • Ruby’s built-in fiber-local variables leak forever the value set on each fiber (unless set to nil explicitly). `FiberLocalVar` automatically removes the mapping for each fiber once the `FiberLocalVar` instance is GC’d.

@example

v = FiberLocalVar.new(14)
v.value #=> 14
v.value = 2
v.value #=> 2

@example

v = FiberLocalVar.new(14)

Fiber.new do
  v.value #=> 14
  v.value = 1
  v.value #=> 1
end.resume

Fiber.new do
  v.value #=> 14
  v.value = 2
  v.value #=> 2
end.resume

v.value #=> 14
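
For comparison, the built-in fiber-local storage mentioned above can be sketched with plain Ruby. This is a minimal illustration that uses only `Thread#[]` and `Thread#[]=` from core Ruby; the key `:request_id` and the variable names are hypothetical and chosen for the example.

```ruby
# Thread#[] and Thread#[]= store values per fiber, keyed by a Symbol.
# :request_id is a hypothetical key used only for this sketch.
Thread.current[:request_id] = "outer"

seen_in_fiber = Fiber.new do
  before = Thread.current[:request_id]   # a new fiber starts without the value
  Thread.current[:request_id] = "inner"  # visible only inside this fiber
  [before, Thread.current[:request_id]]
end.resume

# The assignment made inside the fiber did not leak out.
seen_outside = Thread.current[:request_id]
```

Unlike the `FiberLocalVar` examples above, this form needs a Symbol key and has no default value, which is the trade-off the list of advantages describes.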
MaxRestartFrequencyError

Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.

NULL

Various classes within allow nil values to be stored, so a special NULL token is required to indicate the “nil-ness”.

@!visibility private

NULL_LOGGER

Suppresses all output when used for logging.

PromiseExecutionError
RejectedExecutionError

Raised by an `Executor` when it is unable to process a given task, possibly because of a reject policy or other internal error.

ResourceLimitError

Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.

SemaphoreImplementation

@!visibility private @!macro internal_implementation_note

SetImplementation

@!macro internal_implementation_note

SingleThreadExecutorImplementation
ThreadPoolExecutorImplementation
TimeoutError

Raised when an operation times out.

VERSION

Public Class Methods

abort_transaction()

Abort a currently running transaction - see `Concurrent::atomically`.

# File lib/concurrent-ruby/concurrent/tvar.rb, line 139
def abort_transaction
  raise Transaction::AbortError.new
end
atomically() { || ... }

Run a block that reads and writes `TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed
    more than once. In most cases your code should be free of
    side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically block, it will not be part of
    the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

@example

a = Concurrent::TVar.new(100_000)
b = Concurrent::TVar.new(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end
# File lib/concurrent-ruby/concurrent/tvar.rb, line 82
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop

      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end
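
The retry behaviour described above (the block may run more than once) can be illustrated without the library. The sketch below is a simplified stand-in under stated assumptions: `VersionedCell` and `optimistic_update` are hypothetical names, and the version check only imitates the idea of a commit that can fail. It is not how `TVar` or `Transaction` are implemented.

```ruby
# Hypothetical cell whose commit succeeds only if nobody else committed
# since the caller took its snapshot.
class VersionedCell
  attr_reader :value, :version

  def initialize(value)
    @value = value
    @version = 0
    @lock = Mutex.new
  end

  # Returns the value together with the version it was read at.
  def snapshot
    @lock.synchronize { [@value, @version] }
  end

  # Returns true when the write was applied, false on a conflict.
  def commit(seen_version, new_value)
    @lock.synchronize do
      return false unless @version == seen_version
      @value = new_value
      @version += 1
      true
    end
  end
end

# Re-runs the block until its result can be committed; returns the attempt count.
def optimistic_update(cell)
  attempts = 0
  loop do
    attempts += 1
    value, version = cell.snapshot
    new_value = yield(value, attempts)
    return attempts if cell.commit(version, new_value)
  end
end

cell = VersionedCell.new(100)
attempts = optimistic_update(cell) do |value, attempt|
  # Simulate a conflicting writer during the first attempt only.
  cell.commit(cell.version, value + 1) if attempt == 1
  value - 10
end
```

Because the block runs twice here, any side effect inside it (other than the value it returns) would also happen twice, which is why the documentation asks for side-effect-free blocks.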
call_dataflow(method, executor, *inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 56
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }")
  end

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
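
The listing above delays `result.execute` until every input has notified a counter. A minimal sketch of that countdown idea in plain Ruby follows; `CountdownTrigger` is a hypothetical class written for this example and is not the library's `DependencyCounter`. The `update` method name is chosen to match the callback that observers conventionally receive.

```ruby
# Hypothetical observer: runs the given block once, after `count` notifications.
class CountdownTrigger
  def initialize(count, &on_zero)
    @count = count
    @on_zero = on_zero
    @lock = Mutex.new
  end

  # Called once per completed dependency; the arguments are ignored here.
  def update(*_args)
    fire = @lock.synchronize { (@count -= 1) == 0 }
    @on_zero.call if fire
  end
end

fired = []
trigger = CountdownTrigger.new(3) { fired << :ran }

# Three "dependencies" finish on their own threads and notify the trigger.
threads = 3.times.map { |i| Thread.new { trigger.update(Time.now, i, nil) } }
threads.each(&:join)
```

Only the notification that brings the count to zero runs the block, so the dependent task starts exactly once regardless of the order in which the inputs complete.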
create_simple_logger(level = Logger::FATAL, output = $stderr)

@return [Logger] Logger with provided level and output.

# File lib/concurrent-ruby/concurrent/concern/logging.rb, line 37
def self.create_simple_logger(level = Logger::FATAL, output = $stderr)
  # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level

    message           = block ? block.call : message
    formatted_message = case message
                        when String
                          message
                        when Exception
                          format "%s (%s)\n%s",
                                 message.message, message.class, (message.backtrace || []).join("\n")
                        else
                          message.inspect
                        end

    output.print format "[%s] %5s -- %s: %s\n",
                        Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                        Logger::SEV_LABEL[severity],
                        progname,
                        formatted_message
    true
  end
end
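
The logger returned above is a lambda that takes a severity, a program name, and either a message or a block. The following sketch mirrors that calling convention with the standard library only; `build_callable_logger` is a hypothetical helper, and its output format is deliberately simpler than the one in the listing.

```ruby
require 'logger'
require 'stringio'

# Hypothetical helper: returns a callable with the same argument shape as the
# lambda shown above (severity, progname, optional message, optional block).
def build_callable_logger(level, output)
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level      # below the threshold: skip
    message = block ? block.call : message # the block is only evaluated when logging
    output.puts "#{progname}: #{message}"
    true
  end
end

sink = StringIO.new
logger = build_callable_logger(Logger::WARN, sink)

skipped = logger.call(Logger::DEBUG, 'demo', 'too quiet')
logged  = logger.call(Logger::ERROR, 'demo') { 'computed lazily' }
```

Passing the message as a block avoids building the string when the severity is below the configured level.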
create_stdlib_logger(level = Logger::FATAL, output = $stderr)

@return [Logger] Logger with provided level and output.

@deprecated

# File lib/concurrent-ruby/concurrent/concern/logging.rb, line 69
def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr)
  logger           = Logger.new(output)
  logger.level     = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  lambda do |loglevel, progname, message = nil, &block|
    logger.add loglevel, message, progname, &block
  end
end
dataflow(*inputs, &block)

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. {include:file:docs-source/dataflow.md}

@param [Future] inputs zero or more `Future` operations that this dataflow depends upon

@yield The operation to perform once all the dependencies are met
@yieldparam [Future] inputs each of the `Future` inputs to the dataflow
@yieldreturn [Object] the result of the block operation

@return [Object] the result of all the operations

@raise [ArgumentError] if no block is given
@raise [ArgumentError] if any of the inputs are not `IVar`s

# File lib/concurrent-ruby/concurrent/dataflow.rb, line 34
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end
dataflow!(*inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 44
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end
dataflow_with(executor, *inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 39
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
dataflow_with!(executor, *inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 49
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
disable_at_exit_handlers!()

Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer’s responsibility to ensure that the handlers are shut down properly prior to application exit by calling the `AtExit.run` method.

@note This option should be needed only because of `at_exit` ordering
issues which may arise when running some of the testing frameworks.
E.g. Minitest's test-suite runs itself in an `at_exit` callback which
executes after the pools are already terminated. Then auto termination
needs to be disabled and called manually after the test-suite ends.

@note This method should never be called
from within a gem. It should *only* be used from within the main
application and even then it should be used only when necessary.

@deprecated Has no effect since it is no longer needed, see github.com/ruby-concurrency/concurrent-ruby/pull/841.

# File lib/concurrent-ruby/concurrent/configuration.rb, line 48
def self.disable_at_exit_handlers!
  deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end
executor(executor_identifier)

General access point to global executors.

@param [Symbol, Executor] executor_identifier symbols:

- :fast - {Concurrent.global_fast_executor}
- :io - {Concurrent.global_io_executor}
- :immediate - {Concurrent.global_immediate_executor}

@return [Executor]

# File lib/concurrent-ruby/concurrent/configuration.rb, line 83
def self.executor(executor_identifier)
  Options.executor(executor_identifier)
end
global_fast_executor()

Global thread pool optimized for short, fast operations.

@return [ThreadPoolExecutor] the thread pool

# File lib/concurrent-ruby/concurrent/configuration.rb, line 55
def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value!
end
global_immediate_executor()
# File lib/concurrent-ruby/concurrent/configuration.rb, line 66
def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end
global_io_executor()

Global thread pool optimized for long, blocking (IO) tasks.

@return [ThreadPoolExecutor] the thread pool

# File lib/concurrent-ruby/concurrent/configuration.rb, line 62
def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value!
end
global_logger()
# File lib/concurrent-ruby/concurrent/concern/logging.rb, line 109
def self.global_logger
  GLOBAL_LOGGER.value
end
global_logger=(value)
# File lib/concurrent-ruby/concurrent/concern/logging.rb, line 113
def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end
global_timer_set()

Global thread pool used for global timers.

@return [Concurrent::TimerSet] the thread pool

# File lib/concurrent-ruby/concurrent/configuration.rb, line 73
def self.global_timer_set
  GLOBAL_TIMER_SET.value!
end
leave_transaction()

Leave a transaction without committing or aborting - see `Concurrent::atomically`.

# File lib/concurrent-ruby/concurrent/tvar.rb, line 144
def leave_transaction
  raise Transaction::LeaveError.new
end
monotonic_time(unit = :float_second)

@!macro monotonic_get_time

Returns the current time as tracked by the application monotonic clock.

@param [Symbol] unit the time unit to be returned, can be either
  :float_second, :float_millisecond, :float_microsecond, :second,
  :millisecond, :microsecond, or :nanosecond; defaults to :float_second.

@return [Float] The current monotonic time since some unspecified
  starting point

@!macro monotonic_clock_warning
# File lib/concurrent-ruby/concurrent/utility/monotonic_time.rb, line 15
def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end
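
A monotonic clock is mainly useful for measuring durations, because it is not affected by wall-clock adjustments. The sketch below uses the same `Process.clock_gettime` call as the listing above; `elapsed_seconds` is a hypothetical helper name introduced for the example.

```ruby
# Hypothetical helper: returns how long the given block took, in seconds.
def elapsed_seconds
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second) - started
end

elapsed = elapsed_seconds { sleep 0.01 }
```

The absolute value returned by the clock has no meaning on its own; only the difference between two readings does.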
mutex_owned_per_thread?()

@!visibility private

# File lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb, line 7
def self.mutex_owned_per_thread?
  return false if Concurrent.on_jruby? || Concurrent.on_truffleruby?

  mutex = Mutex.new
  # Lock the mutex:
  mutex.synchronize do
    # Check if the mutex is still owned in a child fiber:
    Fiber.new { mutex.owned? }.resume
  end
end
new_fast_executor(opts = {})
# File lib/concurrent-ruby/concurrent/configuration.rb, line 87
def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "fast"
  )
end
new_io_executor(opts = {})
# File lib/concurrent-ruby/concurrent/configuration.rb, line 98
def self.new_io_executor(opts = {})
  CachedThreadPool.new(
      auto_terminate:  opts.fetch(:auto_terminate, true),
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "io"
  )
end
physical_processor_count()

Number of physical processor cores on the current system. For performance reasons the calculated value will be memoized on the first call.

On Windows the Win32 API will be queried for the `NumberOfCores from Win32_Processor`. This will return the total number “of cores for the current instance of the processor.” On Unix-like operating systems either the `hwprefs` or `sysctl` utility will be called in a subshell and the returned value will be used. In the rare case where none of these methods work or an exception is raised the function will simply return 1.

@return [Integer] number of physical processor cores on the current system

@see github.com/grosser/parallel/blob/4fc8b89d08c7091fe0419ca8fba1ec3ce5a8d185/lib/parallel.rb

@see msdn.microsoft.com/en-us/library/aa394373(v=vs.85).aspx
@see www.unix.com/man-page/osx/1/HWPREFS/
@see linux.die.net/man/8/sysctl

# File lib/concurrent-ruby/concurrent/utility/processor_counter.rb, line 107
def self.physical_processor_count
  processor_counter.physical_processor_count
end
processor_count()

Number of processors seen by the OS and used for process scheduling. For performance reasons the calculated value will be memoized on the first call.

When running under JRuby the Java runtime call `java.lang.Runtime.getRuntime.availableProcessors` will be used. According to the Java documentation this “value may change during a particular invocation of the virtual machine… [applications] should therefore occasionally poll this property.” Consequently the result will NOT be memoized under JRuby.

Otherwise Ruby’s Etc.nprocessors will be used.

@return [Integer] number of processors seen by the OS or Java runtime

@see docs.oracle.com/javase/6/docs/api/java/lang/Runtime.html#availableProcessors()

# File lib/concurrent-ruby/concurrent/utility/processor_counter.rb, line 86
def self.processor_count
  processor_counter.processor_count
end
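
Outside JRuby, the description above falls back to `Etc.nprocessors` from the standard library. A minimal sketch of reading that value directly and applying the same "at least two threads" sizing rule that `new_fast_executor` uses; the variable names are hypothetical.

```ruby
require 'etc'

# Number of processors the OS exposes to this process.
logical_processors = Etc.nprocessors

# Same lower bound as the fast executor listing: never fewer than two threads.
pool_size = [2, logical_processors].max
```

This reads the value on every call, whereas the documented method memoizes it after the first call.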
use_simple_logger(level = Logger::FATAL, output = $stderr)

Use logger created by create_simple_logger to log concurrent-ruby messages.

# File lib/concurrent-ruby/concurrent/concern/logging.rb, line 63
def self.use_simple_logger(level = Logger::FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end
use_stdlib_logger(level = Logger::FATAL, output = $stderr)

Use logger created by create_stdlib_logger to log concurrent-ruby messages.

@deprecated

# File lib/concurrent-ruby/concurrent/concern/logging.rb, line 96
def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end

Private Instance Methods

abort_transaction()

Abort a currently running transaction - see `Concurrent::atomically`.

# File lib/concurrent-ruby/concurrent/tvar.rb, line 139
def abort_transaction
  raise Transaction::AbortError.new
end
atomically() { || ... }

Run a block that reads and writes `TVar`s as a single atomic transaction. With respect to the value of `TVar` objects, the transaction is atomic, in that it either happens or it does not, consistent, in that the `TVar` objects involved will never enter an illegal state, and isolated, in that transactions never interfere with each other. You may recognise these properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

  • Most importantly, the block that you pass to atomically may be executed
    more than once. In most cases your code should be free of
    side-effects, except for via TVar.

  • If an exception escapes an atomically block it will abort the transaction.

  • It is undefined behaviour to use callcc or Fiber with atomically.

  • If you create a new thread within an atomically block, it will not be part of
    the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

@example

a = Concurrent::TVar.new(100_000)
b = Concurrent::TVar.new(100)

Concurrent::atomically do
  a.value -= 10
  b.value += 10
end
# File lib/concurrent-ruby/concurrent/tvar.rb, line 82
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction

  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?

  if transaction.nil?
    # New transaction

    begin
      # Retry loop

      loop do

        # Create a new transaction

        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions

        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop

        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction

      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block

    yield
  end
end
call_dataflow(method, executor, *inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 56
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }")
  end

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
dataflow(*inputs, &block)

Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. {include:file:docs-source/dataflow.md}

@param [Future] inputs zero or more `Future` operations that this dataflow depends upon

@yield The operation to perform once all the dependencies are met
@yieldparam [Future] inputs each of the `Future` inputs to the dataflow
@yieldreturn [Object] the result of the block operation

@return [Object] the result of all the operations

@raise [ArgumentError] if no block is given
@raise [ArgumentError] if any of the inputs are not `IVar`s

# File lib/concurrent-ruby/concurrent/dataflow.rb, line 34
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end
dataflow!(*inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 44
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end
dataflow_with(executor, *inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 39
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
dataflow_with!(executor, *inputs, &block)
# File lib/concurrent-ruby/concurrent/dataflow.rb, line 49
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
leave_transaction()

Leave a transaction without committing or aborting - see `Concurrent::atomically`.

# File lib/concurrent-ruby/concurrent/tvar.rb, line 144
def leave_transaction
  raise Transaction::LeaveError.new
end
monotonic_time(unit = :float_second)

@!macro monotonic_get_time

Returns the current time as tracked by the application monotonic clock.

@param [Symbol] unit the time unit to be returned, can be either
  :float_second, :float_millisecond, :float_microsecond, :second,
  :millisecond, :microsecond, or :nanosecond; defaults to :float_second.

@return [Float] The current monotonic time since some unspecified
  starting point

@!macro monotonic_clock_warning
# File lib/concurrent-ruby/concurrent/utility/monotonic_time.rb, line 15
def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end