class QuackConcurrency::Queue
This is a duck type for ::Thread::Queue. It is intended to be a drop-in replacement for its core counterpart, which is valuable if ::Thread::Queue has not been implemented.
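As a rough illustration of the drop-in idea, the sketch below picks ::Thread::Queue when it is defined and falls back to QuackConcurrency::Queue otherwise. The require name quack_concurrency is an assumption inferred from the file paths on this page, and the producer and consumer threads are hypothetical.

```ruby
# Choose a queue class. The fallback branch only runs when the core
# queue is missing; the require name is an assumption, not confirmed here.
if defined?(::Thread::Queue)
  queue_class = ::Thread::Queue
else
  require 'quack_concurrency'
  queue_class = QuackConcurrency::Queue
end

queue = queue_class.new

# Hypothetical producer: pushes three items, then closes the queue.
producer = Thread.new do
  3.times { |i| queue.push(i) }
  queue.close
end

# Hypothetical consumer: pop returns nil once the queue is closed and drained.
consumed = []
consumer = Thread.new do
  while (item = queue.pop)
    consumed << item
  end
end

[producer, consumer].each(&:join)
```

Because both classes are meant to answer the same messages, the rest of the code does not need to know which one was chosen.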
Public Class Methods
Creates a new {Queue} concurrency tool. @return [Queue]
# File lib/quack_concurrency/queue.rb, line 10
def initialize
  @closed = false
  @items = []
  @mutex = ::Mutex.new
  @pop_mutex = Mutex.new
  @waiter = Waiter.new
end
Public Instance Methods
Removes all objects from it. @return [self]
# File lib/quack_concurrency/queue.rb, line 20
def clear
  @mutex.synchronize { @items.clear }
  self
end
Closes it. Once closed, it cannot be re-opened. After the call to close completes, the following are true:
- {#closed?} will return true.
- {#close} will be ignored.
- {#push} will raise a ClosedQueueError.
- Until empty, calling {#pop} will return an object from it as usual.
@return [self]
# File lib/quack_concurrency/queue.rb, line 33
def close
  @mutex.synchronize do
    return if closed?
    @closed = true
    @waiter.resume_all
  end
  self
end
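The close semantics listed above can be sketched as follows. To stay runnable without the gem, this example uses the core ::Thread::Queue as a stand-in; that QuackConcurrency::Queue behaves identically is the assumption this page itself states, not something verified here.

```ruby
# Stand-in: core ::Thread::Queue, assumed to match QuackConcurrency::Queue.
queue = ::Thread::Queue.new
queue.push(:a)
queue.close

closed_flag = queue.closed?   # the queue reports itself as closed

# Pushing to a closed queue raises ClosedQueueError.
push_error = begin
  queue.push(:b)
  nil
rescue ClosedQueueError => e
  e.class
end

first  = queue.pop   # the item queued before closing is still delivered
second = queue.pop   # closed and empty: returns nil instead of blocking
```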
Checks if it is closed. @return [Boolean]
# File lib/quack_concurrency/queue.rb, line 44
def closed?
  @closed
end
Checks if it is empty. @return [Boolean]
# File lib/quack_concurrency/queue.rb, line 50
def empty?
  @items.empty?
end
Returns the length of it. @return [Integer]
# File lib/quack_concurrency/queue.rb, line 56
def length
  @items.length
end
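A small sketch of how the inspection methods fit together with {#clear}. It uses the core ::Thread::Queue as a stand-in, on the assumption that QuackConcurrency::Queue responds the same way.

```ruby
# Stand-in: core ::Thread::Queue, assumed to match QuackConcurrency::Queue.
queue = ::Thread::Queue.new
queue.push(1).push(2)          # push returns self, so calls can be chained

length_before = queue.length   # two items are queued
empty_before  = queue.empty?

queue.clear                    # drop everything that is queued

length_after = queue.length
empty_after  = queue.empty?
```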
Returns the number of threads waiting on it. @return [Integer]
# File lib/quack_concurrency/queue.rb, line 63
def num_waiting
  @pop_mutex.waiting_threads_count + @waiter.waiting_threads_count
end
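To show what the waiting count reflects, the sketch below blocks one hypothetical consumer thread on an empty queue and reads the count before and after an item arrives. It uses the core ::Thread::Queue as a stand-in, assuming QuackConcurrency::Queue reports the same numbers.

```ruby
# Stand-in: core ::Thread::Queue, assumed to match QuackConcurrency::Queue.
queue = ::Thread::Queue.new

# Hypothetical consumer that blocks in pop because the queue is empty.
consumer = Thread.new { queue.pop }

# Poll until the consumer is actually parked inside pop.
sleep 0.01 until queue.num_waiting == 1
waiting_before = queue.num_waiting

queue.push(:job)               # wakes the waiting consumer
received = consumer.value      # joins the thread and returns its result

waiting_after = queue.num_waiting
```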
Retrieves an item from it.
@note If it is empty, the method will block until an item is available, unless non_block is true, in which case a ThreadError will be raised.
@raise [ThreadError] if it is empty and non_block is true
@param non_block [Boolean]
@return [Object]
# File lib/quack_concurrency/queue.rb, line 73
def pop(non_block = false)
  @pop_mutex.lock do
    @mutex.synchronize do
      if empty?
        return if closed?
        raise ThreadError if non_block
        @mutex.unlock
        @waiter.wait
        @mutex.lock
        return if closed?
      end
      @items.shift
    end
  end
end
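The difference between the blocking and non-blocking forms can be sketched like this. The example uses the core ::Thread::Queue as a stand-in, assuming QuackConcurrency::Queue raises the same ThreadError when non_block is true.

```ruby
# Stand-in: core ::Thread::Queue, assumed to match QuackConcurrency::Queue.
queue = ::Thread::Queue.new

# Non-blocking pop on an empty queue raises ThreadError instead of waiting.
result = begin
  queue.pop(true)
rescue ThreadError
  :empty
end

# With an item available, the non-blocking form returns it immediately.
queue.push(42)
value = queue.pop(true)
```

Rescuing ThreadError is one way to poll a queue without parking the calling thread.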
Pushes the given object to it. @param item [Object] @return [self]
# File lib/quack_concurrency/queue.rb, line 94
def push(item = nil)
  @mutex.synchronize do
    raise ClosedQueueError if closed?
    @items.push(item)
    @waiter.resume_next
  end
  self
end