class CircularQueue
A thread-safe queue with a fixed capacity. When an element is added to a full queue, the queue either loops back on itself and overwrites the oldest element (`enq`) or raises an error (`enq!`).
Useful for streaming data when keeping up with real time matters more than consuming every message, for example when load rises and the queue backs up.
Exposes the same interface as the `Queue` from the Ruby stdlib.
Example:
# Capacity of 3
q = CircularQueue.new(3)
q << 1 # => [1]
q << 2 # => [1, 2]
q << 3 # => [1, 2, 3]

# Elements are replaced when the queue reaches capacity
q << 4 # => [2, 3, 4]
q << 5 # => [3, 4, 5]
Attributes
Returns the maximum number of elements that can be enqueued
@return [Integer]
Returns the number of elements in the queue
@return [Integer]
Returns the number of elements in the queue
@return [Integer]
Public Class Methods
Creates a new queue of the specified capacity
@param [Integer] capacity the maximum capacity of the queue
# File lib/circular_queue.rb, line 34
def initialize(capacity)
  @capacity = capacity
  @data = Array.new(capacity)
  @mutex = Mutex.new
  @waiting = []
  clear
end
Public Instance Methods
Returns the last (most recent) item in the queue without removing it
@return [Object] the most recently added item
# File lib/circular_queue.rb, line 128
def back
  @mutex.synchronize do
    @data[(@back - 1) % @capacity]
  end
end
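The index expression in `back` relies on Ruby's `%` returning a result with the sign of the divisor, so stepping back from slot 0 wraps to the last slot rather than producing a negative index. A minimal sketch of that arithmetic, using local variables (`capacity`, `last_slots`) that are illustrative and not part of the library:

```ruby
capacity = 3

# For each possible value of the write index, compute the slot that holds
# the most recently written element: (back - 1) % capacity.
last_slots = (0...capacity).map { |back| (back - 1) % capacity }

puts last_slots.inspect
```

When the write index is 0, the most recent element sits in slot `capacity - 1`.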
Removes all items from the queue
@return [CircularQueue] the queue itself
# File lib/circular_queue.rb, line 95
def clear
  @mutex.synchronize do
    @size = 0
    @front = 0
    @back = 0
    self
  end
end
Returns a copy of the queue's contents, ordered from front to back, for easy iteration
@return [Array] the queued items
# File lib/circular_queue.rb, line 143
def data
  @mutex.synchronize do
    @data.clone.tap do |data|
      data.rotate!(@front)
      data.slice!(@size..-1)
    end
  end
end
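The rotate-then-slice step in `data` can be tried on a plain array. The sketch below assumes a hypothetical snapshot of a capacity-4 queue after wrap-around; `backing`, `front`, and `size` are stand-ins for the instance variables, not library API:

```ruby
# Hypothetical raw storage: the oldest element is at index 2, three slots
# are in use, and index 1 is the unused slot.
backing = [:d, nil, :b, :c]
front   = 2
size    = 3

ordered = backing.clone.tap do |copy|
  copy.rotate!(front)     # bring the oldest element to index 0
  copy.slice!(size..-1)   # drop the unused tail slots
end

puts ordered.inspect
```

Because the work happens on a clone, the backing array is left untouched, which is why callers can iterate over the result without holding the lock.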
Removes and returns the oldest item in the queue
@param [Boolean] non_block true to raise an error if the queue is empty; otherwise, waits for an item to arrive from another thread
@raise [ThreadError] non_block was true and the queue was empty
@return [Object] the item removed from the front of the queue
# File lib/circular_queue.rb, line 76
def deq(non_block = false)
  @mutex.synchronize do
    loop do
      if empty?
        raise ThreadError.new("Queue is empty") if non_block
        @waiting.push(Thread.current) unless @waiting.include?(Thread.current)
        @mutex.sleep
      else
        return deq_item
      end
    end
  end
end
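The two modes of `deq` follow the contract of the stdlib `Queue`. The sketch below uses the stdlib `Queue` as a stand-in so that it runs without the gem; this substitution is an assumption for illustration, and a `CircularQueue` is expected to behave the same way for these calls:

```ruby
# Stand-in for illustration only: stdlib Queue, not CircularQueue.
q = Queue.new

# Non-blocking mode: an empty queue raises ThreadError immediately.
error = begin
  q.deq(true)
  nil
rescue ThreadError => e
  e
end

# Blocking mode: the consumer thread waits until a producer enqueues an item.
consumer = Thread.new { q.deq }
q.enq(:payload)
received = consumer.value   # joins the thread and returns the block's result

puts error.class
puts received.inspect
```

Non-blocking mode suits polling loops that have other work to do, while blocking mode suits dedicated consumer threads.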
Returns whether the queue is empty
@return [Boolean] queue is empty
# File lib/circular_queue.rb, line 106
def empty?
  @size == 0
end
Adds an item to the queue, overwriting the oldest item if the queue is full
@param [Object] item item to add
@return [CircularQueue] the queue itself
# File lib/circular_queue.rb, line 47
def enq(item)
  @mutex.synchronize do
    enq_item(item)
    wakeup_next_waiter
    self
  end
end
Adds an item to the queue, raising an error if the queue is full
@param [Object] item item to add
@raise [ThreadError] queue is full
@return [CircularQueue] the queue itself
# File lib/circular_queue.rb, line 61
def enq!(item)
  @mutex.synchronize do
    raise ThreadError.new("Queue is full") if full?
    enq_item(item)
    wakeup_next_waiter
    self
  end
end
Returns the first (oldest) item in the queue without removing it
@return [Object] the oldest item
# File lib/circular_queue.rb, line 119
def front
  @mutex.synchronize do
    @data[@front]
  end
end
Returns whether the queue is full
@return [Boolean] queue is full
# File lib/circular_queue.rb, line 112
def full?
  @size == @capacity
end
Returns the number of threads waiting for items to arrive in the queue
@return [Integer] number of threads waiting
# File lib/circular_queue.rb, line 136
def num_waiting
  @waiting.length
end
Private Instance Methods
# File lib/circular_queue.rb, line 168
def deq_item
  item = @data[@front]
  @size -= 1
  @front += 1
  @front %= @capacity
  item
end
# File lib/circular_queue.rb, line 154
def enq_item(item)
  @data[@back] = item
  if full?
    @front += 1
    @front %= @capacity
  else
    @size += 1
  end
  @back += 1
  @back %= @capacity
end
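To see how `enq_item` overwrites the oldest element, the index arithmetic can be traced outside the class. The following is a stand-alone sketch, assuming a capacity of 3 and using local variables in place of the instance variables; it is not the library's code path:

```ruby
# Hypothetical trace of the enq_item bookkeeping with plain local variables.
capacity = 3
data  = Array.new(capacity)
front = 0
back  = 0
size  = 0

(1..5).each do |item|
  data[back] = item
  if size == capacity
    front = (front + 1) % capacity   # full: the oldest element is overwritten
  else
    size += 1
  end
  back = (back + 1) % capacity
end

# Logical front-to-back order, recovered the same way `data` does it.
ordered = data.rotate(front).first(size)

puts data.inspect
puts ordered.inspect
```

After five insertions the raw storage holds the three newest items out of order, and rotating by `front` recovers them oldest first, matching the `[3, 4, 5]` result in the class-level example.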
# File lib/circular_queue.rb, line 179
def wakeup_next_waiter
  if thread = @waiting.shift
    thread.wakeup
  end
rescue ThreadError
  retry
end