class QuackConcurrency::ConditionVariable

{ConditionVariable} is similar to ::ConditionVariable.

A few differences include:

Public Class Methods

new()

Creates a new {ConditionVariable} concurrency tool.

@return [ConditionVariable]

# File lib/quack_concurrency/condition_variable.rb, line 12
def initialize
  @mutex = ::Mutex.new
  @waitables = []
  @waitables_to_resume = []
end

Public Instance Methods

any_waiting_threads?()

Checks if any threads are waiting on it.

@return [Boolean]

# File lib/quack_concurrency/condition_variable.rb, line 20
def any_waiting_threads?
  waiting_threads_count >= 1
end

broadcast()

Resumes all threads waiting on it.

@return [self]
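
A minimal sketch of the broadcast pattern follows. It uses Ruby's built-in `ConditionVariable` as a stand-in, on the assumption that the class documented here is used the same way; the variable names (`go`, `woken`, `waiters`) are illustrative only.

```ruby
# Stand-in: Ruby's built-in ConditionVariable, assumed to be used the same
# way as the class documented here.
mutex = Mutex.new
condition = ConditionVariable.new
go = false
woken = []

waiters = 3.times.map do |i|
  Thread.new do
    mutex.synchronize do
      # Wait until the shared flag is set; the timeout is only a safety net.
      condition.wait(mutex, 5) until go
      woken << i
    end
  end
end

mutex.synchronize do
  go = true
  condition.broadcast # wake every thread currently waiting
end

waiters.each(&:join)
puts woken.sort.inspect
```

Setting the flag and broadcasting under the same mutex means a thread that has not started waiting yet still sees `go` as true and never sleeps.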

# File lib/quack_concurrency/condition_variable.rb, line 26
def broadcast
  @mutex.synchronize do
    signal_next until @waitables_to_resume.empty?
  end
  self
end

next_waitable_to_wake()

Returns the {Waitable} representing the next thread to be woken. This is the waitable for the thread that made the earliest call to {#wait}.

@api private
@return [Waitable]

# File lib/quack_concurrency/condition_variable.rb, line 37
def next_waitable_to_wake
  @mutex.synchronize { @waitables.first }
end

signal()

Resumes the next thread waiting on it, if one exists.

@return [self]

# File lib/quack_concurrency/condition_variable.rb, line 43
def signal
  @mutex.synchronize do
    signal_next if @waitables_to_resume.any?
  end
  self
end

wait(mutex, timeout = nil)

Puts this thread to sleep until another thread resumes it. Threads are woken in the order in which they called this method.

@note Will block until resumed
@param mutex [Mutex] mutex to be unlocked while this thread is sleeping
@param timeout [nil,Numeric] maximum time to sleep in seconds, nil for forever
@raise [TypeError] if timeout is not nil or Numeric
@raise [ArgumentError] if timeout is negative
@raise [Exception] any exception raised by +::ConditionVariable#wait+ (e.g. interrupts, ThreadError)
@return [self]
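
The usual calling pattern can be sketched as below. The example uses Ruby's built-in `ConditionVariable` as a stand-in, assuming the class documented here accepts the same `wait(mutex, timeout)` and `signal` calls; the names `ready`, `result`, and `consumer` are illustrative.

```ruby
# Stand-in: Ruby's built-in ConditionVariable, assumed to accept the same
# wait(mutex, timeout) and signal calls as the class documented here.
mutex = Mutex.new
condition = ConditionVariable.new
ready = false
result = nil

consumer = Thread.new do
  mutex.synchronize do
    # The mutex is released while sleeping and re-acquired before wait returns.
    # Re-check the predicate in a loop: waking up does not guarantee it holds.
    condition.wait(mutex, 5) until ready
    result = :resumed
  end
end

mutex.synchronize do
  ready = true
  condition.signal # resume one waiting thread, if there is one
end

consumer.join
puts result
```

Guarding the wait with a predicate also covers the case where `signal` runs before the consumer has started waiting.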

# File lib/quack_concurrency/condition_variable.rb, line 59
def wait(mutex, timeout = nil)
  validate_mutex(mutex)
  validate_timeout(timeout)
  waitable = waitable_for_current_thread
  @mutex.synchronize do
    @waitables.push(waitable)
    @waitables_to_resume.push(waitable)
  end
  waitable.wait(mutex, timeout)
  self
end

waitable_woken(waitable)

Removes a {Waitable} whose thread has been woken.

@api private
@return [void]

# File lib/quack_concurrency/condition_variable.rb, line 74
def waitable_woken(waitable)
  @mutex.synchronize { @waitables.delete(waitable) }
end

waiting_threads_count()

Returns the number of threads currently waiting on it.

@return [Integer]

# File lib/quack_concurrency/condition_variable.rb, line 80
def waiting_threads_count
  @waitables.length
end

Private Instance Methods

signal_next()

Wakes up the next waiting thread. If that thread has already been woken, tries the next one in the queue.

@api private
@return [void]
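
The retry behaviour can be illustrated with a self-contained sketch. `FakeWaitable` and `resume_first_sleeping` are hypothetical names invented for this example, and the sketch assumes that `resume` returns false for a thread that has already woken (for instance after a timeout). Unlike a bare retry loop, it stops when the queue is exhausted.

```ruby
# Hypothetical stand-in for Waitable: #resume returns false when the thread
# it represents has already woken (assumption made for this sketch).
FakeWaitable = Struct.new(:name, :already_woken) do
  def resume
    !already_woken
  end
end

# Take waitables from the front of the queue (earliest waiter first) until
# one resumes or the queue runs out. Returns the resumed waitable or nil.
def resume_first_sleeping(queue)
  while (waitable = queue.shift)
    return waitable if waitable.resume
  end
  nil
end

queue = [
  FakeWaitable.new(:a, true),  # already woken, so it is skipped
  FakeWaitable.new(:b, false), # first thread still sleeping
  FakeWaitable.new(:c, false)
]

resumed = resume_first_sleeping(queue)
puts resumed.name
puts queue.map(&:name).inspect
```

Skipped entries are discarded rather than re-queued, so the next call starts from the first waiter that has not been tried yet.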

# File lib/quack_concurrency/condition_variable.rb, line 90
def signal_next
  loop do
    next_waitable = @waitables_to_resume.shift
    # Stop once the queue is exhausted; otherwise this would spin forever.
    break unless next_waitable
    break if next_waitable.resume
  end
  nil
end

validate_mutex(mutex)

Validates that an object behaves like a ::Mutex. It must be able to lock and unlock the mutex.

@api private
@param mutex [Mutex] mutex to be validated
@raise [TypeError] if mutex does not behave like a ::Mutex
@return [void]
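
As a rough illustration of the duck-typing rule, the sketch below accepts any object that responds to both `lock` and `unlock`, or to `unlock!`, which is the check used in this method's source listing. `mutex_like?` and `SpinFlag` are hypothetical names invented for the example.

```ruby
# Hypothetical helper: true when the object offers the methods a mutex needs.
def mutex_like?(object)
  (object.respond_to?(:lock) && object.respond_to?(:unlock)) ||
    object.respond_to?(:unlock!)
end

# Hypothetical lock class: not a ::Mutex, but it offers lock and unlock.
class SpinFlag
  def lock
    @held = true
  end

  def unlock
    @held = false
  end
end

puts mutex_like?(Mutex.new)    # a real mutex passes
puts mutex_like?(SpinFlag.new) # so does any object with lock and unlock
puts mutex_like?("not a lock") # a String has neither method
```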

# File lib/quack_concurrency/condition_variable.rb, line 107
def validate_mutex(mutex)
  return if mutex.respond_to?(:lock) && mutex.respond_to?(:unlock)
  return if mutex.respond_to?(:unlock!)
  raise TypeError, "'mutex' must respond to ('lock' and 'unlock') or 'unlock!'"
end

validate_timeout(timeout)

Validates a timeout value.

@api private
@param timeout [nil,Numeric]
@raise [TypeError] if timeout is not nil or Numeric
@raise [ArgumentError] if timeout is negative
@return [void]
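
The three outcomes can be seen side by side in the sketch below. `check_timeout` and `outcome` are hypothetical helpers written for this example; they follow the rules stated above rather than calling the private method itself.

```ruby
# Hypothetical helper following the documented rules for a timeout value.
def check_timeout(timeout)
  return :ok if timeout.nil?
  raise TypeError, "'timeout' must be nil or a Numeric" unless timeout.is_a?(Numeric)
  raise ArgumentError, "'timeout' must not be negative" if timeout.negative?

  :ok
end

# Hypothetical helper: report :ok or the class of the error raised.
def outcome(value)
  check_timeout(value)
rescue TypeError, ArgumentError => e
  e.class
end

[nil, 1.5, 0, -1, "5"].each do |value|
  puts "#{value.inspect} -> #{outcome(value)}"
end
```

A timeout of zero passes validation, since only negative values are rejected.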

# File lib/quack_concurrency/condition_variable.rb, line 119
def validate_timeout(timeout)
  return if timeout.nil?
  raise TypeError, "'timeout' must be nil or a Numeric" unless timeout.is_a?(Numeric)
  raise ArgumentError, "'timeout' must not be negative" if timeout.negative?
end

waitable_for_current_thread()

Returns a waitable to represent the current thread.

@api private
@return [Waitable]

# File lib/quack_concurrency/condition_variable.rb, line 129
def waitable_for_current_thread
  Waitable.new(self)
end